指定 HBase 的连接地址:只需指定 ZooKeeper 地址即可,多个地址之间用逗号分隔

// ZooKeeper quorum used to reach HBase; separate multiple hosts with commas.
conf.set("hbase.zookeeper.quorum","node05");


(1)把数据从一个表复制到另外一个表,不经历reducer

// Job setup for copying rows from sourceTable to targetTable without a reduce phase.
Configuration config = HBaseConfiguration.create();
// Job.getInstance is the supported factory; the Job(Configuration, String) constructor is deprecated.
Job job = Job.getInstance(config, "ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class); // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
	sourceTable, // input table
	scan, // Scan instance to control CF and attribute selection
	MyMapper.class, // mapper class
	null, // mapper output key
	null, // mapper output value
	job);

TableMapReduceUtil.initTableReducerJob(
	targetTable, // output table
	null, // reducer class
	job);

job.setNumReduceTasks(0); // no reduce phase
boolean b = job.waitForCompletion(true);
if (!b) {
	throw new IOException("error with job!");
}
mapper:

public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
	public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
	// this example is just copying the data from the source table...
		context.write(row, resultToPut(row,value));
	}
	private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
		Put put = new Put(key.get());
		for (KeyValue kv : result.raw()) {
			put.add(kv);
		}
		return put;
	}
}

(2)把数据从一个表经过mapreduce处理后写入另外一个表,经历reducer

// Job setup for summarising sourceTable through a mapper and a table reducer into targetTable.
Configuration config = HBaseConfiguration.create();
// Job.getInstance is the supported factory; the Job(Configuration, String) constructor is deprecated.
Job job = Job.getInstance(config, "ExampleSummary");
job.setJarByClass(MySummaryJob.class); // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
	sourceTable, // input table
	scan, // Scan instance to control CF and attribute selection
	MyMapper.class, // mapper class
	Text.class, // mapper output key
	IntWritable.class, // mapper output value
	job);
TableMapReduceUtil.initTableReducerJob(
	targetTable, // output table
	MyTableReducer.class, // reducer class
	job);

job.setNumReduceTasks(1); // at least one, adjust as required
boolean b = job.waitForCompletion(true);
if (!b) {
	throw new IOException("error with job!");
}

mapper:

public static class MyMapper extends TableMapper<Text, IntWritable> {

	public static final byte[] CF = "cf".getBytes();
	public static final byte[] ATTR1 = "attr1".getBytes();
	private final IntWritable ONE = new IntWritable(1);
	private Text text = new Text();
	
	public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
		String val = new String(value.getValue(CF, ATTR1));
		text.set(val); // we can only emit Writables...
		context.write(text, ONE);
	}
}

reducer:

public static class MyTableReducer extends TableReducer<Text, IntWritable,ImmutableBytesWritable> {
	
	public static final byte[] CF = "cf".getBytes();
	public static final byte[] COUNT = "count".getBytes();
	
	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int i = 0;
		for (IntWritable val : values) {
			i += val.get();
		}
		Put put = new Put(Bytes.toBytes(key.toString()));
		put.add(CF, COUNT, Bytes.toBytes(i));
		context.write(null, put);
	}
}

(3)把数据从一个表经过mapreduce处理,结果写入文件

// Job setup for summarising sourceTable through a mapper and reducer, writing the result to a file path.
Configuration config = HBaseConfiguration.create();
// Job.getInstance is the supported factory; the Job(Configuration, String) constructor is deprecated.
Job job = Job.getInstance(config, "ExampleSummaryToFile");
job.setJarByClass(MySummaryFileJob.class); // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
	sourceTable, // input table
	scan, // Scan instance to control CF and attribute selection
	MyMapper.class, // mapper class
	Text.class, // mapper output key
	IntWritable.class, // mapper output value
	job);

job.setReducerClass(MyReducer.class); // reducer class
job.setNumReduceTasks(1); // at least one, adjust as required
FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile")); // adjust directories as required

boolean b = job.waitForCompletion(true);
if (!b) {
	throw new IOException("error with job!");
}

mapper:

public static class MyMapper extends TableMapper<Text, IntWritable> {

	public static final byte[] CF = "cf".getBytes();
	public static final byte[] ATTR1 = "attr1".getBytes();
	private final IntWritable ONE = new IntWritable(1);
	private Text text = new Text();
	
	public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
		String val = new String(value.getValue(CF, ATTR1));
		text.set(val); // we can only emit Writables...
		context.write(text, ONE);
	}
}

reducer:

public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int i = 0;
		for (IntWritable val : values) {
			i += val.get();
		}
		context.write(key, new IntWritable(i));
}





实例:

 Job: 统计每个用户每个月主叫的通话次数

package mr.phone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

/**
 * Counts, per user and per month, the calls that user placed as the caller.
 * Data flow: HBase table -> MapReduce -> HBase table.
 * Created by Jeffrey.Deng on 2017/8/18.
 */
public class PhoneLogJob {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final Job job = Job.getInstance(buildConfiguration());
        job.setJarByClass(PhoneLogJob.class);
        job.setJobName("Analysis user call log");

        // Local mode: addDependencyJars is passed as false.
        TableMapReduceUtil.initTableMapperJob(
                "phone",                    // input HBase table name
                buildScan(),                // Scan instance to control CF and attribute selection
                AlysCallLogMapper.class,    // mapper class
                Text.class,                 // mapper output key
                IntWritable.class,          // mapper output value
                job,
                false);

        // Short form: TableMapReduceUtil.initTableReducerJob("user_call_monthcount", AlysCallLogReducer.class, job);
        // Local mode: addDependencyJars is passed as false.
        TableMapReduceUtil.initTableReducerJob(
                "user_call_monthcount",     // output table
                AlysCallLogReducer.class,   // reducer class
                job,
                null,                       // partitioner class
                null,                       // zookeeper address
                null,
                null,
                false                       // whether to copy the HBase jars into the cache
        );
        job.setNumReduceTasks(3);

        final boolean succeeded = job.waitForCompletion(true);
        if (succeeded) {
            return;
        }
        throw new IOException("error with job!");
    }

    /** Creates the job configuration: local framework, cross-platform submission, HBase ZooKeeper node. */
    private static Configuration buildConfiguration() {
        final Configuration conf = new Configuration(true);
        conf.set("mapreduce.framework.name", "local");
        conf.set("mapreduce.app-submission.cross-platform", "true");
        conf.set("hbase.zookeeper.quorum","node05");    // hbase zookeeper node
        return conf;
    }

    /** Creates the input scan restricted to cf:hostNum, cf:date and cf:type. */
    private static Scan buildScan() {
        final Scan input = new Scan();
        input.setCaching(100);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        input.setCacheBlocks(false);  // don't set to true for MR jobs
        final String[] qualifiers = {"hostNum", "date", "type"};
        for (String qualifier : qualifiers) {
            input.addColumn("cf".getBytes(), qualifier.getBytes());
        }
        return input;
    }

}
mapper:

package mr.phone;

import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Emits ("hostnum_yyyyMM", 1) for every record whose cf:type is "1" (outgoing call).
 * hostnum is the part of the row key before the first '_', and yyyyMM is the first six
 * characters of cf:date. Records that cannot be parsed are counted and skipped.
 * Created by Jeffrey.Deng on 2017/8/18.
 */
public class AlysCallLogMapper extends TableMapper<Text, IntWritable> {

    private final static byte[] CF_TBL = "cf".getBytes();
    private final static byte[] TYPE = "type".getBytes();
    private final static byte[] DATE = "date".getBytes();

    /** Value of cf:type that marks an outgoing (caller-side) call. */
    private static final String OUTGOING_CALL = "1";
    /** Number of leading characters of cf:date that form the year-month, e.g. 201711. */
    private static final int YEAR_MONTH_LENGTH = 6;
    /** Counter group and name under which skipped records are reported. */
    private static final String COUNTER_GROUP = "AlysCallLogMapper";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    private static final IntWritable ONE = new IntWritable(1);
    private final Text outKey = new Text();

    /**
     * Processes one scanned row.
     *
     * @param key     row key, expected to look like "hostnum_..."
     * @param rs      the row's cells; cf:type and cf:date are read
     * @param context receives ("hostnum_yyyyMM", 1) for outgoing calls
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result rs, Context context) throws IOException, InterruptedException {
        // A missing cf:type yields null, which is simply "not an outgoing call".
        String type = latestValue(rs, TYPE);
        if (!OUTGOING_CALL.equals(type)) {
            return;
        }

        String rowkey = new String(key.get());
        int separator = rowkey.indexOf('_');
        String date = latestValue(rs, DATE);
        if (separator < 0 || date == null || date.length() < YEAR_MONTH_LENGTH) {
            // Without these checks substring() or the null cell would abort the whole task.
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        String hostnum = rowkey.substring(0, separator);
        String ym = date.substring(0, YEAR_MONTH_LENGTH);   // e.g. 201711
        outKey.set(hostnum + "_" + ym);
        // e.g. 15090627907_201711 1
        context.write(outKey, ONE);
    }

    /** Returns the latest value of cf:qualifier as a String, or null when the row has no such cell. */
    private static String latestValue(Result rs, byte[] qualifier) {
        byte[] raw = rs.getValue(CF_TBL, qualifier);
        return raw == null ? null : new String(raw);
    }

}
reducer:
package mr.phone;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Sums the counts received for one "hostnum_yyyyMM" key and emits a Put whose row key is
 * that key and whose cf:hostnum, cf:date and cf:count columns hold the two key parts and the total.
 * Created by Jeffrey.Deng on 2017/8/18.
 */
public class AlysCallLogReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    private final static byte[] CF_TBL = "cf".getBytes();
    private final static byte[] COUNT = "count".getBytes();
    private final static byte[] DATE = "date".getBytes();
    private final static byte[] HOSTNUM = "hostnum".getBytes();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable count : values) {
            total = total + count.get();
        }

        // e.g. row 15090627907_201711 -> hostnum=15090627907, date=201711, count=5
        final String rowKey = key.toString();
        final Put row = new Put(rowKey.getBytes());
        final int separator = rowKey.indexOf('_');
        final String hostNum = rowKey.substring(0, separator);
        final String yearMonth = rowKey.substring(separator + 1);

        row.add(CF_TBL, HOSTNUM, hostNum.getBytes());
        row.add(CF_TBL, DATE, yearMonth.getBytes());
        row.add(CF_TBL, COUNT, String.valueOf(total).getBytes());
        context.write(null, row);
    }
}


Job:将统计结果 user_call_monthcount 复制到本地文件

package mr.phone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Copies the aggregated results in user_call_monthcount to a local file.
 * Data flow: HBase table -> map -> file.
 * Created by Jeffrey.Deng on 2017/8/18.
 */
public class ResultToFileJob {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(true);
        conf.set("mapreduce.framework.name", "local");
        conf.set("mapreduce.app-submission.cross-platform", "true");
        conf.set("hbase.zookeeper.quorum","node05");
        Job job = Job.getInstance(conf);
        job.setJarByClass(ResultToFileJob.class);
        job.setJobName("Copy Analysis user call log result to file");

        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(12);
        scan.addColumn("cf".getBytes(), "hostnum".getBytes());
        scan.addColumn("cf".getBytes(), "date".getBytes());
        scan.addColumn("cf".getBytes(), "count".getBytes());

        TableMapReduceUtil.initTableMapperJob("user_call_monthcount", scan, CopyMapper.class, Text.class, NullWritable.class, job);

        // Output path on the local file system.
        Path p = new Path("file:///D:/User/Desktop/result/");
        FileOutputFormat.setOutputPath(job, p);

        // Optional custom sort order, currently disabled:
        //job.setSortComparatorClass(ResultSortComparator.class);

        boolean result = job.waitForCompletion(true);
        if (!result) {
            throw new IOException("error with job!");
        }
    }

    /** Emits one tab-separated line "hostnum, date, count" per scanned row, with no value. */
    static class CopyMapper extends TableMapper<Text, NullWritable> {

        private final static byte[] CF_TBL = "cf".getBytes();
        private final static byte[] COUNT = "count".getBytes();
        private final static byte[] DATE = "date".getBytes();
        private final static byte[] HOSTNUM = "hostnum".getBytes();

        private Text outKey = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result rs, Context context) throws IOException, InterruptedException {
            String hostnum = new String(CellUtil.cloneValue(rs.getColumnLatestCell(CF_TBL, HOSTNUM)));
            String date = new String(CellUtil.cloneValue(rs.getColumnLatestCell(CF_TBL, DATE)));
            String count = new String(CellUtil.cloneValue(rs.getColumnLatestCell(CF_TBL, COUNT)));
            outKey.set(hostnum + "\t" + date + "\t" + count);
            context.write(outKey, NullWritable.get());
        }
    }

    /**
     * Orders keys by the part of their string form that starts at the first '_'.
     * A key without '_' is compared by its whole string form.
     */
    static class ResultSortComparator extends WritableComparator {
        ResultSortComparator() {
            super(Text.class, true);
        }

        @Override
        @SuppressWarnings("rawtypes") // signature is dictated by WritableComparator.compare
        public int compare(WritableComparable a, WritableComparable b) {
            return sortSuffix(a.toString()).compareTo(sortSuffix(b.toString()));
        }

        /**
         * Returns the text from the first '_' onward, or the whole text when there is none.
         * Each key is sliced with its OWN underscore position; the earlier code used a's
         * index for b, and substring(-1) threw when a key had no '_'.
         */
        private static String sortSuffix(String text) {
            int separator = text.indexOf('_');
            return separator < 0 ? text : text.substring(separator);
        }
    }
}


添加新评论