使用 Spark 加载大量的数据到 HBase 有两个选项。 基本的大量数据加载功能适用于你的行有数百万列数据,以及在 Spark 批量加载之前的 Map 操作列没有合并和分组的情况。

Spark 中还有一个轻量批量加载选项,这个第二选项设计给每一行少于一万的情况。 第二个选项的优势在于更高的吞吐量,以及 Spark 的 shuffle 操作中更轻的负载。

两种实现都或多或少的类似 MapReduce 批量加载过程, 因为分区器基于 Region 划分对行键进行分区。并且行键被顺序的发送到 Reducer 所以 HFile 可以在 reduce 阶段被直接写出。

依照 Spark 的术语来说,批量加载将会基于repartitionAndSortWithinPartitions实现,并且之后是 Spark 的foreachPartition

让我们首先看一下使用批量加载功能的例子

例 33. 批量加载例程

下面的例子展现了 Spark 中的批量加载。

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      (Bytes.toBytes("3"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...

rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  t => {
   val rowKey = t._1
   val family:Array[Byte] = t._2(0)._1
   val qualifier = t._2(0)._2
   val value = t._2(0)._3

   val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)

   Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder.getPath)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

hbaseBulkLoad 函数需要三个必要参数:

  1. 我们需要从之加载数据的表名

  2. 一个函数用于将 RDD 中的某个记录转化为一个元组形式的键值对。 其中键值是一个 KeyFamilyQualifer 对象,值是 cell value。 KeyFamilyQualifer 将会保存行键,列族和列标识位。 针对行键的随机操作会根据这三个值来排序。

  3. 写出 HFile 的临时路径

接下来的 Spark 批量加载指令,使用 HBase 的 LoadIncrementalHFiles 对象来加载 HBase 中新创建的 HFiles。

使用 Spark 批量加载的附加参数

你可以在 hbaseBulkLoad 中用附加参数设置以下属性:

  • HFile 的最大文件大小
  • 从压缩中排除 HFile 的标志
  • 列族设置,包含 compression(压缩), bloomType(布隆(过滤器)类型), blockSize(块大小), and dataBlockEncoding(数据块编码)

例 34. 使用附加参数

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      (Bytes.toBytes("3"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...

val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")

familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options)

rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  t => {
   val rowKey = t._1
   val family:Array[Byte] = t._2(0)._1
   val qualifier = t._2(0)._2
   val value = t._2(0)._3

   val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)

   Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder.getPath,
  familyHBaseWriterOptions,
  compactionExclude = false,
  HConstants.DEFAULT_MAX_FILE_SIZE)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

现在让我们来看一下如何调用轻量化对象批量加载的实现:

例 35. 使用轻量批量加载

val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()

val hbaseContext = new HBaseContext(sc, config)

val stagingFolder = ...
val rdd = sc.parallelize(Array(
      ("1",
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      ("3",
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...

rdd.hbaseBulkLoadThinRows(hbaseContext,
      TableName.valueOf(tableName),
      t => {
        val rowKey = t._1

        val familyQualifiersValues = new FamiliesQualifiersValues
        t._2.foreach(f => {
          val family:Array[Byte] = f._1
          val qualifier = f._2
          val value:Array[Byte] = f._3

          familyQualifiersValues +=(family, qualifier, value)
        })
        (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
      },
      stagingFolder.getPath,
      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
      compactionExclude = false,
      20)

val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

注意在使用轻量行批量加载的时候函数返回的元组中: 第一个元素是行键,第二个元素是 FamiliesQualifiersValues,这个对象中含有这一行里所有的数值,并且包含了所有的列族。