HBase 复习 ---- chapter06
需求:读取 HBase 中的 t_log 表中 f:userid 的值。然后将 f:userid 的值。出现的次数统计出来。输出到 HDFS 上。 数据从 HBase 表中读取,所以 Mapper 类必须继承 TableMapper 数据最终写到 HDFS,所以 Reduce 类必须继承 Reducer
HBase 作为我们的数据源。我们如何获取数据? 获取数据 == 查询数据 == scan(查询所有)+ get(查询某一行)
Scan scan = new Scan();
//默认情况下,一条一条的查询,效率不高,所以我们向设置缓存,批量查询。
//这个时候我们就要设置我们 Scan 属性
//设置缓存,一次读取 10 行键数据
scan.setCaching(10);
选择扫描仪缓存的优先顺序如下:三种方式 方式一:在扫描对象上设置的缓存设置。
scan.setCaching(10);
方式二:
TableMapReduceUtil.setScannerCaching(10);
方式三: 修改默认值 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING,设置为 100。
//设置要读取的列 f:userid
scan.addColumn("f".getBytes(), "userid".getBytes());
//设置 scan 指定的列族
public Scan addFamily(byte[] family) {}
//设置 scan 指定的列(列族:列限定符)
public Scan addColumn(byte[] family, byte[] qualifier) {}
//设置回显的版本数。我们 0.96 之后,VERSIONS = 1
public Scan readVersions(int versions) {}
//要将每次调用返回的最大数量限制(limit)
public Scan setBatch(int batch) {} == scan 'student',{LIMIT=>1}
//添加过滤器 == mysql 里面的where
public Scan setFilter(Filter filter) {}
//scan 'student',{LIMIT=>1}
public Scan setLimit(int limit) {}
需要一个工具类帮助我们实现从 HBase 表中获取数据
TableMapReduceUtil.initTableMapperJob(
"t_log".getBytes(),//指定表名
scan,//获取数据的封装类型限制
HbaseToHDFSMapper.class, //指定我们自定义的mapper类,必须继承TableMapper
Text.class,//mapper的输出数据的key的数据类型
IntWritable.class, //mapper的输出数据的value的数据类型
job, //传入当前job
true//是否添加相关依赖
);
这里面有个重要的地方!!! 我们以前使用 HDFS 作为数据源,我们自定义的 Mapper 必须继承 Mapper 类。 默认使用的 TextInputFormat: 1:每一行转换成一个 KV 对。key:字节的偏移量(LongWritable);value:是当前行的内容(Text)。 2:我们的分片如何分呢?分片的大小 == block的大小 = 128。
我们现在使用 HBase 作为数据源,我们自定义的 Mapper 必须继承 TableMapper 类。 默认使用的 TableInputFormat: 1:每一行键数据转换成一个 KV 对。key:RowKey(ImmutableBytesWritable);value:是当前行键对应的 cell 集合(Result) 2:我们的分片如何分呢?我们一般认为一个 region 对应一个 mapTask
//假设: 数据最终写到Hbase(数据接收器),,所以Reduce类必须继承TableReducer.
public static void initTableReducerJob(String table, Class extends TableReducer> reducer, Job job) {}
TableMapReduceUtil.initTableReducerJob(
"t_out_log".getBytes(),//指定表名
HbaseToHDFSReducer.class, //指定我们自定义的Reducer类,必须继承TableReducer
job, //传入当前job
);
编写 Mapper 类
//1:继承TableMapper类,重写map方法
public class HbaseToHDFSMapper extends TableMapper
// 重写map方法: map(ImmutableBytesWritable key, Result value, Context context)有三个参数
// ImmutableBytesWritable key:这是一个序列化对象,可以转换成string数据类型,
// 这里面是什么呢?是行键,只是行键是字节数组的方式进行存储的
// Result value:Result对象封装了相同行键的cell数据。
// Context context:是上下文路径,作为缓存
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
// 当前行键数据中是否包含f:userid 的cell的值?如果有,返回true,无,返回false
boolean isContainsColumn = value.containsColumn(familyName, columnName);
if (isContainsColumn) {
// 获取f:userid对应的单元格cell信息
//返回的是f:userid列的所有版本的cell.在0.96之后,VERSIONS=1.
List
//获取最新版本的cell.
Cell cell = listCells.get(0);
// 获取当前cell的值
byte[] cloneValue = CellUtil.cloneValue(cell);
String useridValue = Bytes.toString(cloneValue);
// 讲我们获取的当前cell对应的值作为key输出
outKey.set(useridValue);
// value:当前cell对应的值出现的次数
context.write(outKey, outValue);
}
}
}
编写 Reduce 类
//这个就是一个普通的reducer。最后将我们生成的数据输出到hdfs或者本地
public class HbaseToHDFSReducer extends Reducer
编写 Driver 驱动类
// 因为hbase整合MapReduce。所以我们的配置文件的获取方式HBaseConfiguration.create();
// conf:core-site.xml hbase-site.xml mapred-site.xml yarn-site.xml
Configuration conf = HBaseConfiguration.create();
conf.set("fs.defaultFS", "hdfs://192.168.56.104:9000/");
conf.set("hbase.zookeeper.quorum", "192.168.56.104");
// 获取FileSystem对象,方便我们操作hdfs
FileSystem fs = FileSystem.get(conf);
// 获取job对象,因为传递是conf是HBaseConfiguration.create()获取的,所以job中封装了hbase的基础信息
Job job = Job.getInstance(conf);
// 设置驱动 mapper reducer三个关联.但是Mapper不一样了,Mapper继承的TableMapper.
// 设置驱动关联
job.setJarByClass(ReadHbaseDataToHDFS.class);
// 我们要从hbase中读取数据。hbase中的数据如何进行封装呢?通过Scan对象,Scan对象可以一次读取多行键数据
Scan scan = new Scan();
// 设置缓存,一次读取10行键数据
scan.setCaching(10);
// 设置要读取的列 f:userid
scan.addColumn("f".getBytes(), "userid".getBytes());
// TableMapReduceUtil:这是一个工具类,这个工具类就是可以设置mapper的输入数据类型。TableInputFormat
// 我们以前默认的inputFormat的实现类是? TextInputFormat
// TableInputFormat:他的key:ImmutableBytesWritable key,
// value: Result value
TableMapReduceUtil.initTableMapperJob(
"t_log".getBytes(),//指定表名
scan,//获取数据的封装类型限制
HbaseToHDFSMapper.class, //指定我们自定义的mapper类,必须继承TableMapper
Text.class,//mapper的输出数据的key的数据类型
IntWritable.class, //mapper的输出数据的value的数据类型
job, //传入当前job
true//是否添加相关依赖
);
// 设置reducer的关联
job.setReducerClass(HbaseToHDFSReducer.class);
// 设置最终输出数据的kv的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定我们最终数据要存储的位置
Path outputPath = new Path("hdfs://niit-master:9000/user/niit/output/");
FileOutputFormat.setOutputPath(job, outputPath);
// 先进行判断输出路径是否存在,存在就执行删除操作,确保我们的输出路径不存在
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// 最后提交job
boolean isDone = job.waitForCompletion(true);
return isDone ? 0 : 1;
}
参考链接
发表评论