HBase 提供了 Observer Coprocessor 的示例。

下面给出更详细的例子。

这些示例假设一个名为users的表,它有两个列族personalDetsalaryDet,包含个人和工资详细信息。下面是users表的图形表示。

personalDet salaryDet
jverne 儒勒 凡尔纳
rowkey 名称 姓氏
管理 管理员 Admin
cdickens 查尔斯 狄更斯

111.1。观察者示例

以下 Observer 协处理器可防止在users表的GetScan中返回用户admin的详细信息。

  1. 编写一个实现 RegionObserver 类的类。

  2. 覆盖preGetOp()方法(不推荐使用preGet()方法)以检查客户端是否已使用值admin查询 rowkey。如果是,则返回空结果。否则,正常处理请求。

  3. 将您的代码和依赖项放在 JAR 文件中。

  4. 将 JAR 放在 HDFS 中,HBase 可以在其中找到它。

  5. 加载协处理器。

  6. 写一个简单的程序来测试它。

以下是上述步骤的实施:

public class RegionObserverExample implements RegionObserver {

    private static final byte[] ADMIN = Bytes.toBytes("admin");
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
    private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
    private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details");

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, final List<Cell> results)
    throws IOException {

        if (Bytes.equals(get.getRow(),ADMIN)) {
            Cell c = CellUtil.createCell(get.getRow(),COLUMN_FAMILY, COLUMN,
            System.currentTimeMillis(), (byte)4, VALUE);
            results.add(c);
            e.bypass();
        }
    }
} 

覆盖preGetOp()仅适用于Get操作。您还需要覆盖preScannerOpen()方法以从扫描结果中过滤admin行。

@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,
final RegionScanner s) throws IOException {

    Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN));
    scan.setFilter(filter);
    return s;
} 

这种方法有效,但有 _ 副作用 _。如果客户端在其扫描中使用了过滤器,则该过滤器将替换该过滤器。相反,您可以显式删除扫描中的任何admin结果:

@Override
public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,
final List<Result> results, final int limit, final boolean hasMore) throws IOException {
        Result result = null;
    Iterator<Result> iterator = results.iterator();
    while (iterator.hasNext()) {
    result = iterator.next();
        if (Bytes.equals(result.getRow(), ROWKEY)) {
            iterator.remove();
            break;
        }
    }
    return hasMore;
} 

111.2。端点示例

仍然使用users表,此示例使用端点协处理器实现协处理器以计算所有员工工资的总和。

  1. 创建一个定义服务的'.proto'文件。

    option java_package = "org.myname.hbase.coprocessor.autogenerated";
    option java_outer_classname = "Sum";
    option java_generic_services = true;
    option java_generate_equals_and_hash = true;
    option optimize_for = SPEED;
    message SumRequest {
        required string family = 1;
        required string column = 2;
    }
    
    message SumResponse {
      required int64 sum = 1 [default = 0];
    }
    
    service SumService {
      rpc getSum(SumRequest)
        returns (SumResponse);
    } 
    
  2. 执行protoc命令从上面的.proto'文件生成 Java 代码。

    $ mkdir src
    $ protoc --java_out=src ./sum.proto 
    

    这将生成一个类调用Sum.java

  3. 编写一个扩展生成的服务类的类,实现CoprocessorCoprocessorService类,并覆盖服务方法。

    > 如果从hbase-site.xml加载协处理器,然后使用 HBase Shell 再次加载同一个协处理器,它将再次加载。同一个类将存在两次,第二个实例将具有更高的 ID(因此具有更低的优先级)。结果是有效地忽略了重复的协处理器。

    public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService {
    
        private RegionCoprocessorEnvironment env;
    
        @Override
        public Service getService() {
            return this;
        }
    
        @Override
        public void start(CoprocessorEnvironment env) throws IOException {
            if (env instanceof RegionCoprocessorEnvironment) {
                this.env = (RegionCoprocessorEnvironment)env;
            } else {
                throw new CoprocessorException("Must be loaded on a table region!");
            }
        }
    
        @Override
        public void stop(CoprocessorEnvironment env) throws IOException {
            // do nothing
        }
    
        @Override
        public void getSum(RpcController controller, Sum.SumRequest request, RpcCallback&lt;Sum.SumResponse&gt; done) {
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes(request.getFamily()));
            scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
    
            Sum.SumResponse response = null;
            InternalScanner scanner = null;
    
            try {
                scanner = env.getRegion().getScanner(scan);
                List&lt;Cell&gt; results = new ArrayList&lt;&gt;();
                boolean hasMore = false;
                long sum = 0L;
    
                do {
                    hasMore = scanner.next(results);
                    for (Cell cell : results) {
                        sum = sum + Bytes.toLong(CellUtil.cloneValue(cell));
                    }
                    results.clear();
                } while (hasMore);
    
                response = Sum.SumResponse.newBuilder().setSum(sum).build();
            } catch (IOException ioe) {
                ResponseConverter.setControllerException(controller, ioe);
            } finally {
                if (scanner != null) {
                    try {
                        scanner.close();
                    } catch (IOException ignored) {}
                }
            }
    
            done.run(response);
        }
    } 
    
    Configuration conf = HBaseConfiguration.create();
    Connection connection = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf("users");
    Table table = connection.getTable(tableName);
    
    final Sum.SumRequest request = Sum.SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build();
    try {
        Map&lt;byte[], Long&gt; results = table.coprocessorService(
            Sum.SumService.class,
            null,  /* start key */
            null,  /* end   key */
            new Batch.Call&lt;Sum.SumService, Long&gt;() {
                @Override
                public Long call(Sum.SumService aggregate) throws IOException {
                    BlockingRpcCallback&lt;Sum.SumResponse&gt; rpcCallback = new BlockingRpcCallback&lt;&gt;();
                    aggregate.getSum(null, request, rpcCallback);
                    Sum.SumResponse response = rpcCallback.get();
    
                    return response.hasSum() ? response.getSum() : 0L;
                }
            }
        );
    
        for (Long sum : results.values()) {
            System.out.println("Sum = " + sum);
        }
    } catch (ServiceException e) {
        e.printStackTrace();
    } catch (Throwable e) {
        e.printStackTrace();
    } 
    
  4. Load the Coprocessor.

  5. 编写客户端代码以调用协处理器。