(HBase-Spark 中的)HBase-Spark 连接器 提供了: DataSource API (SPARK-3247) 在 Spark 1.2.0 的时候被引入,连接了简单的 HBase 的键值存储与复杂的关系型 SQL 查询,并且使得用户可以使用 Spark 在 HBase 上施展复杂的数据分析工作。 HBase Dataframe 是 Spark Dataframe 的一个标准,并且它允许和其他任何数据源——例如 Hive, Orc, Parquet, JSON 之类。 HBase-Spark Connector 使用的关键技术例如分区修剪,列修剪,推断后置以及数据本地化。

为了使用 HBase-Spark connector,用户需要定义 HBase 到 Spark 表中的映射目录。 准备数据并且填充 HBase 的表,然后将其加载到 HBase DataFrame 中去。 在此之后,用户可以使用 SQL 查询语句整合查询与数据获取。 接下来的例子说明了最基本的过程

107.1. 定义目录

def catalog = s"""{
       |"table":{"namespace":"default", "name":"table1"},
       |"rowkey":"key",
       |"columns":{
         |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
         |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
         |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
         |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
         |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
         |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
         |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
         |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
         |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
       |}
     |}""".stripMargin

目录定义了从 HBase 到 Spark 表的一个映射。 这个目录中有两个关键部分。 第一个是行键的定义,另一个是将 Spark 表中的列映射到 HBase 的列族和列标识位。 上面的 schema 定义了一个 HBase 中的表,名为 Table1,行键作为键与一些列(col1 - col8)。 注意行键也需要被定义为一个列(col0),该列具有特定的列族(rowkey)。

107.2. 保存 DataFrame

case class HBaseRecord(
   col0: String,
   col1: Boolean,
   col2: Double,
   col3: Float,
   col4: Int,
   col5: Long,
   col6: Short,
   col7: String,
   col8: Byte)

object HBaseRecord
{
   def apply(i: Int, t: String): HBaseRecord = {
      val s = s"""row${"%03d".format(i)}"""
      HBaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,
      i,
      i.toLong,
      i.toShort,
      s"String$i: $t",
      i.toByte)
  }
}

val data = (0 to 255).map { i =>  HBaseRecord(i, "extra")}

sc.parallelize(data).toDF.write.options(
 Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
 .format("org.apache.hadoop.hbase.spark ")
 .save()

用户准备的数据(data)是一个本地的 Scala 集合,含有 256 个 HBaseRecord 对象。 sc.parallelize(data) 函数分发了从 RDD 中来的数据。toDF返回了一个 DataFrame。 write 函数返回了一个 DataFrameWriter 来将 DataFrame 中的数据到外部存储(例如这里是 HBase)。 save 函数将会创建一个具有 5 个 Region 的 HBase 表来在内部保存 DataFrame。

107.3. 加载 DataFrame

def withCatalog(cat: String): DataFrame = {
  sqlContext
  .read
  .options(Map(HBaseTableCatalog.tableCatalog->cat))
  .format("org.apache.hadoop.hbase.spark")
  .load()
}
val df = withCatalog(catalog)

在 withCatalog 函数中,sqlContext 是一个 SQLContext 的变量,是一个用于与 Spark 中结构化(行与列)的数据一起工作的一个入口点。 read 返回一个 DataFrameReader,他可以用于从 DataFrame 中读取数据。option函数为输出到 DataFrameReader 的底层的数据源增加了输入选项。 以及,format函数表示了 DataFrameReader 的输入数据源的格式。 load() 函数将其加载为一个 DataFrame, 数据帧 df将由withCatalog函数返回,用于访问 HBase 表,例如 4.4 与 4.5.

107.4. Language Integrated Query

val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
  $"col0" === "row005" ||
  $"col0" <= "row005")
  .select("col0", "col1", "col4")
s.show

DataFrame 可以做很多操作,例如 join, sort, select, filter, orderBy 等等等等。df.filter 通过指定的 SQL 表达式提供过滤器,select选择一系列的列:col0, col1col4

107.5. SQL 查询

df.registerTempTable("table1")
sqlContext.sql("select count(col1) from table1").show

registerTempTable 注册了一个名为 df 的 DataFrame 作为临时表,表名为table1,临时表的生命周期和 SQLContext 有关,用于创建dfsqlContext.sql函数允许用户执行 SQL 查询。

107.6. Others

例 36. 查询不同的时间戳

在 HBaseSparkConf 中,可以设置 4 个和时间戳有关的参数,它们分别表示为 TIMESTAMP, MIN_TIMESTAMP, MAX_TIMESTAMP 和 MAX_VERSIONS。用户可以使用不同的时间戳或者利用 MIN_TIMESTAMP 和 MAX_TIMESTAMP 查询时间范围内的记录。同时,下面的例子使用了具体的数值取代了 tsSpecified 和 oldMs。

下例展示了如何使用不同的时间戳加载 df DataFrame。tsSpecified 由用户定义,HBaseTableCatalog 定义了 HBase 和 Relation 关系的 schema。writeCatalog 定义了 schema 映射的目录。

val df = sqlContext.read
      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
      .format("org.apache.hadoop.hbase.spark")
      .load()

下例展示了如何使用不同的时间范围加载 df DataFrame。oldMs 由用户定义。

val df = sqlContext.read
      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
        HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
      .format("org.apache.hadoop.hbase.spark")
      .load()

在加载 DataFrame 之后,用户就可以查询数据。

df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").show

例 37. 原生 Avro 支持

Example 37. Native Avro support

HBase-Spark Connector 支持不同类型的数据格式例如 Avro, Jason 等等。下面的用例展示了 Spark 如何支持 Avro。用户可以将 Avro 记录直接持久化进 HBase。 在内部,Avro schema 自动的转换为原生的 Spark Catalyst 数据类型。 注意,HBase 表中无论是键或者值的部分都可以在 Avro 格式定义。

  1. 为 schema 映射定义目录:
def catalog = s"""{
                     |"table":{"namespace":"default", "name":"Avrotable"},
                      |"rowkey":"key",
                      |"columns":{
                      |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                      |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
                      |}
                      |}""".stripMargin

catalog是一个 HBase 表的 schema,命名为 Avrotable。行键作为键,并且有一个列 col1。行键也被定义为详细的一列(col0),并且指定列族(rowkey)。

  1. 准备数据:
 object AvroHBaseRecord {
   val schemaString =
     s"""{"namespace": "example.avro",
         |   "type": "record",      "name": "User",
         |    "fields": [
         |        {"name": "name", "type": "string"},
         |        {"name": "favorite_number",  "type": ["int", "null"]},
         |        {"name": "favorite_color", "type": ["string", "null"]},
         |        {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
         |        {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
         |      ]    }""".stripMargin

   val avroSchema: Schema = {
     val p = new Schema.Parser
     p.parse(schemaString)
   }

   def apply(i: Int): AvroHBaseRecord = {
     val user = new GenericData.Record(avroSchema);
     user.put("name", s"name${"%03d".format(i)}")
     user.put("favorite_number", i)
     user.put("favorite_color", s"color${"%03d".format(i)}")
     val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
     favoriteArray.add(s"number${i}")
     favoriteArray.add(s"number${i+1}")
     user.put("favorite_array", favoriteArray)
     import collection.JavaConverters._
     val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
     user.put("favorite_map", favoriteMap)
     val avroByte = AvroSedes.serialize(user, avroSchema)
     AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
   }
 }

 val data = (0 to 255).map { i =>
    AvroHBaseRecord(i)
 }

首先定义 schemaString,然后它被解析来获取avroSchemaavroSchema是用来生成 AvroHBaseRecord的。data 由用户准备,是一个有 256 个AvroHBaseRecord对象的原生 Scala 集合。

  1. 保存 DataFrame:
 sc.parallelize(data).toDF.write.options(
     Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
     .format("org.apache.spark.sql.execution.datasources.hbase")
     .save()

对于由 schema catalog提供的已有的数据帧,上述语句将会创建一个具有 5 个分区的 HBase 表,并且将数据存进去。

  1. 加载 DataFrame
def avroCatalog = s"""{
            |"table":{"namespace":"default", "name":"avrotable"},
            |"rowkey":"key",
            |"columns":{
              |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
              |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
            |}
          |}""".stripMargin

 def withCatalog(cat: String): DataFrame = {
     sqlContext
         .read
         .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog))
         .format("org.apache.spark.sql.execution.datasources.hbase")
         .load()
 }
 val df = withCatalog(catalog)

withCatalog 函数中,read 会返回一个可以将数据读取成 DataFrame 格式的 DataFrameReader。 option 函数追加输入选项来指定 DataFrameReader 使用的底层数据源。这里有两个选项,一个是设置avroSchemaAvroHBaseRecord.schemaString,另一个是设置HBaseTableCatalog.tableCatalogavroCatalogload() 函数加载所有的数据为 DataFrame。数据帧 dfwithCatalog 函数返回,可用于访问 HBase 表中的数据。

  1. SQL 查询
 df.registerTempTable("avrotable")
 val c = sqlContext.sql("select count(1) from avrotable").

在加载 df DataFrame 之后,用户可以查询数据。registerTempTable 将 df DataFrame 注册为一个临时表,表名为 avrotable。 sqlContext.sql函数允许用户执行 SQL 查询。