文章目录

08:离线分析:Hbase表设计及构建09:离线分析:Kafka消费者构建10:离线分析:Hbase连接构建11:离线分析:Rowkey的构建12:离线分析:Put数据列构建13:离线分析:存储运行测试14:离线分析:Hive关联测试15:离线分析:Phoenix关联测试

08:离线分析:Hbase表设计及构建

目标:掌握Hbase表的设计及创建表的实现 路径

step1:基础设计step2:Rowkey设计step3:分区设计step4:建表 实施

基础设计

Namespace:MOMO_CHAT Table:MOMO_MSG Family:C1 Qualifier:与数据中字段名保持一致 Rowkey设计

查询需求:根据发件人id + 收件人id + 消息日期 查询聊天记录

发件人账号收件人账号时间 设计规则:业务、唯一、长度、散列、组合 设计实现

加盐方案:CRC、Hash、MD5、MUR=> 8位、16位、32位 MD5Hash【发件人账号_收件人账号_消息时间 =》 8位】_发件人账号_收件人账号_消息时间

分区设计

Rowkey前缀:MD5编码,由字母和数字构成数据并发量:高分区设计:使用HexSplit16进制划分多个分区 建表

启动Hbase:start-hbase.sh进入客户端:hbase shell #创建NS

create_namespace 'MOMO_CHAT'

#建表

create 'MOMO_CHAT:MOMO_MSG', {NAME => "C1", COMPRESSION => "GZ"}, { NUMREGIONS => 6, SPLITALGO => 'HexStringSplit'}

小结

掌握Hbase表的设计及创建表的实现

09:离线分析:Kafka消费者构建

目标:实现离线消费者的开发 路径

整体实现的路径 //入口:调用实现消费Kafka,将数据写入Hbase

public void main(){

//step1:消费Kafka

consumerKafka();

}

//用于消费Kafka数据

public void consumerKafka(){

prop = new Properties()

KafkaConsumer consumer = new KafkaConsumer(prop)

consumer.subscribe("MOMO_MSG")

ConsumerRecords records = consumer.poll

//基于每个分区来消费和处理

record :Topic、Partition、Offset、Key、Value

//step2:写入Hbase

writeToHbase(value)

//提交这个分区的offset

commitSycn(offset+1)

}

//用于将value的数据写入Hbase方法

public void writeToHbase(){

//step1:构建连接

//step2:构建Table对象

//step3:构建Put对象

//获取rowkey

rowkey = getRowkey(value)

Put put = new Put(rowkey)

put.添加每一列

table.put()

}

public String getRowkey(){

value.getSender

value.getReceiver

value.getTime

rowkey = MD5+sender+receiverId +time

return rowkey

}

实施 /**

* 用于消费Kafka的数据,将合法数据写入Hbase

*/

private static void consumerKafkaToHbase() throws Exception {

//构建配置对象

Properties props = new Properties();

//指定服务端地址

props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");

//指定消费者组的id

props.setProperty("group.id", "momo");

//关闭自动提交

props.setProperty("enable.auto.commit", "false");

//指定K和V反序列化的类型

props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

//构建消费者的连接

KafkaConsumer consumer = new KafkaConsumer<>(props);

//指定订阅哪些Topic

consumer.subscribe(Arrays.asList("MOMO_MSG"));

//持续拉取数据

while (true) {

//向Kafka请求拉取数据,等待Kafka响应,在100ms以内如果响应,就拉取数据,如果100ms内没有响应,就提交下一次请求: 100ms为等待Kafka响应时间

//拉取到的所有数据:多条KV数据都在ConsumerRecords对象,类似于一个集合

ConsumerRecords records = consumer.poll(Duration.ofMillis(100));

//todo:3-处理拉取到的数据:打印

//取出每个分区的数据进行处理

Set partitions = records.partitions();//获取本次数据中所有分区

//对每个分区的数据做处理

for (TopicPartition partition : partitions) {

List> partRecords = records.records(partition);//取出这个分区的所有数据

//处理这个分区的数据

long offset = 0;

for (ConsumerRecord record : partRecords) {

//获取Topic

String topic = record.topic();

//获取分区

int part = record.partition();

//获取offset

offset = record.offset();

//获取Key

String key = record.key();

//获取Value

String value = record.value();

System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);

//将Value数据写入Hbase

if(value != null && !"".equals(value) && value.split("\001").length == 20 ){

writeToHbase(value);

}

}

//手动提交分区的commit offset

Map offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));

consumer.commitSync(offsets);

}

}

}

小结

实现离线消费者的开发

10:离线分析:Hbase连接构建

目标:实现Hbase连接的构建 实施 private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

private static Connection conn;

private static Table table;

private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");//表名

private static byte[] family = Bytes.toBytes("C1");//列族

// 静态代码块: 随着类的加载而加载,一般只会加载一次,避免构建多个连接影响性能

static{

try {

//构建配置对象

Configuration conf = HBaseConfiguration.create();

conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");

//构建连接

conn = ConnectionFactory.createConnection(conf);

//获取表对象

table = conn.getTable(tableName);

} catch (IOException e) {

e.printStackTrace();

}

}

小结

实现Hbase连接的构建

11:离线分析:Rowkey的构建

目标:实现Rowkey的构建 实施 private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {

//转换时间戳

long time = format.parse(stime).getTime();

String suffix = sender_accounter+"_"+receiver_accounter+"_"+time;

//构建MD5

String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0,8);

//合并返回

return prefix+"_"+suffix;

}

小结

实现Rowkey的构建

12:离线分析:Put数据列构建

目标:实现Put数据列的构建 实施 put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));

put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));

小结

实现Put数据列的构建

13:离线分析:存储运行测试

目标:测试运行消费Kafka数据动态写入Hbase 实施

启动消费者程序 启动Flume程序 cd /export/server/flume-1.9.0-bin

bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console

启动模拟数据 java -jar /export/data/momo_init/MoMo_DataGen.jar \

/export/data/momo_init/MoMo_Data.xlsx \

/export/data/momo_data/ \

10

观察Hbase结果 小结

测试运行消费Kafka数据动态写入Hbase

14:离线分析:Hive关联测试

目标:使用Hive关联Hbase实现离线分析 路径

step1:关联step2:查询 实施

启动Hive和yarn start-yarn.sh

hive-daemon.sh metastore

hive-daemon.sh hiveserver2

start-beeline.sh

关联 create database MOMO_CHAT;

use MOMO_CHAT;

create external table if not exists MOMO_CHAT.MOMO_MSG (

id string,

msg_time string ,

sender_nickyname string ,

sender_account string ,

sender_sex string ,

sender_ip string ,

sender_os string ,

sender_phone_type string ,

sender_network string ,

sender_gps string ,

receiver_nickyname string ,

receiver_ip string ,

receiver_account string ,

receiver_os string ,

receiver_phone_type string ,

receiver_network string ,

receiver_gps string ,

receiver_sex string ,

msg_type string ,

distance string ,

message string

) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'

with serdeproperties('hbase.columns.mapping'=':key,C1:msg_time,C1:sender_nickyname,

C1:sender_account,C1:sender_sex,C1:sender_ip,C1:sender_os,C1:sender_phone_type,

C1:sender_network,C1:sender_gps,C1:receiver_nickyname,C1:receiver_ip,C1:receiver_account,

C1:receiver_os,C1:receiver_phone_type,C1:receiver_network,C1:receiver_gps,C1:receiver_sex,

C1:msg_type,C1:distance,C1:message ') tblproperties('hbase.table.name'='MOMO_CHAT:MOMO_MSG');

分析查询 --基础查询

select

msg_time,sender_nickyname,receiver_nickyname,distance

from momo_msg limit 10;

--查询聊天记录:发送人id + 接收人id + 日期:1f300e5d_13280256412_15260978785_1632888342000

select

*

from momo_msg

where sender_account='13280256412'

and receiver_account='15260978785'

and substr(msg_time,0,10) = '2021-09-29';

--统计每个小时的消息数

select

substr(msg_time,0,13) as hour,

count(*) as cnt

from momo_msg

group by substr(msg_time,0,13);

小结

使用Hive关联Hbase实现离线分析

15:离线分析:Phoenix关联测试

目标:使用Phoenix关联Hbase实现即时查询 路径

step1:关联step2:查询 实施

启动 cd /export/server/phoenix-5.0.0-HBase-2.0-bin/

bin/sqlline.py node1:2181

关联 create view if not exists MOMO_CHAT.MOMO_MSG (

"id" varchar primary key,

C1."msg_time" varchar ,

C1."sender_nickyname" varchar ,

C1."sender_account" varchar ,

C1."sender_sex" varchar ,

C1."sender_ip" varchar ,

C1."sender_os" varchar ,

C1."sender_phone_type" varchar ,

C1."sender_network" varchar ,

C1."sender_gps" varchar ,

C1."receiver_nickyname" varchar ,

C1."receiver_ip" varchar ,

C1."receiver_account" varchar ,

C1."receiver_os" varchar ,

C1."receiver_phone_type" varchar ,

C1."receiver_network" varchar ,

C1."receiver_gps" varchar ,

C1."receiver_sex" varchar ,

C1."msg_type" varchar ,

C1."distance" varchar ,

C1."message" varchar

);

即时查询 --基础查询

select

"id",c1."sender_account",c1."receiver_account"

from momo_chat.momo_msg

limit 10;

--查询每个发送人发送的消息数

select

c1."sender_account" ,

count(*) as cnt

from momo_chat.momo_msg

group by c1."sender_account";

--查询每个发送人聊天的人数

select

c1."sender_account" ,

count(distinct c1."receiver_account") as cnt

from momo_chat.momo_msg

group by c1."sender_account"

order by cnt desc;

小结

使用Phoenix关联Hbase实现即时查询

精彩文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: