The following is an example of using HBase as a MapReduce source in a read-only manner. Specifically, there is a Mapper instance but no Reducer, and nothing is emitted from the Mapper. The job would be defined as follows...
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class);    // class that contains mapper
Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs
...
TableMapReduceUtil.initTableMapperJob(
    tableName,        // input HBase table name
    scan,             // Scan instance to control CF and attribute selection
    MyMapper.class,   // mapper
    null,             // mapper output key
    null,             // mapper output value
    job);
job.setOutputFormatClass(NullOutputFormat.class);  // because we aren't emitting anything from mapper
boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
...and the mapper instance would extend TableMapper...
public static class MyMapper extends TableMapper<Text, Text> {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
    // process data for the row from the Result instance.
  }
}
The following is an example of using HBase both as a source and as a sink with MapReduce. This example will simply copy data from one table to another.
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper
Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
    sourceTable,      // input table
    scan,             // Scan instance to control CF and attribute selection
    MyMapper.class,   // mapper class
    null,             // mapper output key
    null,             // mapper output value
    job);
TableMapReduceUtil.initTableReducerJob(
    targetTable,      // output table
    null,             // reducer class
    job);
job.setNumReduceTasks(0);
boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
It is important to understand what TableMapReduceUtil is doing, especially with the reducer. TableOutputFormat is being used as the outputFormat class, and several parameters are being set on the config (e.g., TableOutputFormat.OUTPUT_TABLE), as well as setting the reducer output key to ImmutableBytesWritable and the reducer value to Writable. These could be set by the programmer on the job and conf, but TableMapReduceUtil tries to make things easier.
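For reference, the setup performed by initTableReducerJob is roughly equivalent to the following manual configuration. This is a simplified sketch, not the exhaustive list of what the method does (it also handles things like dependency jars); `job` and `targetTable` are assumed to be in scope as in the job definition above.

```java
// Sketch of what TableMapReduceUtil.initTableReducerJob configures (not exhaustive):
job.setOutputFormatClass(TableOutputFormat.class);
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, targetTable);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Writable.class);
```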
The following is the example mapper, which will create a Put matching the input Result and emit it. Note: this is what the CopyTable utility does.
public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    // this example is just copying the data from the source table...
    context.write(row, resultToPut(row, value));
  }

  private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
    Put put = new Put(key.get());
    for (KeyValue kv : result.raw()) {
      put.add(kv);
    }
    return put;
  }
}
There isn't actually a reducer step here, so TableOutputFormat takes care of sending the Put to the target table.
This is just an example: developers could choose not to use TableOutputFormat and connect to the target table themselves.
TODO: example MultiTableOutputFormat.
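Pending that TODO, a minimal sketch of the MultiTableOutputFormat idea: the mapper's (or reducer's) output key names the destination table, so a single job can write to several tables. The table name, `rowKey`, `CF`, `QUALIFIER`, and `cellValue` below are hypothetical placeholders, and this fragment assumes the same job-setup context as the examples above.

```java
// Job setup (sketch): route each write to the table named by the output key.
job.setOutputFormatClass(MultiTableOutputFormat.class);

// Inside a mapper or reducer, pick the target table per record:
byte[] tableName = Bytes.toBytes("table1");   // hypothetical table name
Put put = new Put(rowKey);
put.add(CF, QUALIFIER, cellValue);
// The key is the table to write to; the value is the mutation.
context.write(new ImmutableBytesWritable(tableName), put);
```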
The following example uses HBase as a MapReduce source and sink with summarization. This example will count the number of distinct instances of a value in a table and write those summarized counts to another table.
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleSummary");
job.setJarByClass(MySummaryJob.class);    // class that contains mapper and reducer
Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
    sourceTable,        // input table
    scan,               // Scan instance to control CF and attribute selection
    MyMapper.class,     // mapper class
    Text.class,         // mapper output key
    IntWritable.class,  // mapper output value
    job);
TableMapReduceUtil.initTableReducerJob(
    targetTable,          // output table
    MyTableReducer.class, // reducer class
    job);
job.setNumReduceTasks(1);  // at least one, adjust as required
boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
In this example mapper, a column with a String value is chosen as the value to summarize upon. This value is used as the key to emit from the mapper, and an IntWritable represents an instance counter.
public static class MyMapper extends TableMapper<Text, IntWritable> {
  public static final byte[] CF = "cf".getBytes();
  public static final byte[] ATTR1 = "attr1".getBytes();

  private final IntWritable ONE = new IntWritable(1);
  private Text text = new Text();

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    String val = new String(value.getValue(CF, ATTR1));
    text.set(val);  // we can only emit Writables...
    context.write(text, ONE);
  }
}
In the reducer, the "ones" are counted (just like in any other MR example that does this), and then a Put is emitted.
public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
  public static final byte[] CF = "cf".getBytes();
  public static final byte[] COUNT = "count".getBytes();

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(CF, COUNT, Bytes.toBytes(i));

    context.write(null, put);
  }
}
This is very similar to the summary example above, with the exception that it uses HBase as a MapReduce source but HDFS as the sink. The differences are in the job setup and in the reducer; the mapper remains the same.
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleSummaryToFile");
job.setJarByClass(MySummaryFileJob.class);    // class that contains mapper and reducer
Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
    sourceTable,        // input table
    scan,               // Scan instance to control CF and attribute selection
    MyMapper.class,     // mapper class
    Text.class,         // mapper output key
    IntWritable.class,  // mapper output value
    job);
job.setReducerClass(MyReducer.class);  // reducer class
job.setNumReduceTasks(1);              // at least one, adjust as required
FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile"));  // adjust directories as required
boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
As stated above, the previous mapper can run unchanged with this example. As for the Reducer, it is a "generic" Reducer instead of extending TableReducer and emitting Puts.
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    context.write(key, new IntWritable(i));
  }
}
It is also possible to perform summaries without a reducer, if you use HBase as the reducer.
An HBase target table would need to exist for the job summary. The table method incrementColumnValue would be used to atomically increment values. From a performance perspective, it might make sense to keep a Map of values with their values-to-be-incremented for each map-task, and make one update per key during the cleanup method of the mapper. However, your mileage may vary depending on the number of rows to be processed and the number of unique keys.
In the end, the summary results are in HBase.
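The per-task buffering idea described above can be sketched with a plain in-memory map. This is a minimal sketch: IncrementBuffer is a hypothetical helper, and in a real mapper map() would call increment() while cleanup() would call flush() and issue one incrementColumnValue per entry of the returned map.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Buffers per-key increments in memory so each key is written to the
 * target table once per task instead of once per row. A hypothetical
 * helper: map() would call increment(), cleanup() would call flush().
 */
public class IncrementBuffer {
  private final Map<String, Long> pending = new HashMap<>();

  /** Record one occurrence of key without touching the target table. */
  public void increment(String key) {
    pending.merge(key, 1L, Long::sum);
  }

  /** Return the batched totals and reset the buffer. */
  public Map<String, Long> flush() {
    Map<String, Long> out = new HashMap<>(pending);
    pending.clear();
    return out;
  }
}
```

With, say, 1,000 rows containing only 10 distinct values, this turns 1,000 table updates into 10, at the cost of holding the distinct keys in mapper memory.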
Sometimes it is more appropriate to generate summaries to an RDBMS. For these cases, it is possible to generate summaries directly to an RDBMS via a custom reducer. The setup method can connect to the RDBMS (the connection information can be passed via custom parameters in the context), and the cleanup method can close the connection.
It is critical to understand that the number of reducers for the job affects the summarization implementation, and you'll have to design this into your reducer. Specifically, whether it is designed to run as a singleton (one reducer) or with multiple reducers: neither is right or wrong, it depends on your use case. Recognize that the more reducers assigned to the job, the more simultaneous connections to the RDBMS will be created; this can improve throughput, but only to a point.
public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  private Connection c = null;

  public void setup(Context context) {
    // create DB connection...
  }

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    // do summarization
    // in this example the keys are Text, but this is just an example
  }

  public void cleanup(Context context) {
    // close db connection
  }
}
In the end, the summary results are written to your RDBMS table(s).