HBase 指定地址:只需指定 ZooKeeper 地址即可,多个地址之间用逗号分隔
// Tell the HBase client which ZooKeeper quorum to use; here it is the single host "node05".
conf.set("hbase.zookeeper.quorum","node05");
(1)把数据从一个表复制到另外一个表,不经历reducer
// Map-only job setup: rows read from sourceTable are written to targetTable
// by the mapper; no reducer runs (number of reduce tasks is 0).
Configuration config = HBaseConfiguration.create();
// Job.getInstance is used instead of the deprecated Job(Configuration, String) constructor.
Job job = Job.getInstance(config, "ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
    sourceTable,      // input table
    scan,             // Scan instance to control CF and attribute selection
    MyMapper.class,   // mapper class
    null,             // mapper output key
    null,             // mapper output value
    job);
TableMapReduceUtil.initTableReducerJob(
    targetTable,      // output table
    null,             // reducer class
    job);
job.setNumReduceTasks(0);

// Block until the job finishes and surface a failure as an exception.
boolean b = job.waitForCompletion(true);
if (!b) {
    throw new IOException("error with job!");
}
mapper:public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /**
     * Copies one source row: builds a Put from the scanned Result and emits it
     * under the same row key.
     */
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        final Put copy = resultToPut(row, value);
        context.write(row, copy);
    }

    /**
     * Builds a Put for the given row key containing every KeyValue of the Result.
     *
     * NOTE(review): Result.raw() and KeyValue are deprecated in newer HBase
     * releases; confirm the target HBase version before reusing this snippet.
     */
    private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
        final Put target = new Put(key.get());
        final KeyValue[] entries = result.raw();
        for (int idx = 0; idx < entries.length; idx++) {
            target.add(entries[idx]);
        }
        return target;
    }
}
(2)把数据从一个表经过mapreduce处理后写入另外一个表,经历reducer
// Job setup with a reduce phase: MyMapper reads sourceTable and emits
// (Text, IntWritable) pairs; MyTableReducer writes the results to targetTable.
Configuration config = HBaseConfiguration.create();
// Job.getInstance is used instead of the deprecated Job(Configuration, String) constructor.
Job job = Job.getInstance(config, "ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
    sourceTable,        // input table
    scan,               // Scan instance to control CF and attribute selection
    MyMapper.class,     // mapper class
    Text.class,         // mapper output key
    IntWritable.class,  // mapper output value
    job);
TableMapReduceUtil.initTableReducerJob(
    targetTable,           // output table
    MyTableReducer.class,  // reducer class
    job);
job.setNumReduceTasks(1);   // at least one, adjust as required

// Block until the job finishes and surface a failure as an exception.
boolean b = job.waitForCompletion(true);
if (!b) {
    throw new IOException("error with job!");
}
mapper:
public static class MyMapper extends TableMapper<Text, IntWritable> {
    public static final byte[] CF = "cf".getBytes();
    public static final byte[] ATTR1 = "attr1".getBytes();

    private final IntWritable ONE = new IntWritable(1);
    // Reused across map() calls as the output key.
    private Text attrKey = new Text();

    /**
     * Emits (value of cf:attr1, 1) for each scanned row.
     */
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        final byte[] rawAttr = value.getValue(CF, ATTR1);
        // Only Writables can be emitted, so the string is wrapped in a Text.
        attrKey.set(new String(rawAttr));
        context.write(attrKey, ONE);
    }
}
reducer:
public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    public static final byte[] CF = "cf".getBytes();
    public static final byte[] COUNT = "count".getBytes();

    /**
     * Sums all values received for a key and writes the total to column
     * cf:count of the row whose key is the text of the reduce key.
     */
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable count : values) {
            total = total + count.get();
        }

        final byte[] rowKey = Bytes.toBytes(key.toString());
        final Put summary = new Put(rowKey);
        summary.add(CF, COUNT, Bytes.toBytes(total));

        // The output key is passed as null; only the Put is emitted.
        context.write(null, summary);
    }
}
(3)把数据从一个表经过mapreduce处理,结果写入文件
// Job setup that reads from an HBase table and writes the reduced result to
// files: MyMapper reads sourceTable, MyReducer output goes to the output path.
Configuration config = HBaseConfiguration.create();
// Job.getInstance is used instead of the deprecated Job(Configuration, String) constructor.
Job job = Job.getInstance(config, "ExampleSummaryToFile");
job.setJarByClass(MySummaryFileJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
    sourceTable,        // input table
    scan,               // Scan instance to control CF and attribute selection
    MyMapper.class,     // mapper class
    Text.class,         // mapper output key
    IntWritable.class,  // mapper output value
    job);
job.setReducerClass(MyReducer.class);    // reducer class
job.setNumReduceTasks(1);    // at least one, adjust as required
FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile"));  // adjust directories as required

// Block until the job finishes and surface a failure as an exception.
boolean b = job.waitForCompletion(true);
if (!b) {
    throw new IOException("error with job!");
}
mapper:
public static class MyMapper extends TableMapper<Text, IntWritable> {
    public static final byte[] CF = "cf".getBytes();
    public static final byte[] ATTR1 = "attr1".getBytes();

    private final IntWritable ONE = new IntWritable(1);
    // Reused across map() calls as the output key.
    private Text attrKey = new Text();

    /**
     * Emits (value of cf:attr1, 1) for each scanned row.
     */
    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        final byte[] rawAttr = value.getValue(CF, ATTR1);
        // Only Writables can be emitted, so the string is wrapped in a Text.
        attrKey.set(new String(rawAttr));
        context.write(attrKey, ONE);
    }
}
reducer:
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int i = 0;
for (IntWritable val : values) {
i += val.get();
}
context.write(key, new IntWritable(i));
}
实例:
Job: 统计每个用户每个月主叫的通话次数
package mr.phone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
/**
 * Counts, per user and per month, the number of calls the user made as the caller.
 * Data flow: HBase table -> MapReduce -> HBase table.
 * Created by Jeffrey.Deng on 2017/8/18.
 */
public class PhoneLogJob {

    /**
     * Configures and runs the job: scans table "phone" with AlysCallLogMapper
     * and writes the AlysCallLogReducer output to table "user_call_monthcount".
     *
     * @throws IOException if the job cannot be set up or does not complete successfully
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(true);
        conf.set("mapreduce.framework.name", "local");
        conf.set("mapreduce.app-submission.cross-platform", "true");
        conf.set("hbase.zookeeper.quorum", "node05"); // HBase ZooKeeper node

        Job job = Job.getInstance(conf);
        job.setJarByClass(PhoneLogJob.class);
        job.setJobName("Analysis user call log");

        // Local mode: addDependencyJars (last argument) is set to false.
        TableMapReduceUtil.initTableMapperJob(
                "phone",                  // input HBase table name
                buildScan(),              // Scan instance to control CF and attribute selection
                AlysCallLogMapper.class,  // mapper class
                Text.class,               // mapper output key
                IntWritable.class,        // mapper output value
                job,
                false);

        // The long overload is used instead of
        // initTableReducerJob("user_call_monthcount", AlysCallLogReducer.class, job)
        // so that addDependencyJars can be set to false for local mode.
        TableMapReduceUtil.initTableReducerJob(
                "user_call_monthcount",    // output table
                AlysCallLogReducer.class,  // reducer class
                job,
                null,                      // partitioner class
                null,                      // zookeeper address
                null,
                null,
                false);                    // whether to copy the HBase jars into the cache

        job.setNumReduceTasks(3);

        if (!job.waitForCompletion(true)) {
            throw new IOException("error with job!");
        }
    }

    /** Builds the Scan used as job input, limited to three columns of family "cf". */
    private static Scan buildScan() {
        Scan scan = new Scan();
        scan.setCaching(100);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs

        byte[] family = "cf".getBytes();
        scan.addColumn(family, "hostNum".getBytes());
        scan.addColumn(family, "date".getBytes());
        scan.addColumn(family, "type".getBytes());
        return scan;
    }
}
mapper:package mr.phone;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
 * Mapper that emits one (hostnum_yearMonth, 1) pair for every row whose
 * cf:type value is "1".
 * Created by Jeffrey.Deng on 2017/8/18.
 */
public class AlysCallLogMapper extends TableMapper<Text, IntWritable> {

    private static final byte[] CF_TBL = "cf".getBytes();
    private static final byte[] TYPE = "type".getBytes();
    private static final byte[] DATE = "date".getBytes();

    private static IntWritable one = new IntWritable(1);
    private Text outKey = new Text();

    @Override
    protected void map(ImmutableBytesWritable key, Result rs, Context context) throws IOException, InterruptedException {
        // Type "1" marks a call made as the caller; every other row is skipped.
        final String callType = latestValue(rs, TYPE);
        if (!"1".equals(callType)) {
            return;
        }

        // The host number is the part of the row key before the first '_'.
        final String rowKey = new String(key.get());
        final String caller = rowKey.substring(0, rowKey.indexOf('_'));

        // First six characters of the date, e.g. 201711.
        final String yearMonth = latestValue(rs, DATE).substring(0, 6);

        // Emits e.g. key "15090627907_201711" with value 1.
        outKey.set(caller + "_" + yearMonth);
        context.write(outKey, one);
    }

    /** Returns the latest cell value of the given qualifier in family "cf" as a String. */
    private static String latestValue(Result row, byte[] qualifier) {
        return new String(CellUtil.cloneValue(row.getColumnLatestCell(CF_TBL, qualifier)));
    }
}
reducer:package mr.phone;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
 * Reducer that sums the counts for a "hostnum_yearMonth" key and writes the
 * host number, the year-month and the total into one row of the output table.
 * Created by Jeffrey.Deng on 2017/8/18.
 */
public class AlysCallLogReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    private static final byte[] CF_TBL = "cf".getBytes();
    private static final byte[] COUNT = "count".getBytes();
    private static final byte[] DATE = "date".getBytes();
    private static final byte[] HOSTNUM = "hostnum".getBytes();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable count : values) {
            total += count.get();
        }

        // Example row: key 15090627907_201711 -> hostnum 15090627907, date 201711, count 5
        final String rowKey = key.toString();
        final Put row = new Put(rowKey.getBytes());

        // Split the key at its first '_' into host number and year-month.
        final int separator = rowKey.indexOf('_');
        final String hostNumber = rowKey.substring(0, separator);
        final String yearMonth = rowKey.substring(separator + 1);

        row.add(CF_TBL, HOSTNUM, hostNumber.getBytes());
        row.add(CF_TBL, DATE, yearMonth.getBytes());
        // The count is stored as its decimal string form.
        row.add(CF_TBL, COUNT, String.valueOf(total).getBytes());

        // The output key is passed as null; only the Put is emitted.
        context.write(null, row);
    }
}
Job:将统计结果 user_call_monthcount 复制到本地文件
package mr.phone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Copies the aggregated result table user_call_monthcount to a local file.
 * Data flow: HBase table -> map -> file.
 * Created by Jeffrey.Deng on 2017/8/18.
 */
public class ResultToFileJob {

    /**
     * Configures and runs the job: scans table "user_call_monthcount" with
     * CopyMapper and writes the output under the configured file path.
     *
     * @throws IOException if the job cannot be set up or does not complete successfully
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(true);
        conf.set("mapreduce.framework.name", "local");
        conf.set("mapreduce.app-submission.cross-platform", "true");
        conf.set("hbase.zookeeper.quorum","node05");

        Job job = Job.getInstance(conf);
        job.setJarByClass(ResultToFileJob.class);
        job.setJobName("Copy Analysis user call log result to file");

        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(12);
        scan.addColumn("cf".getBytes(), "hostnum".getBytes());
        scan.addColumn("cf".getBytes(), "date".getBytes());
        scan.addColumn("cf".getBytes(), "count".getBytes());

        TableMapReduceUtil.initTableMapperJob("user_call_monthcount", scan, CopyMapper.class, Text.class, NullWritable.class, job);

        // Local file path
        Path p = new Path("file:///D:/User/Desktop/result/");
        FileOutputFormat.setOutputPath(job, p);

        //job.setSortComparatorClass(ResultSortComparator.class);

        boolean result = job.waitForCompletion(true);
        if (!result) {
            throw new IOException("error with job!");
        }
    }

    /**
     * Emits one tab-separated line "hostnum, date, count" per scanned row as the
     * output key, with a NullWritable value.
     */
    static class CopyMapper extends TableMapper<Text, NullWritable> {
        private final static byte[] CF_TBL = "cf".getBytes();
        private final static byte[] COUNT = "count".getBytes();
        private final static byte[] DATE = "date".getBytes();
        private final static byte[] HOSTNUM = "hostnum".getBytes();
        private Text outKey = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result rs, Context context) throws IOException, InterruptedException {
            String hostnum = new String(CellUtil.cloneValue(rs.getColumnLatestCell(CF_TBL, HOSTNUM)));
            String date = new String(CellUtil.cloneValue(rs.getColumnLatestCell(CF_TBL, DATE)));
            String count = new String(CellUtil.cloneValue(rs.getColumnLatestCell(CF_TBL, COUNT)));
            outKey.set(hostnum + "\t" + date + "\t" + count);
            context.write(outKey, NullWritable.get());
        }
    }

    /**
     * Orders Text keys by the part of each key starting at its first '_'.
     * Currently not installed on the job (the setSortComparatorClass call in
     * main is commented out).
     *
     * NOTE(review): CopyMapper joins its fields with tabs, so its keys contain
     * no '_' unless a field value does; indexOf then returns -1 and substring
     * throws. Confirm the intended separator before enabling this comparator.
     */
    static class ResultSortComparator extends WritableComparator {
        ResultSortComparator() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            String left = a.toString();
            String right = b.toString();
            String x = left.substring(left.indexOf('_'));
            // Fix: slice b at b's own '_' position (previously a's index was used for b).
            String y = right.substring(right.indexOf('_'));
            return x.compareTo(y);
        }
    }
}