HBase 复习 ---- chapter06

需求:读取 HBase 中的 t_log 表中 f:userid 的值。然后将 f:userid 的值。出现的次数统计出来。输出到 HDFS 上。 数据从 HBase 表中读取,所以 Mapper 类必须继承 TableMapper 数据最终写到 HDFS,所以 Reduce 类必须继承 Reducer

HBase 作为我们的数据源。我们如何获取数据? 获取数据 == 查询数据 == scan(查询所有)+ get(查询某一行)

Scan scan = new Scan();

//默认情况下,一条一条的查询,效率不高,所以我们向设置缓存,批量查询。

//这个时候我们就要设置我们 Scan 属性

//设置缓存,一次读取 10 行键数据

scan.setCaching(10);

选择扫描仪缓存的优先顺序如下:三种方式 方式一:在扫描对象上设置的缓存设置。

scan.setCaching(10);

方式二:

TableMapReduceUtil.setScannerCaching(10);

方式三: 修改默认值 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING,设置为 100。

//设置要读取的列 f:userid

scan.addColumn("f".getBytes(), "userid".getBytes());

//设置 scan 指定的列族

public Scan addFamily(byte[] family) {}

//设置 scan 指定的列(列族:列限定符)

public Scan addColumn(byte[] family, byte[] qualifier) {}

//设置回显的版本数。我们 0.96 之后,VERSIONS = 1

public Scan readVersions(int versions) {}

//要将每次调用返回的最大数量限制(limit)

public Scan setBatch(int batch) {} == scan 'student',{LIMIT=>1}

//添加过滤器 == mysql 里面的where

public Scan setFilter(Filter filter) {}

//scan 'student',{LIMIT=>1}

public Scan setLimit(int limit) {}

需要一个工具类帮助我们实现从 HBase 表中获取数据

TableMapReduceUtil.initTableMapperJob(

"t_log".getBytes(),//指定表名

scan,//获取数据的封装类型限制

HbaseToHDFSMapper.class, //指定我们自定义的mapper类,必须继承TableMapper

Text.class,//mapper的输出数据的key的数据类型

IntWritable.class, //mapper的输出数据的value的数据类型

job, //传入当前job

true//是否添加相关依赖

);

这里面有个重要的地方!!! 我们以前使用 HDFS 作为数据源,我们自定义的 Mapper 必须继承 Mapper 类。 默认使用的 TextInputFormat: 1:每一行转换成一个 KV 对。key:字节的偏移量(LongWritable);value:是当前行的内容(Text)。 2:我们的分片如何分呢?分片的大小 == block的大小 = 128。

我们现在使用 HBase 作为数据源,我们自定义的 Mapper 必须继承 TableMapper 类。 默认使用的 TableInputFormat: 1:每一行键数据转换成一个 KV 对。key:RowKey(ImmutableBytesWritable);value:是当前行键对应的 cell 集合(Result) 2:我们的分片如何分呢?我们一般认为一个 region 对应一个 mapTask

//假设: 数据最终写到Hbase(数据接收器),,所以Reduce类必须继承TableReducer.

public static void initTableReducerJob(String table, Class reducer, Job job) {}

TableMapReduceUtil.initTableReducerJob(

"t_out_log".getBytes(),//指定表名

HbaseToHDFSReducer.class, //指定我们自定义的Reducer类,必须继承TableReducer

job, //传入当前job

);

编写 Mapper 类

//1:继承TableMapper类,重写map方法

public class HbaseToHDFSMapper extends TableMapper {

// 重写map方法: map(ImmutableBytesWritable key, Result value, Context context)有三个参数

// ImmutableBytesWritable key:这是一个序列化对象,可以转换成string数据类型,

// 这里面是什么呢?是行键,只是行键是字节数组的方式进行存储的

// Result value:Result对象封装了相同行键的cell数据。

// Context context:是上下文路径,作为缓存

@Override

protected void map(ImmutableBytesWritable key, Result value, Context context)

throws IOException, InterruptedException {

// 当前行键数据中是否包含f:userid 的cell的值?如果有,返回true,无,返回false

boolean isContainsColumn = value.containsColumn(familyName, columnName);

if (isContainsColumn) {

// 获取f:userid对应的单元格cell信息

//返回的是f:userid列的所有版本的cell.在0.96之后,VERSIONS=1.

List listCells = value.getColumnCells(familyName, columnName);

//获取最新版本的cell.

Cell cell = listCells.get(0);

// 获取当前cell的值

byte[] cloneValue = CellUtil.cloneValue(cell);

String useridValue = Bytes.toString(cloneValue);

// 讲我们获取的当前cell对应的值作为key输出

outKey.set(useridValue);

// value:当前cell对应的值出现的次数

context.write(outKey, outValue);

}

}

}

编写 Reduce 类

//这个就是一个普通的reducer。最后将我们生成的数据输出到hdfs或者本地

public class HbaseToHDFSReducer extends Reducer {}

编写 Driver 驱动类

// 因为hbase整合MapReduce。所以我们的配置文件的获取方式HBaseConfiguration.create();

// conf:core-site.xml hbase-site.xml mapred-site.xml yarn-site.xml

Configuration conf = HBaseConfiguration.create();

conf.set("fs.defaultFS", "hdfs://192.168.56.104:9000/");

conf.set("hbase.zookeeper.quorum", "192.168.56.104");

// 获取FileSystem对象,方便我们操作hdfs

FileSystem fs = FileSystem.get(conf);

// 获取job对象,因为传递是conf是HBaseConfiguration.create()获取的,所以job中封装了hbase的基础信息

Job job = Job.getInstance(conf);

// 设置驱动 mapper reducer三个关联.但是Mapper不一样了,Mapper继承的TableMapper.

// 设置驱动关联

job.setJarByClass(ReadHbaseDataToHDFS.class);

// 我们要从hbase中读取数据。hbase中的数据如何进行封装呢?通过Scan对象,Scan对象可以一次读取多行键数据

Scan scan = new Scan();

// 设置缓存,一次读取10行键数据

scan.setCaching(10);

// 设置要读取的列 f:userid

scan.addColumn("f".getBytes(), "userid".getBytes());

// TableMapReduceUtil:这是一个工具类,这个工具类就是可以设置mapper的输入数据类型。TableInputFormat

// 我们以前默认的inputFormat的实现类是? TextInputFormat

// TableInputFormat:他的key:ImmutableBytesWritable key,

// value: Result value

TableMapReduceUtil.initTableMapperJob(

"t_log".getBytes(),//指定表名

scan,//获取数据的封装类型限制

HbaseToHDFSMapper.class, //指定我们自定义的mapper类,必须继承TableMapper

Text.class,//mapper的输出数据的key的数据类型

IntWritable.class, //mapper的输出数据的value的数据类型

job, //传入当前job

true//是否添加相关依赖

);

// 设置reducer的关联

job.setReducerClass(HbaseToHDFSReducer.class);

// 设置最终输出数据的kv的数据类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

// 指定我们最终数据要存储的位置

Path outputPath = new Path("hdfs://niit-master:9000/user/niit/output/");

FileOutputFormat.setOutputPath(job, outputPath);

// 先进行判断输出路径是否存在,存在就执行删除操作,确保我们的输出路径不存在

if (fs.exists(outputPath)) {

fs.delete(outputPath, true);

}

// 最后提交job

boolean isDone = job.waitForCompletion(true);

return isDone ? 0 : 1;

}

参考链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: