使用 Spark 加载大量的数据到 HBase 有两个选项。 基本的大量数据加载功能适用于你的行有数百万列数据,以及在 Spark 批量加载之前的 Map 操作列没有合并和分组的情况。
Spark 中还有一个轻量批量加载选项,这个第二选项设计给每一行少于一万的情况。 第二个选项的优势在于更高的吞吐量,以及 Spark 的 shuffle 操作中更轻的负载。
两种实现都或多或少的类似 MapReduce 批量加载过程, 因为分区器基于 Region 划分对行键进行分区。并且行键被顺序的发送到 Reducer 所以 HFile 可以在 reduce 阶段被直接写出。
依照 Spark 的术语来说,批量加载将会基于repartitionAndSortWithinPartitions
实现,并且之后是 Spark 的foreachPartition
。
让我们首先看一下使用批量加载功能的例子
例 33. 批量加载例程
下面的例子展现了 Spark 中的批量加载。
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val stagingFolder = ...
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"),
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
(Bytes.toBytes("3"),
(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
rdd.hbaseBulkLoad(TableName.valueOf(tableName),
t => {
val rowKey = t._1
val family:Array[Byte] = t._2(0)._1
val qualifier = t._2(0)._2
val value = t._2(0)._3
val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
Seq((keyFamilyQualifier, value)).iterator
},
stagingFolder.getPath)
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
hbaseBulkLoad
函数需要三个必要参数:
我们需要从之加载数据的表名
一个函数用于将 RDD 中的某个记录转化为一个元组形式的键值对。 其中键值是一个 KeyFamilyQualifer 对象,值是 cell value。 KeyFamilyQualifer 将会保存行键,列族和列标识位。 针对行键的随机操作会根据这三个值来排序。
写出 HFile 的临时路径
接下来的 Spark 批量加载指令,使用 HBase 的 LoadIncrementalHFiles 对象来加载 HBase 中新创建的 HFiles。
使用 Spark 批量加载的附加参数
你可以在 hbaseBulkLoad 中用附加参数设置以下属性:
例 34. 使用附加参数
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val stagingFolder = ...
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"),
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
(Bytes.toBytes("3"),
(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options)
rdd.hbaseBulkLoad(TableName.valueOf(tableName),
t => {
val rowKey = t._1
val family:Array[Byte] = t._2(0)._1
val qualifier = t._2(0)._2
val value = t._2(0)._3
val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
Seq((keyFamilyQualifier, value)).iterator
},
stagingFolder.getPath,
familyHBaseWriterOptions,
compactionExclude = false,
HConstants.DEFAULT_MAX_FILE_SIZE)
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
现在让我们来看一下如何调用轻量化对象批量加载的实现:
例 35. 使用轻量批量加载
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val stagingFolder = ...
val rdd = sc.parallelize(Array(
("1",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
("3",
(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
rdd.hbaseBulkLoadThinRows(hbaseContext,
TableName.valueOf(tableName),
t => {
val rowKey = t._1
val familyQualifiersValues = new FamiliesQualifiersValues
t._2.foreach(f => {
val family:Array[Byte] = f._1
val qualifier = f._2
val value:Array[Byte] = f._3
familyQualifiersValues +=(family, qualifier, value)
})
(new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
},
stagingFolder.getPath,
new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
compactionExclude = false,
20)
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
注意在使用轻量行批量加载的时候函数返回的元组中: 第一个元素是行键,第二个元素是 FamiliesQualifiersValues,这个对象中含有这一行里所有的数值,并且包含了所有的列族。