1.构建ReadFruitMapper类,用于读取 fruit 表中的数据

package test;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

//读取HBase中的表数据,写入Reducer

/**
 * Mapper that reads each line of the fruit TSV file from HDFS and forwards it
 * unchanged; the Reducer is responsible for parsing and writing into HBase.
 *
 * <p>Fix: the original extended the raw {@code Mapper} type. The class is now
 * parameterized as {@code Mapper<LongWritable, Text, LongWritable, Text>} so the
 * map output types are checked by the compiler and match the driver's
 * {@code setMapOutputKeyClass}/{@code setMapOutputValueClass} settings.
 */
public class ReadFruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    /**
     * Identity map: emits each input record as-is.
     *
     * @param key     byte offset of the line within the input file
     * @param value   the raw line of text
     * @param context framework context used to emit the (offset, line) pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}

2.构建WriteFruitMRReducer类,用于将读取到的 fruit 表中的数据写入到 fruit1 表中。

package test;

import org.apache.hadoop.hbase.client.Mutation;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.mapreduce.TableReducer;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

//将Reducer中的数据写入HBase的表中

/**
 * Reducer that parses each tab-separated line (rowKey \t name \t color) and
 * writes it into the target HBase table as a {@link Put} mutation.
 *
 * <p>Fixes over the original: (1) the raw {@code TableReducer} is now fully
 * parameterized as {@code TableReducer<LongWritable, Text, NullWritable>} to
 * match the Mapper's output types; (2) malformed or blank lines are skipped
 * instead of crashing the task with an {@code ArrayIndexOutOfBoundsException}.
 */
public class WriteFruitMRReducer extends TableReducer<LongWritable, Text, NullWritable> {

    /**
     * Converts every line grouped under this key into a Put on the output table.
     *
     * @param key     file offset emitted by the Mapper (unused for row keys)
     * @param values  raw TSV lines: rowKey \t name \t color
     * @param context framework context; writes go to the table configured by
     *                {@code TableMapReduceUtil.initTableReducerJob}
     */
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // 1. Iterate over every line that shares this key.
        for (Text value : values) {
            // 1.1 Split the line into its tab-separated fields.
            String[] fields = value.toString().split("\t");

            // Skip blank or malformed lines rather than failing the whole job.
            if (fields.length < 3) {
                continue;
            }

            // 1.2 Build the Put with the first field as the HBase row key.
            Put put = new Put(Bytes.toBytes(fields[0]));

            // 1.3 Populate the "info" column family.
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(fields[2]));

            // 1.4 Emit the mutation; TableOutputFormat applies it to the table.
            context.write(NullWritable.get(), put);
        }
    }
}

3.构建FruitMRDriver类,用于提交运行任务

package test;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

/**
 * Driver that configures and submits the MapReduce job: reads the fruit TSV
 * file from HDFS (args[0]) and writes the parsed rows into the HBase table
 * named by args[1].
 *
 * <p>Fixes over the original: validates the argument count up front (the
 * original threw a bare {@code ArrayIndexOutOfBoundsException} when run with
 * missing arguments) and corrects the mis-numbered step comments.
 */
public class FruitMRDriver implements Tool {

    // Hadoop configuration injected by ToolRunner via setConf().
    private Configuration configuration = null;

    /**
     * Builds and submits the job, blocking until it finishes.
     *
     * @param args args[0] = HDFS input path, args[1] = target HBase table name
     * @return 0 on success, 1 on failure or bad usage
     */
    @Override
    public int run(String[] args) throws Exception {
        // Fail fast with a usage message instead of an index-out-of-bounds crash.
        if (args.length < 2) {
            System.err.println("Usage: FruitMRDriver <input path> <output table>");
            return 1;
        }

        // 1. Create the Job from the injected configuration.
        Job job = Job.getInstance(configuration);

        // 2. Set the driver class so YARN can locate the containing jar.
        job.setJarByClass(FruitMRDriver.class);

        // 3. Mapper and its output key/value types.
        job.setMapperClass(ReadFruitMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // 4. Reducer: initTableReducerJob also configures the output format
        //    and the final output key/value types for the HBase table.
        TableMapReduceUtil.initTableReducerJob(args[1], WriteFruitMRReducer.class, job);

        // 5. Input path in HDFS.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // 6. Submit the job and wait for completion.
        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        configuration = conf;
    }

    @Override
    public Configuration getConf() {
        return configuration;
    }

    /** Entry point: delegates to ToolRunner so -D options and setConf work. */
    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new Configuration(), new FruitMRDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4.将工程打成 jar 包,上传至 HBase 集群所在的服务器。

5.在 HBase 中新建目标表 fruit1(需包含 info 列族)

6.利用jar包运行任务

yarn jar jar/hbase01-1.0-SNAPSHOT.jar test.FruitMRDriver /hbase/hbasetest/bigdatafile/input/fruit.tsv fruit1

7.运行成功后,查看fruit1表中的数据是否传入成功!!!

推荐文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: