55.1. HBase MapReduce Read Example

The following is an example of using HBase as a MapReduce source in a read-only manner. Specifically, there is a Mapper instance but no Reducer, and nothing is being emitted from the Mapper. The job would be defined as follows...

Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class);     // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs
...

TableMapReduceUtil.initTableMapperJob(
  tableName,        // input HBase table name
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper
  null,             // mapper output key
  null,             // mapper output value
  job);
job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from mapper

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}

...and the mapper instance would extend TableMapper...

public static class MyMapper extends TableMapper<Text, Text> {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
    // process data for the row from the Result instance.
  }
}

55.2. HBase MapReduce Read/Write Example

The following is an example of using HBase both as a source and as a sink with MapReduce. This example will simply copy data from one table to another.

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,      // input table
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper class
  null,             // mapper output key
  null,             // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,      // output table
  null,             // reducer class
  job);
job.setNumReduceTasks(0);

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}

It is worth explaining what TableMapReduceUtil is doing, especially with the reducer. TableOutputFormat is being used as the outputFormat class, and several parameters are being set on the config (e.g., TableOutputFormat.OUTPUT_TABLE), as well as setting the reducer output key to ImmutableBytesWritable and the value to Writable. These could be set by the developer on the job and conf, but TableMapReduceUtil tries to make things easier.
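
To make that concrete, the following sketch approximates what TableMapReduceUtil.initTableReducerJob(targetTable, null, job) configures by hand; it is an approximation only, as the real method also handles things like dependency jars and optional partitioners:

// approximate manual equivalent of initTableReducerJob(targetTable, null, job)
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, targetTable);
job.setOutputFormatClass(TableOutputFormat.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);   // reducer output key
job.setOutputValueClass(Writable.class);               // reducer output value
TableMapReduceUtil.addDependencyJars(job);             // ship HBase client jars with the job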

The following is the example mapper, which will create a Put matching the input Result and emit it. This, incidentally, is what the CopyTable utility does.

public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put>  {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    // this example is just copying the data from the source table...
    context.write(row, resultToPut(row, value));
  }

  private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
    Put put = new Put(key.get());
    for (KeyValue kv : result.raw()) {
      put.add(kv);
    }
    return put;
  }
}
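
Incidentally, the bundled CopyTable utility drives this same kind of table-to-table copy from the command line; a typical invocation (table names here are placeholders) looks like:

$ bin/hbase org.apache.hadoop.hbase.mapreduce.CopyTable --new.name=targetTable sourceTable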

This isn't really a reducer step; TableOutputFormat takes care of sending the Put to the target table. This is just an example, though: developers could choose not to use TableOutputFormat and connect to the target table themselves.
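
If you do connect to the target table yourself, one option is to open a client connection in the mapper and buffer the writes. A minimal sketch, assuming a map-only job (NullOutputFormat, zero reduce tasks) and a hypothetical target table named "targetTable"; Connection, ConnectionFactory, and BufferedMutator come from org.apache.hadoop.hbase.client:

public static class DirectWriteMapper extends TableMapper<NullWritable, NullWritable> {

  private Connection connection;
  private BufferedMutator mutator;

  @Override
  protected void setup(Context context) throws IOException {
    // open a client connection using the job's configuration
    connection = ConnectionFactory.createConnection(context.getConfiguration());
    mutator = connection.getBufferedMutator(TableName.valueOf("targetTable"));
  }

  @Override
  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
    Put put = new Put(row.get());
    for (KeyValue kv : value.raw()) {
      put.add(kv);
    }
    mutator.mutate(put);   // buffered client-side, flushed on close
  }

  @Override
  protected void cleanup(Context context) throws IOException {
    mutator.close();       // flushes any remaining buffered mutations
    connection.close();
  }
}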

55.3. HBase MapReduce Read/Write Example With Multi-Table Output

TODO: example for MultiTableOutputFormat.
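
The official example is still to be written; in the meantime, the following rough sketch shows the general shape only, with placeholder table names (summary1, summary2). MultiTableOutputFormat interprets each output key as the name of the destination table:

// job setup: write to more than one table by using MultiTableOutputFormat
job.setOutputFormatClass(MultiTableOutputFormat.class);

public static class MyMultiTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
  public static final byte[] CF = "cf".getBytes();
  public static final byte[] COUNT = "count".getBytes();

  private static final ImmutableBytesWritable TABLE1 = new ImmutableBytesWritable(Bytes.toBytes("summary1"));
  private static final ImmutableBytesWritable TABLE2 = new ImmutableBytesWritable(Bytes.toBytes("summary2"));

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(CF, COUNT, Bytes.toBytes(i));
    // the output key routes the Put; TABLE2 could be chosen instead per record
    context.write(TABLE1, put);
  }
}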

55.4. HBase MapReduce Summary to HBase Example

The following example uses HBase as a MapReduce source and sink with a summarization step. This example will count the number of distinct instances of a value in a table and write those summarized counts to another table.

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,        // output table
  MyTableReducer.class,    // reducer class
  job);
job.setNumReduceTasks(1);   // at least one, adjust as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}

In this example mapper, a column with a String value is chosen as the value to summarize upon. This value is used as the key to emit from the mapper, and an IntWritable represents an instance counter.

public static class MyMapper extends TableMapper<Text, IntWritable>  {
  public static final byte[] CF = "cf".getBytes();
  public static final byte[] ATTR1 = "attr1".getBytes();

  private final IntWritable ONE = new IntWritable(1);
  private Text text = new Text();

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    String val = new String(value.getValue(CF, ATTR1));
    text.set(val);     // we can only emit Writables...
    context.write(text, ONE);
  }
}

In the reducer, the "ones" are counted (just like in any other MR example that does this), and then a "Put" is emitted.

public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>  {
  public static final byte[] CF = "cf".getBytes();
  public static final byte[] COUNT = "count".getBytes();

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(CF, COUNT, Bytes.toBytes(i));

    context.write(null, put);
  }
}

55.5. HBase MapReduce Summary to File Example

This is very similar to the summary example above, except that HBase is used as the MapReduce source while HDFS is used as the sink. The differences are in the job setup and in the reducer; the mapper stays the same.

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummaryToFile");
job.setJarByClass(MySummaryFileJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
job.setReducerClass(MyReducer.class);    // reducer class
job.setNumReduceTasks(1);    // at least one, adjust as required
FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile"));  // adjust directories as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}

As stated above, the mapper from the previous example can be used unchanged with this example. As for the reducer, it is a "generic" reducer rather than one that extends TableReducer and emits Puts.

public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>  {

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    context.write(key, new IntWritable(i));
  }
}

55.6. HBase MapReduce Summary to HBase Without Reducer

It is also possible to perform summaries without a reducer, if you use HBase as the reducer. An HBase target table needs to exist for the job summary. The Table method incrementColumnValue would be used to atomically increment values. From a performance perspective, it might make sense to keep a Map of values with their counts to be incremented for each map task, and then make one update per key in the cleanup method of the mapper. However, your mileage may vary depending on the number of rows to be processed and the number of unique keys.
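
A minimal sketch of that pattern, assuming a map-only job (job.setNumReduceTasks(0) plus NullOutputFormat, as in the read example) and a hypothetical summary table named "summaryTable":

public static class MyIncrementingMapper extends TableMapper<NullWritable, NullWritable> {
  public static final byte[] CF = "cf".getBytes();
  public static final byte[] ATTR1 = "attr1".getBytes();
  public static final byte[] COUNT = "count".getBytes();

  private Connection connection;   // org.apache.hadoop.hbase.client.Connection
  private Table summaryTable;
  private final Map<String, Long> counts = new HashMap<>();

  @Override
  protected void setup(Context context) throws IOException {
    connection = ConnectionFactory.createConnection(context.getConfiguration());
    summaryTable = connection.getTable(TableName.valueOf("summaryTable"));
  }

  @Override
  public void map(ImmutableBytesWritable row, Result value, Context context) {
    // accumulate counts locally; nothing is emitted to the framework
    String val = new String(value.getValue(CF, ATTR1));
    counts.merge(val, 1L, Long::sum);
  }

  @Override
  protected void cleanup(Context context) throws IOException {
    // one atomic increment per distinct key, rather than one per row
    for (Map.Entry<String, Long> e : counts.entrySet()) {
      summaryTable.incrementColumnValue(Bytes.toBytes(e.getKey()), CF, COUNT, e.getValue());
    }
    summaryTable.close();
    connection.close();
  }
}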

In the end, the summary results are in HBase.

55.7. HBase MapReduce Summary to RDBMS

Sometimes it is more appropriate to generate summaries to an RDBMS. In these cases, it is possible to generate summaries directly to an RDBMS via a custom reducer. The setup method can connect to the RDBMS (the connection information can be passed via custom parameters in the context), and the cleanup method can close the connection.

It is critical to understand that the number of reducers for the job affects the summarization implementation, and you will have to design this into your reducer. Specifically, whether it is designed to run as a singleton (one reducer) or with multiple reducers is neither right nor wrong; it depends entirely on your use case. Recognize that the more reducers assigned to the job, the more simultaneous connections to the RDBMS will be created; this can improve throughput, but only up to a point.

public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable>  {

  private Connection c = null;

  public void setup(Context context) {
    // create DB connection...
  }

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    // do summarization
    // in this example the keys are Text, but this is just an example
  }

  public void cleanup(Context context) {
    // close db connection
  }

}
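
As a concrete illustration only, a fleshed-out variant of the skeleton above using plain JDBC might look like the following; the driver URL, credentials, and the summary table schema are all assumptions:

public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable>  {

  private java.sql.Connection c = null;
  private java.sql.PreparedStatement ps = null;

  @Override
  public void setup(Context context) {
    try {
      // the URL and credentials could instead come from custom job parameters
      c = java.sql.DriverManager.getConnection("jdbc:mysql://dbhost/summarydb", "user", "password");
      ps = c.prepareStatement("INSERT INTO summary (k, cnt) VALUES (?, ?)");
    } catch (java.sql.SQLException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    try {
      // one row per key; a real implementation might batch these updates
      ps.setString(1, key.toString());
      ps.setInt(2, i);
      ps.executeUpdate();
    } catch (java.sql.SQLException e) {
      throw new IOException(e);
    }
  }

  @Override
  public void cleanup(Context context) {
    try {
      if (ps != null) ps.close();
      if (c != null) c.close();
    } catch (java.sql.SQLException e) {
      throw new RuntimeException(e);
    }
  }
}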

In the end, the summary results are written to the RDBMS table(s).