这一部分将会在最底层和最简单的等级讨论 HBase 与 Spark 的整合。其他交互的要点都是基于这些操作构建的,我们会在这里完整描述。

一切 HBase 和 Spark 整合的基础都是 HBaseContext,HBaseContext 接受 HBase 配置并且会将其推送到 Spark 执行器(executor)中。这允许我们在每个 Spark 执行器(executor)中有一个静态的 HBase 连接。

作为参考,Spark 执行器(executor)既可以和 Region Server 在同一个节点,也可以在不同的节点,他们不存在共存的依赖关系。

可以认为每个 Spark 执行器(executor)都是一个多线程的客户端程序,这允许运行在不同的执行器上的 Spark 任务访问共享的连接对象。

例 31. HBaseContext 使用例程

这个例子展现了如何使用 Scala 语言在 RDD 的foreachPartition方法中使用 HBaseContext。

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

...

val hbaseContext = new HBaseContext(sc, config)

rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
 val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
 it.foreach((putRecord) => {
. val put = new Put(putRecord._1)
. putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
. bufferedMutator.mutate(put)
 })
 bufferedMutator.flush()
 bufferedMutator.close()
})

这里是使用 Java 编写的同样的例子。

JavaSparkContext jsc = new JavaSparkContext(sparkConf);

try {
  List<byte[]> list = new ArrayList<>();
  list.add(Bytes.toBytes("1"));
  ...
  list.add(Bytes.toBytes("5"));

  JavaRDD<byte[]> rdd = jsc.parallelize(list);
  Configuration conf = HBaseConfiguration.create();

  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

  hbaseContext.foreachPartition(rdd,
      new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
   public void call(Tuple2<Iterator<byte[]>, Connection> t)
        throws Exception {
    Table table = t._2().getTable(TableName.valueOf(tableName));
    BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
    while (t._1().hasNext()) {
      byte[] b = t._1().next();
      Result r = table.get(new Get(b));
      if (r.getExists()) {
       mutator.mutate(new Put(b));
      }
    }

    mutator.flush();
    mutator.close();
    table.close();
   }
  });
} finally {
  jsc.stop();
}

所有的函数式都同时在 Spark 和 HBase 中,并且都支持用 Scala 或者 Java 开发。除了 SparkSQL 以外,所有 Spark 支持的语言在这里也都支持。 目前在余下的文档中,我们将会重点关注 Scala 的例程。

上面的例程阐释了如何在 foreachPartition 操作中使用连接。除此之外,许多 Spark 的基础函数都是支持的:

bulkPut

并行的写入大量数据到 HBase

bulkDelete

并行的删除 HBase 中大量数据

bulkGet

并行的从 HBase 中获取大量的数据,并且创建一个新的 RDD

mapPartition

在 Spark 的 Map 函数中使用连接对象,并且允许使用完整的 HBase 访问

hBaseRDD

简单的创建一个用于分布式扫描数据的 RDD

想要参看所有机能的例程,参见 HBase-Spark 模块。