离线数仓-2-数据采集

离线数仓-2-数据采集1.用户行为日志数据模拟1.用户行为日志的介绍2.埋点有哪些3.用户行为日志内容4.用户行为日志格式5.用户行为日志数据采集1.节点之间配置免密登录2.linux环境变量说明3.用户行为日志模拟脚本4.Hadoop的搭建5.Hadoop在项目中的优化6.Zookeeper的安装7.kafka的安装8.Flume的安装

4.数仓同步-实时数仓同步、离线数仓同步5.业务数据采集和同步1.电商业务流程2.电商常识3.电商系统表结构1.电商业务表关联关系2.后台管理系统

4.业务数据通道5.实时同步业务数据:Maxwell实时同步Mysql业务数据6.离线同步-用户行为数据:Flume7.离线同步-业务数据:DataX1.离线数仓数据的同步策略:全量同步和增量同步2.DataX相关配置3.全量表数据同步至HDFS:DataX4.增量表数据同步至HDFS:flume5.增量表数据首次全量同步至HDFS:Maxwell

离线数仓-2-数据采集

下面的笔记是记录了实现红框中标注的数据采集功能。

1.用户行为日志数据模拟

1.用户行为日志的介绍

用户行为日志的内容,主要包括用户的各项行为信息以及行为所处的环境信息。收集这些信息的主要目的是优化产品和为各项分析统计指标提供数据支撑。收集这些信息的手段通常为埋点。

2.埋点有哪些

主流的埋点方式,有代码埋点(前端/后端)、可视化埋点、全埋点等 - 代码埋点是通过调用埋点SDK函数,在需要埋点的业务逻辑功能位置调用接口,上报埋点数据。例如,我们对页面中的某个按钮埋点后,当这个按钮被点击时,可以在这个按钮对应的 OnClick 函数里面调用SDK提供的数据发送接口,来发送数据。 - 可视化埋点只需要研发人员集成采集 SDK,不需要写埋点代码,业务人员就可以通过访问分析平台的“圈选”功能,来“圈”出需要对用户行为进行捕捉的控件,并对该事件进行命名。圈选完毕后,这些配置会同步到各个用户的终端上,由采集 SDK 按照圈选的配置自动进行用户行为数据的采集和发送。 - 全埋点是通过在产品中嵌入SDK,前端自动采集页面上的全部用户行为事件,上报埋点数据,相当于做了一个统一的埋点。然后再通过界面配置哪些数据需要在系统里面进行分析。

3.用户行为日志内容

页面浏览记录 动作记录 曝光记录 启动记录 错误记录

4.用户行为日志格式

日志结构大致可分为两类,一是页面日志,二是启动日志。

页面日志:页面日志,以页面浏览为单位,即一个页面浏览记录,生成一条页面埋点日志。一条完整的页面日志包含,一个页面浏览记录,若干个用户在该页面所做的动作记录,若干个该页面的曝光记录,以及一个在该页面发生的报错记录。除上述行为信息,页面日志还包含了这些行为所处的各种环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。启动日志:启动日志以启动为单位,及一次启动行为,生成一条启动日志。一条完整的启动日志包括一个启动记录,一个本次启动时的报错记录,以及启动时所处的环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。 阿里云服务器的购买和使用相关教程

5.用户行为日志数据采集

服务器环境的搭建,采集主要流程是使用Flume进行的,Flume监控文件数据,将数据落地到kafka中,实现用户行为日志数据采集。

1.节点之间配置免密登录

2.linux环境变量说明

不管是login shell还是non-login shell,启动时都会加载/etc/profile.d/*.sh中的环境变量,所以我们配置环境变量的时候,一般都写到此文件夹下面,并以.sh结尾。

3.用户行为日志模拟脚本

4.Hadoop的搭建

5.Hadoop在项目中的优化

1.HDFS存储多目录,在hdfs-site.xml文件中配置多目录,注意新挂载磁盘的访问权限问题,若服务器有多个磁盘,必须对该参数进行修改,注意:每台服务器挂载的磁盘不一样,所以每个节点的多目录配置可以不一致。单独配置即可。

dfs.datanode.data.dir

file:///dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4

2.集群数据均衡

①节点间数据均衡 开启数据均衡命令: start-balancer.sh -threshold 10 对于参数10,代表的是集群中各个节点的磁盘空间利用率相差不超过10%,可根据实际情况进行调整。 停止数据均衡命令: stop-balancer.sh②磁盘间数据均衡 生成均衡计划(我们只有一块磁盘,不会生成计划) hdfs diskbalancer -plan hadoop103 执行均衡计划 hdfs diskbalancer -execute hadoop103.plan.json 查看当前均衡任务的执行情况 hdfs diskbalancer -query hadoop103 取消均衡任务 hdfs diskbalancer -cancel hadoop103.plan.json 3.Hadoop参数调优 ①.HDFS参数调优hdfs-site.xml The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes. NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。 对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。

dfs.namenode.handler.count

10

dfs.namenode.handler.count=20*log e Cluster Size,比如集群规模为8台时,此参数设置为41。可通过简单的python代码计算该值,代码如下。

[yang@hadoop102 ~]$ python

Python 2.7.5 (default, Apr 11 2018, 07:36:10)

[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2

Type "help", "copyright", "credits" or "license" for more information.

>>> import math

>>> print int(20*math.log(8))

41

>>> quit()

②.YARN参数调优yarn-site.xml 情景描述:总共7台机器,每天几亿条数据,数据源->Flume->Kafka->HDFS->Hive 面临问题:数据统计主要用HiveSQL,没有数据倾斜,小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞,内存用了不到50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。 解决办法: 内存利用率不够。这个一般是Yarn的2个配置造成的,单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。 (a)yarn.nodemanager.resource.memory-mb 表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。 (b)yarn.scheduler.maximum-allocation-mb 单个任务可申请的最多物理内存量,默认是8192(MB)。

6.Zookeeper的安装

7.kafka的安装

8.Flume的安装

1.Flume的安装 2.Flume的KafkaChannel 使用下面的数据传输路线规划: 3.Flume的配置

相关配置参考官网:https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html1.KafkaChannels的具体配置如下:flie_to_kafka.conf2.配置Flume的拦截器,书写java代码,打成jar包,上传到lib目录下面,在配置文件中添加拦截器配置,判断数据是否为完整的json

flie_to_kafka.conf配置如下:

#定义组件

a1.sources = r1

a1.channels = c1

#配置sources

a1.sources.r1.type = TAILDIR

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*

a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = com.yang.gmall.flume.interceptor.ETLInterceptor$Builder

#配置channels

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092

a1.channels.c1.kafka.topic = topic_log

a1.channels.c1.parseAsFlumeEvent = false

#组装

a1.sources.r1.channels = c1

4.数仓同步-实时数仓同步、离线数仓同步

5.业务数据采集和同步

1.电商业务流程

2.电商常识

SKU = Stock Keeping Unit(库存量基本单位)。现在已经被引申为产品统一编号的简称,每种产品均对应有唯一的SKU号。SPU(Standard Product Unit):是商品信息聚合的最小单位,是一组可复用、易检索的标准化信息集合。平台属性 销售属性

3.电商系统表结构

1.电商业务表关联关系

2.后台管理系统

4.业务数据通道

5.实时同步业务数据:Maxwell实时同步Mysql业务数据

mysql开启binlog日志,row模式的,maxwell监控binlog,然后将增量日志实时写入到kafka中。

mysql主从备份,只有主节点对外提供服务 mysql读写分离,两台节点都对外提供服务 maxwell是模仿了一个mysql的从节点,所做的事情跟mysql从节点一致。

6.离线同步-用户行为数据:Flume

使用flume进行离线同步用户行为数据,flume的设计模式如下:

source端使用kafka sourcechannel端使用file channelsink端使用hdfs sink flume相关配置文件:kafka_to_hdfs_log.conf

#定义组件

a1.sources=r1

a1.channels=c1

a1.sinks=k1

#配置source1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.batchSize = 5000

a1.sources.r1.batchDurationMillis = 2000

a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.sources.r1.kafka.topics=topic_log

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = com.yang.gmall.flume.interceptor.TimestampInterceptor$Builder

#配置channel

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1

a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1

a1.channels.c1.maxFileSize = 2146435071

a1.channels.c1.capacity = 1000000

a1.channels.c1.keep-alive = 6

#配置sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d

a1.sinks.k1.hdfs.filePrefix = log

a1.sinks.k1.hdfs.round = false

a1.sinks.k1.hdfs.rollInterval = 10

a1.sinks.k1.hdfs.rollSize = 134217728

a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k1.hdfs.codeC = gzip

#组装

a1.sources.r1.channels = c1

hdfs中小文件的危害:

1.计算MR任务的时候,每个文件会生成一个maptask,每个maptask占用一定量内存,如果小文件过多,则会占用大量内存,消耗集群资源。2.DataNode每个文件在NameNode中都会有一个150B的文件大小等元数据信息,如果小文件过多,则会占用大量的NameNode的空间,影响NameNode的使用寿命。3.解决flume生成小文件同步到HDFS的问题: 在flume配置文件中添加下面三个参数: a1.sinks.k1.hdfs.rollInterval = 10 :时间控制,单位:s,需要基于具体实际来分析,多久会生成128M左右的block块 a1.sinks.k1.hdfs.rollSize = 134217728 : 文件大小控制,文件达到设置的大小后,生成一个gz文件,否则是tmp文件 a1.sinks.k1.hdfs.rollCount = 0 :生产环境中设置为0,文件条数不好来控制文件大小,因为生产中,每条文件大小不一致,这里不设置的话,就默认此参数不生效,使用前两个即可。 Flume中零点漂移问题的处理:

数据漂移问题产生的原因:flume的event中,header使用的是事件的处理时间,也就是数据落地到kafka中的时间。 如何解决零点漂移问题:在flume中增加时间拦截器,header中的时间使用数据发生时间,而不是数据处理时间即可解决零点漂移问题。

7.离线同步-业务数据:DataX

1.离线数仓数据的同步策略:全量同步和增量同步

1.全量同步,就是每天都将业务数据库中的全部数据同步一份到数据仓库,这是保证两侧数据同步的最简单的方式。 2.增量同步,就是每天只将业务数据中的新增及变化数据同步到数据仓库。采用每日增量同步的表,通常需要在首日先进行一次全量同步。 3.增量同步和全量同步的优缺点:

同步策略优点缺点全量同步逻辑简单在某些情况下效率比较低,例如某张表数据量较大,但是每天数据的变化比例很低,若对其采用每日全量同步,则会重复同步和存储大量相同的数据。增量同步效率高,无需同步和存储重复数据逻辑复杂,需要将每日的新增及变化数据同原来的数据进行整合,才能对外使用

4.总结:

大表 字段变化多:全量同步大表 字段变化少:增量同步小表 字段变化多:全量同步小表 字段变化少:全量同步维度表 全量同步,事实表 增量同步,有的表既需要全量又需要增量。 5.对应同步工具

全量:DataX、Sqoop增量:Maxwell、Canal相关比较如下:

增量同步方案DataX/SqoopMaxwell/Canal对数据库的要求原理是基于查询,故若想通过select查询获取新增及变化数据,就要求数据表中存在create_time、update_time等字段,然后根据这些字段获取变更数据。要求数据库记录变更操作,例如MySQL需开启binlog。数据的中间状态由于是离线批量同步,故若一条数据在一天中变化多次,该方案只能获取最后一个状态,中间状态无法获取。由于是实时获取所有的数据变更操作,所以可以获取变更数据的所有中间状态。

2.DataX相关配置

DataX本身作为离线数据同步框架,采用Framework+plugin架构构建。将数据源读取和写入抽象成Reader/Writer插件,纳入到整个同步框架中。 DataX运行流程 DataX使用

Reader和Writer的具体参数可参考官方文档,地址如下: https://github.com/alibaba/DataX/blob/master/README.md hadoop读取gz格式的文件:hadoop fs -cat /data/a.gz |zcat HDFS数据写出到Mysql相关DataX配置 mysql数据写出到HDFS相关DataX配置如下

{

"job": {

"content": [

{

"reader": {

"name": "mysqlreader",

"parameter": {

"column": [

"id",

"name",

"region_id",

"area_code",

"iso_code",

"iso_3166_2"

],

"where": "id>=3",

"connection": [

{

"jdbcUrl": [

"jdbc:mysql://hadoop102:3306/gmall"

],

"table": [

"base_province"

]

}

],

"password": "000000",

"splitPk": "",

"username": "root"

}

},

"writer": {

"name": "hdfswriter",

"parameter": {

"column": [

{

"name": "id",

"type": "bigint"

},

{

"name": "name",

"type": "string"

},

{

"name": "region_id",

"type": "string"

},

{

"name": "area_code",

"type": "string"

},

{

"name": "iso_code",

"type": "string"

},

{

"name": "iso_3166_2",

"type": "string"

}

],

"compress": "gzip",

"defaultFS": "hdfs://hadoop103:8020",

"fieldDelimiter": "\t",

"fileName": "base_province",

"fileType": "text",

"path": "/base_province",

"writeMode": "append"

}

}

}

],

"setting": {

"speed": {

"channel": 1

}

}

}

}

3.全量表数据同步至HDFS:DataX

全量表数据流图 全量数据同步使用DataX进行,书写相关的DataX脚本,来自动化运行同步mysql数据到HDFS中。

# ecoding=utf-8

import json

import getopt

import os

import sys

import MySQLdb

#MySQL相关配置,需根据实际情况作出修改

mysql_host = "hadoop102"

mysql_port = "3306"

mysql_user = "root"

mysql_passwd = "000000"

#HDFS NameNode相关配置,需根据实际情况作出修改

hdfs_nn_host = "hadoop103"

hdfs_nn_port = "8020"

#生成配置文件的目标路径,可根据实际情况作出修改

output_path = "/opt/module/datax/job/import"

def get_connection():

return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)

def get_mysql_meta(database, table):

connection = get_connection()

cursor = connection.cursor()

sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"

cursor.execute(sql, [database, table])

fetchall = cursor.fetchall()

cursor.close()

connection.close()

return fetchall

def get_mysql_columns(database, table):

return map(lambda x: x[0], get_mysql_meta(database, table))

def get_hive_columns(database, table):

def type_mapping(mysql_type):

mappings = {

"bigint": "bigint",

"int": "bigint",

"smallint": "bigint",

"tinyint": "bigint",

"decimal": "string",

"double": "double",

"float": "float",

"binary": "string",

"char": "string",

"varchar": "string",

"datetime": "string",

"time": "string",

"timestamp": "string",

"date": "string",

"text": "string"

}

return mappings[mysql_type]

meta = get_mysql_meta(database, table)

return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)

def generate_json(source_database, source_table):

job = {

"job": {

"setting": {

"speed": {

"channel": 3

},

"errorLimit": {

"record": 0,

"percentage": 0.02

}

},

"content": [{

"reader": {

"name": "mysqlreader",

"parameter": {

"username": mysql_user,

"password": mysql_passwd,

"column": get_mysql_columns(source_database, source_table),

"splitPk": "",

"connection": [{

"table": [source_table],

"jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]

}]

}

},

"writer": {

"name": "hdfswriter",

"parameter": {

"defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,

"fileType": "text",

"path": "${targetdir}",

"fileName": source_table,

"column": get_hive_columns(source_database, source_table),

"writeMode": "append",

"fieldDelimiter": "\t",

"compress": "gzip"

}

}

}]

}

}

if not os.path.exists(output_path):

os.makedirs(output_path)

with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:

json.dump(job, f)

def main(args):

source_database = ""

source_table = ""

options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])

for opt_name, opt_value in options:

if opt_name in ('-d', '--sourcedb'):

source_database = opt_value

if opt_name in ('-t', '--sourcetbl'):

source_table = opt_value

generate_json(source_database, source_table)

if __name__ == '__main__':

main(sys.argv[1:])

4.增量表数据同步至HDFS:flume

将kafka中的topic为topic_db里面的增量数据说那个flume写出到HDFS中。

书写flume脚本,将kafka数据同步到hdfs中。书写拦截器,解决数据漂移问题和以表名命名目录名。

5.增量表数据首次全量同步至HDFS:Maxwell

此场景仅仅适用于增量表首日全量同步,如果是常态化全量同步还是建议使用DataX工具。

书写首日全量同步脚本:

#!/bin/bash

# 该脚本的作用是初始化所有的增量表,只需执行一次

MAXWELL_HOME=/opt/module/maxwell

import_data() {

$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties

}

case $1 in

"cart_info")

import_data cart_info

;;

"comment_info")

import_data comment_info

;;

"coupon_use")

import_data coupon_use

;;

"favor_info")

import_data favor_info

;;

"order_detail")

import_data order_detail

;;

"order_detail_activity")

import_data order_detail_activity

;;

"order_detail_coupon")

import_data order_detail_coupon

;;

"order_info")

import_data order_info

;;

"order_refund_info")

import_data order_refund_info

;;

"order_status_log")

import_data order_status_log

;;

"payment_info")

import_data payment_info

;;

"refund_payment")

import_data refund_payment

;;

"user_info")

import_data user_info

;;

"all")

import_data cart_info

import_data comment_info

import_data coupon_use

import_data favor_info

import_data order_detail

import_data order_detail_activity

import_data order_detail_coupon

import_data order_info

import_data order_refund_info

import_data order_status_log

import_data payment_info

import_data refund_payment

import_data user_info

;;

esac

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: