HBase 提供了 Observer Coprocessor 的示例。
下面给出更详细的例子。
这些示例假设一个名为users
的表,它有两个列族personalDet
和salaryDet
,包含个人和工资详细信息。下面是users
表的图形表示。
personalDet | salaryDet | |
---|---|---|
jverne | 儒勒 | 凡尔纳 |
rowkey | 名称 | 姓氏 |
管理 | 管理员 | Admin |
cdickens | 查尔斯 | 狄更斯 |
以下 Observer 协处理器可防止在users
表的Get
或Scan
中返回用户admin
的详细信息。
编写一个实现 RegionObserver 类的类。
覆盖preGetOp()
方法(不推荐使用preGet()
方法)以检查客户端是否已使用值admin
查询 rowkey。如果是,则返回空结果。否则,正常处理请求。
将您的代码和依赖项放在 JAR 文件中。
将 JAR 放在 HDFS 中,HBase 可以在其中找到它。
加载协处理器。
写一个简单的程序来测试它。
以下是上述步骤的实施:
public class RegionObserverExample implements RegionObserver {
private static final byte[] ADMIN = Bytes.toBytes("admin");
private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details");
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, final List<Cell> results)
throws IOException {
if (Bytes.equals(get.getRow(),ADMIN)) {
Cell c = CellUtil.createCell(get.getRow(),COLUMN_FAMILY, COLUMN,
System.currentTimeMillis(), (byte)4, VALUE);
results.add(c);
e.bypass();
}
}
}
覆盖preGetOp()
仅适用于Get
操作。您还需要覆盖preScannerOpen()
方法以从扫描结果中过滤admin
行。
@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,
final RegionScanner s) throws IOException {
Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN));
scan.setFilter(filter);
return s;
}
这种方法有效,但有 _ 副作用 _。如果客户端在其扫描中使用了过滤器,则该过滤器将替换该过滤器。相反,您可以显式删除扫描中的任何admin
结果:
@Override
public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,
final List<Result> results, final int limit, final boolean hasMore) throws IOException {
Result result = null;
Iterator<Result> iterator = results.iterator();
while (iterator.hasNext()) {
result = iterator.next();
if (Bytes.equals(result.getRow(), ROWKEY)) {
iterator.remove();
break;
}
}
return hasMore;
}
仍然使用users
表,此示例使用端点协处理器实现协处理器以计算所有员工工资的总和。
创建一个定义服务的'.proto'文件。
option java_package = "org.myname.hbase.coprocessor.autogenerated";
option java_outer_classname = "Sum";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message SumRequest {
required string family = 1;
required string column = 2;
}
message SumResponse {
required int64 sum = 1 [default = 0];
}
service SumService {
rpc getSum(SumRequest)
returns (SumResponse);
}
执行protoc
命令从上面的.proto'文件生成 Java 代码。
$ mkdir src
$ protoc --java_out=src ./sum.proto
这将生成一个类调用Sum.java
。
编写一个扩展生成的服务类的类,实现Coprocessor
和CoprocessorService
类,并覆盖服务方法。
> 如果从hbase-site.xml
加载协处理器,然后使用 HBase Shell 再次加载同一个协处理器,它将再次加载。同一个类将存在两次,第二个实例将具有更高的 ID(因此具有更低的优先级)。结果是有效地忽略了重复的协处理器。
public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService {
private RegionCoprocessorEnvironment env;
@Override
public Service getService() {
return this;
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment)env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// do nothing
}
@Override
public void getSum(RpcController controller, Sum.SumRequest request, RpcCallback<Sum.SumResponse> done) {
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(request.getFamily()));
scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
Sum.SumResponse response = null;
InternalScanner scanner = null;
try {
scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<>();
boolean hasMore = false;
long sum = 0L;
do {
hasMore = scanner.next(results);
for (Cell cell : results) {
sum = sum + Bytes.toLong(CellUtil.cloneValue(cell));
}
results.clear();
} while (hasMore);
response = Sum.SumResponse.newBuilder().setSum(sum).build();
} catch (IOException ioe) {
ResponseConverter.setControllerException(controller, ioe);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
done.run(response);
}
}
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf("users");
Table table = connection.getTable(tableName);
final Sum.SumRequest request = Sum.SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build();
try {
Map<byte[], Long> results = table.coprocessorService(
Sum.SumService.class,
null, /* start key */
null, /* end key */
new Batch.Call<Sum.SumService, Long>() {
@Override
public Long call(Sum.SumService aggregate) throws IOException {
BlockingRpcCallback<Sum.SumResponse> rpcCallback = new BlockingRpcCallback<>();
aggregate.getSum(null, request, rpcCallback);
Sum.SumResponse response = rpcCallback.get();
return response.hasSum() ? response.getSum() : 0L;
}
}
);
for (Long sum : results.values()) {
System.out.println("Sum = " + sum);
}
} catch (ServiceException e) {
e.printStackTrace();
} catch (Throwable e) {
e.printStackTrace();
}
Load the Coprocessor.
编写客户端代码以调用协处理器。