Spark Streaming 是一个基于 Spark 构建的微批流处理框架。 HBase 和 Spark Streaming 的良好配合使得 HBase 可以提供一下益处:
可以动态的获取参考或者描述性数据
基于 Spark Streaming 提供的恰好一次处理,可以存储计数或者聚合结果
HBase-Spark 模块整合的和 Spark Streaming 的相关的点与 Spark 整合的点非常相似, 以下的指令可以在 Spark Streaming DStream 中立刻使用:
bulkPut
并行的写入大量数据到 HBase
bulkDelete
并行的删除 HBase 中大量数据
bulkGet
并行的从 HBase 中获取大量的数据,并且创建一个新的 RDD
mapPartition
在 Spark 的 Map 函数中使用连接对象,并且允许使用完整的 HBase 访问
hBaseRDD
简单的创建一个用于分布式扫描数据的 RDD。
例 32. bulkPut
在 DStreams 中使用的例程
以下是 bulkPut 在 DStreams 中的使用例程,感觉上与 RDD 批量插入非常接近。
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val ssc = new StreamingContext(sc, Milliseconds(200))
val rdd1 = ...
val rdd2 = ...
val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
Array[Byte], Array[Byte])])]]()
queue += rdd1
queue += rdd2
val dStream = ssc.queueStream(queue)
dStream.hbaseBulkPut(
hbaseContext,
TableName.valueOf(tableName),
(putRecord) => {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
put
})
这里到hbaseBulkPut
函数有三个输入,hbaseContext 携带了配置广播信息,来帮助我们连接到执行器中的 HBase Connections。
表名用于指明我们要往哪个表放数据。一个函数将 DStream 中的记录转换为 HBase Put 对象。