这一部分将会在最底层和最简单的等级讨论 HBase 与 Spark 的整合。其他交互的要点都是基于这些操作构建的,我们会在这里完整描述。
一切 HBase 和 Spark 整合的基础都是 HBaseContext,HBaseContext 接受 HBase 配置并且会将其推送到 Spark 执行器(executor)中。这允许我们在每个 Spark 执行器(executor)中有一个静态的 HBase 连接。
作为参考,Spark 执行器(executor)既可以和 Region Server 在同一个节点,也可以在不同的节点,他们不存在共存的依赖关系。
可以认为每个 Spark 执行器(executor)都是一个多线程的客户端程序,这允许运行在不同的执行器上的 Spark 任务访问共享的连接对象。
例 31. HBaseContext 使用例程
这个例子展现了如何使用 Scala 语言在 RDD 的foreachPartition
方法中使用 HBaseContext。
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
...
val hbaseContext = new HBaseContext(sc, config)
rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
it.foreach((putRecord) => {
. val put = new Put(putRecord._1)
. putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
. bufferedMutator.mutate(put)
})
bufferedMutator.flush()
bufferedMutator.close()
})
这里是使用 Java 编写的同样的例子。
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
List<byte[]> list = new ArrayList<>();
list.add(Bytes.toBytes("1"));
...
list.add(Bytes.toBytes("5"));
JavaRDD<byte[]> rdd = jsc.parallelize(list);
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.foreachPartition(rdd,
new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
public void call(Tuple2<Iterator<byte[]>, Connection> t)
throws Exception {
Table table = t._2().getTable(TableName.valueOf(tableName));
BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
while (t._1().hasNext()) {
byte[] b = t._1().next();
Result r = table.get(new Get(b));
if (r.getExists()) {
mutator.mutate(new Put(b));
}
}
mutator.flush();
mutator.close();
table.close();
}
});
} finally {
jsc.stop();
}
所有的函数式都同时在 Spark 和 HBase 中,并且都支持用 Scala 或者 Java 开发。除了 SparkSQL 以外,所有 Spark 支持的语言在这里也都支持。 目前在余下的文档中,我们将会重点关注 Scala 的例程。
上面的例程阐释了如何在 foreachPartition 操作中使用连接。除此之外,许多 Spark 的基础函数都是支持的:
bulkPut
并行的写入大量数据到 HBase
bulkDelete
并行的删除 HBase 中大量数据
bulkGet
并行的从 HBase 中获取大量的数据,并且创建一个新的 RDD
mapPartition
在 Spark 的 Map 函数中使用连接对象,并且允许使用完整的 HBase 访问
hBaseRDD
简单的创建一个用于分布式扫描数据的 RDD
想要参看所有机能的例程,参见 HBase-Spark 模块。