数仓

olap vs oltp

基本含义不同:OLTP是传统的关系型数据库的主要应用,主要是基本的、日常的事务处理,记录即时的增、删、改、查,比如在银行存取一笔款,就是一个事务交易。OLAP即联机分析处理,是数据仓库的核心部心,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果。典型的应用就是复杂的动态报表系统。实时性要求不同:OLTP实时性要求高,OLTP 数据库旨在使事务应用程序仅写入所需的数据,以便尽快处理单个事务。OLAP的实时性要求不是很高,很多应用顶多是每天更新一下数据。数据量不同:OLTP数据量不是很大,一般只读/写数十条记录,处理简单的事务。OLAP数据量大,因为OLAP支持的是动态查询,所以用户也许要通过将很多数据的统计后才能得到想要知道的信息,例如时间序列分析等等,所以处理的数据量很大。用户和系统的面向性不同:OLTP是面向顾客的,用于事务和查询处理。OLAP是面向市场的,用于数据分析。(olap引擎:Hive,Spark SQL,Presto,Greeplum)数据库设计不同:OLTP采用实体-联系ER模型和面向应用的数据库设计。OLAP采用星型或雪花模型和面向主题的数据库设计。 数据仓库是面向主题集成的,同时数据仓库通过表结构优化,存储方法优化等提高查询速度,降低开销。

将OLTP数据库的数据转移到OLAP数据库的过程一般包括以下几个步骤:

数据抽取:从OLTP数据库中提取需要分析的数据,可以使用SQL语句、ETL工具或者其他方法。数据清洗:对抽取出来的数据进行质量检查和修正,去除重复、错误或者不一致的数据,保证数据的完整性和有效性。数据转换:根据OLAP数据库的结构和需求,对数据进行适当的变换和加工,如进行聚合、分组、排序等操作,生成事实表和维度表。数据加载:将转换后的数据加载到OLAP数据库中,可以使用批量加载、增量加载或者实时加载等方式。数据刷新:定期或者根据事件触发,对OLAP数据库中的数据进行更新和同步,保证数据的时效性和一致性。

数据库三范式

第一范式(字段不能重复且不能分解)

第一范式是最基本的范式。如果数据库表中的所有字段值都是不可分解的原子值,就说明该数据库表满足了第一范式。

第二范式(增加主键)

主键可以是一列或者多列组成的,根据主键,马上可以精确到特点的一行数据

第三范式(消除非主键的传递关系)

例如表中主键是商品编号(100),但是商品类别名称和商品类别描述可以根据商品类别编号(3)字段去检索,这样商品类别名称(100/3)和商品类别(100/3)描述编号会有冗余。

数据仓库 vs 数据中台

数据中台是为了帮助企业打通数据孤岛,整合数据资源,实现数据和业务的联系,从而为业务提供更好的数据支持。数据仓库是通过数据挖掘和分析,帮助企业制定更好的业务策略和决策方案。

数据仓库意义

数据仓库为了解决企业数据集成和分析的问题,为企业割部门提供统一的,规范的数据出口。

数据仓库特点

面向主题的:数据按照一定的主题域进行组织的,每一个主题对应一个宏观的分析领域集成的:从各个分散的数据库种抽取数据,并在集成的过程中进行数据清洗,规划和去敏稳定性:数据仓库主要为了做分析使用,所以不存在想oltp种数据库的更新和删除变化性:数据仓库随着时间变化是不断增强新的数据内容且不断删除久的数据内容的

数仓分层的意义

隔离原始数据数据结构化更清晰:每一个数据分层都有它的作用域和职责数据血缘追踪:清晰的表现每张表的作用范围,可以快速准确定位到问题增强数据复用能力,简化复杂问题:复用中间层数据,分层后便于维护统一数据口径

数仓分层

ODS-源数据层:是数据仓库的基础层,它负责从各种数据源抽取、清洗、转换和加载原始数据,保证数据的完整性、准确性和一致性。DW-数据仓库层是数据仓库的核心层,它负责对源数据层的数据进行进一步的加工、聚合和建模,使得数据更加适合分析和查询。模型层通常采用维度建模,将数据划分为事实表和维度表,事实表存储业务过程中发生的事件,维度表存储事件的属性。DWD-数据明细层 对ODS层数据进行清洗,维度退化和脱敏。维度退化是指当一个维度没必要用一张维度表来存储的时候,讲维度退化到事实表种,例如订单id。公共维度层(DIM):基于维度建模理论进行构建,存放维度模型中的维度表,保存一致性维度信息。DWS-数据服务层粒度变粗,对度量值进行汇总,大多都是宽表。ADS-指标层是数据仓库的最上层,负责对模型层的数据进行最终的应用和呈现,提供给用户或系统使用。

数仓建模方法

数仓建模的目标是通过数仓建模更好的组织,存储数据,以便在性能,成本,效率和数据质量之间找到最佳的平衡点。

易于理解和查询提高更高的数据粒度:维度建模允许在事实表种存储详细数据,在维度表种存储域数据相关的上下文信息支持多维度分析:维度可以根据业务需求进行组合和切片提高数据一致性和准确性:一些事实表需要用到共享信息,例如日期,时间,地理位置支持快速数据查询和报告生成

范式建模:基于关系型数据库的建模方法,核心思想是消除冗余数据,将数据按照三范式规范化的方式进行存储,面向事务,适合长期稳定的业务。但是查询时关联表较多使得查询性能降低。维度建模:包含维度和指标。维度建模面向分析,最终提高查询性能,增加数据冗余,违反三范式,适合业务经常变化。星型模型:星型模型是由一个事实表和多个维度表组成的,每个维度表都直接与事实表相连接,形成一个类似星星的结构。星型模型的优点是简单直观,易于理解和使用,查询性能高,适合快速交付和变化频繁的业务需求。星型模型的缺点是数据冗余度高,可能导致数据不一致和存储空间浪费,维度表的层次结构不明显,不利于分析细节。雪花模型:雪花模型是对星型模型的扩展,它将一些维度表进一步分解为更小的维度表,形成一个类似雪花的结构。雪花模型的优点是数据冗余度低,数据一致性和质量高,维度表的层次结构清晰,有利于分析复杂的业务逻辑。雪花模型的缺点是设计和实现较复杂,查询性能低,需要多次连接操作,不利于快速响应和变更。星座模型:星座模型中存在多张事实表,不同事实表之间共享维表信息,常用于数据关系更复杂的场景。

数据仓库主体域

主题域是面向业务过程,将业务活动事件进行抽象的集合,将数据主题划分到不同的主题域。(DWD) 数据域是面向业务分析:将业务过程或者维度进行抽象(DWS)

所属系统划分按照业务或者业务过程划分:广告域,客户域按照部分划分:人力,财务,销售,运营按照行业案例:FS-LDM金融主题模型

维度表 VS 事实表

事实表:表示我们要关注的内容,表格里存储了能体现实际数据或详细数值,事实表用来存储事实的度量及指向各个维的外键值。维度表:维度表用于描述事实表种的业务维度,如时间,地理位置,产品,客户。维表用来保存该维的元数据,维的描述信息,维的层次以及成员类别。中心的SalesOrder(销售流水表)为事实表,保存的是下单这个业务过程的所有记录。位于周围的每张表都是维度表,包括Customer(客户表)、Date(日期表)、Location(地址表)和Product(商品表)等,这些维度表组成了每个订单发生时所处的环境,即何人、何时、在何地购买了何种商品。 具体例子(保险)

主题域 :保单分析,理赔分析,风险分析,客户分析 保单分析:这个主题域关注的是保单的销售、续保、退保等情况,以及与之相关的客户、产品、渠道等属性。可能的度量有保费收入、续保率、退保率等,可能的维度有保单日期、保单状态、产品类型、产品名称、客户类型、客户地区、渠道类型、渠道名称等。 理赔分析:这个主题域关注的是理赔的申请、审核、支付等情况,以及与之相关的客户、产品、赔案等属性。可能的度量有理赔支出、赔付率、理赔周期等,可能的维度有理赔日期、理赔状态、产品类型、产品名称、产品代码、客户类型、客户地区、赔案类型、赔案原因等。 原子指标:原子指标是基于某一业务事件或行为下的度量值,是业务定义中不可再拆分的指标,具有明确的业务含义和统计口径。例如,理赔申请数、理赔审核数、理赔支付数、理赔金额、理赔成本等,都是原子指标。 派生指标:派生指标是基于一个或多个原子指标,通过加减乘除或其他逻辑运算得到的指标,通常用于反映业务绩效或效率。例如,赔付率、理赔周期、理赔频次、理赔成本率等,都是派生指标。 赔付率:某一时间段内,保险公司向客户支付的理赔金额占该时间段内承保责任金额的比例。计算公式为: 理赔成本率:某一时间段内,保险公司为处理客户的理赔申请而发生的成本占该时间段内承保责任金额的比例。

Hadoop组件

Mapreduce

mapreduce详细流程

maptask

Map Task共分为5个阶段,分别是:

Read阶段:Map Task通过用户编写的RecordReader从输入InputSplit中解析处一个个的Key/Value。

Map阶段:该阶段主要将解析出来的Key/Value交给用户编写的map()函数处理,并产生新的key/value。

Collect阶段:在用户编写的map()函数中,数据处理完时一般调用OutputCollector.collect()输出结果,在该函数内部,它首先调用Partitioner计算该key/value所属的partition,并将key/value写入到环形缓冲区中。

Spill阶段:即“溢写”阶段,当环形缓冲区比较满后,MapTask会将缓冲区中的数据写到本地磁盘的临时文件中,需要注意的是,在写入文件之前,MapTask会首先对数据进行一次排序,如果用户指定了压缩和combiner函数,那么还会对数据进行压缩和合并。

Merge阶段:当一个map task处理的数据很大,以至于超过缓冲区内存时,就会生成多个spill文件。此时就需要对同一个map任务产生的多个spill文件进行归并生成最终的一个已分区且已排序的大文件。配置属性mapreduce.task.io.sort.factor控制着一次最多能合并多少流,默认值是10。这个过程包括排序和合并(可选),归并得到的文件内键值对有可能拥有相同的key,这个过程如果client设置过Combiner,也会合并相同的key值的键值对(根据上面提到的combine的调用时机可知)。

溢出写文件归并完毕后,Map将删除所有的临时溢出写文件,并告知NodeManager任务已完成,只要其中一个MapTask完成,ReduceTask就开始复制它的输出(Copy阶段分区输出文件通过http的方式提供给reducer)

reducetask

Shuffle阶段:也称为Copy阶段。Reduce Task从各个Map Task上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

Merge阶段:在远程拷贝数据的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

Sort阶段:按照MapReduce语义,用户编写的reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个Map Task已经实现对自己的处理结果进行了局部排序,因此,Reduce Task只需对所有数据进行一次归并排序即可。

Reduce阶段:在该阶段中,Reduce Task将每组数据依次交给用户编写的reduce()函数处理。

Write阶段:reduce()函数将计算结果写到HDFS上。

归约迭代器

sort阶段的排序是通过一个叫做归约迭代器(Reduce Iterator)的机制实现的。归约迭代器是一个特殊的迭代器,它可以从merge阶段产生的有序数据流中读取数据,并且按照key的相等性进行分组。每当归约迭代器遇到一个新的key,它就会调用一次reduce函数,并且将该key对应的所有value作为输入参数。这样,归约迭代器就可以在一次遍历中完成数据的分组和归约,而不需要再进行额外的排序。

mapreduce过程中的三次排序

在MapReduce的shuffle过程中通常会执行三次排序,分别是:

Map输出阶段(溢写阶段):根据分区以及key进行快速排序Map的合并溢写文件:将同一个分区的多个溢写文件进行归并排序,合成大的溢写文件Reduce输入阶段:将同一分区,来自不同Map task的数据文件进行归并排序

combine和merge不同

环形缓冲区的阈值

map和reduce的数量

map数量 map的数量通常是由hadoop集群的DFS块大小确定的,也就是输入文件的总块数,正常的map数量的并行规模大致是每一个Node是10~100个,对于CPU消耗较小的作业可以设置Map数量为300个左右,但是由于hadoop的每一个任务在初始化时需要一定的时间,因此比较合理的情况是每个map执行的时间至少超过1分钟。具体的数据分片是这样的,InputFormat在默认情况下会根据hadoop集群的DFS块大小进行分片,每一个分片会由一个map任务来进行处理,当然用户还是可以通过参数mapred.min.split.size参数在作业提交客户端进行自定义设置。还有一个重要参数就是mapred.map.tasks,这个参数设置的map数量仅仅是一个提示,只有当InputFormat 决定了map任务的个数比mapred.map.tasks值小时才起作用。同样,Map任务的个数也能通过使用JobConf 的conf.setNumMapTasks(intnum)方法来手动地设置。这个方法能够用来增加map任务的个数,但是不能设定任务的个数小于Hadoop系统通过分割输入数据得到的值。当然为了提高集群的并发效率,可以设置一个默认的map数量,当用户的map数量较小或者比本身自动分割的值还小时可以使用一个相对交大的默认值,从而提高整体hadoop集群的效率。reduc数量 reduce在运行时往往需要从相关map端复制数据到reduce节点来处理,因此相比于map任务。reduce节点资源是相对比较缺少的,同时相对运行较慢,正确的reduce任务的个数应该是0.95或者1.75*(节点数×mapred.tasktracker.tasks.maximum参数值)。如果任务数是节点个数的0.95倍,那么所有的reduce任务能够在map任务的输出传输结束后同时开始运行。如果任务数是节点个数的1.75倍,那么高速的节点会在完成他们第一批reduce任务计算之后开始计算第二批reduce任务,这样的情况更有利于负载均衡。同时需要注意增加reduce的数量虽然会增加系统的资源开销,但是可以改善负载匀衡,降低任务失败带来的负面影响。同样,Reduce任务也能够与map任务一样,通过设定JobConf 的conf.setNumReduceTasks(int num)方法来增加任务个数。

map join vs reduce join

map join:在提交作业的时候先将小表放在该作业的DistributedCache中比如放在Hashmap等容器中,然后join的时候取出,然后扫描大表,看大表中每条记录的join key/value值是否能在在内存中找到相同的join key的记录,如果有则直接输出结果。reduce join:在map阶段,map函数同时读取两个文件,然后为了区分两个文件来源的key/value分别打上标签。然后在reduce阶段,reduce函数获取key相同来自两个文件的value list,然后对两个文件的数据进行join。(笛卡尔乘积)

HBASE工作流程

写入流程

1、Client访问zookeeper,获取元数据存储所在的regionserver

2、拿到对应的表存储的regionserver,通过刚刚获取的地址访问对应的regionserver,

3、去表所在的regionserver进行数据的添加

4、查找对应的region,在region中寻找列族,把数据分别写到Hlog和memstore各一份

5、当memstore写入的值变多,触发溢写操作(flush),进行文件的溢写,成为一个StoreFile

6、当溢写的文件过多时,会触发文件的合并(Compact)操作。合并有两种方式(major,minor)多个StoreFile合并成一个StoreFile,同时进行版本合并和数据删除

minor compaction:小范围合并,默认是3-10个文件进行合并,不会删除其他版本的数据。

major compaction:将当前目录下的所有文件全部合并,一般手动触发,会删除其他版本的数据(不同时间戳的)

7、当region中的数据逐渐变大之后,达到某一个阈值,会进行裂变(一个region等分为两个region,并分配到不同的regionserver),原本的Region会下线,新Split出来的两个Region会被HMaster分配到相应HRegionServer上,使得原先1个Region的压力得以分流到2个Region上。

读过程

1.Client访问zookeeper,获取元数据存储所在的regionserver

2.通过刚刚获取的地址访问对应的regionserver,拿到对应的表存储的regionserver

3.去表所在的regionserver进行数据的读取

4.查找对应的region,在region中寻找列族,先找到memstore,找不到去blockcache中寻找,再找不到就进行storefile的遍历

5.找到数据之后会先缓存到blockcache中,再将结果返回.blockcache逐渐满了之后,会采用LRU的淘汰策略。

补充

在HBase中,BlockCache是一种读缓存。HBase会将一次文件查找的Block块缓存到Cache中,以便后续同一请求或者邻近数据查找请求直接从内存中获取,避免昂贵的IO操作。如果需要缓存的数据超过堆大小的情况下,推荐使用Block Cache下的off-heap。Off-heap是指堆外内存,不由GC管理,但可以通过full GC回收,通过-XX:MaxDirectMemorySize设置大小。

当scan获取数据时,可以通过setCacheBlocks方法来设置是否使用block cache,对于频繁访问的行才建议使用block cache。

对于MapReduce的Scan作为输入任务,应该设置为setCacheBlocks(false)以避免在MapReduce期间使用BlockCache。这是因为BlockCache是HBase的读缓存,保存着最近被访问的数据块。在MapReduce期间,由于大量的数据读取,BlockCache会被填满,导致缓存失效,从而导致性能下降。

.HStore存储是HBase存储的核心,其中由两部分组成,一部分是Memstore,一部分是StoreFile。

.HLog的的功能: 宕机数据恢复

在分布式系统环境中,我们是无法避免系统出错或者宕机的,一旦HRegionServer意外退出,MemStore中的内存数据就会丢失,而引入HLog就是为了防止这种情况。

工作机制:每个HRegionServer中都会有一个HLog对象,HLog是一个实现Write Ahead Log的类,每次用户操作写入Memstore的同时,也会写一份数据到HLog文件中,HLog文件定期会滚动出新,并删除旧的文件(已持久化到Storefile中的数据),当HRegionServer意外终止后,HMaster会通过Zookeeper感知,HMaster首先处理遗留的HLog文件,将不同region的log数据拆分,分别放在相应region目录下,然后再将失效的region(带有刚刚拆分的log)重新分配,领取到这些region的HRegionServer在Load Region的过程中,会发现有历史HLog需要处理,因此会Replay HLog中的数据到Memstore中,然后flush到StoreFile,完成数据恢复。

.Region就是StoreFiles,StoreFiles里由HFile构成,HFile里由hbase的data块构成,一个data块里面又有很多的keyvalue对,每个keyvalue里存了我们需要的值。

HDFS工作流程

namenode(管理节点):namenode管理文件系统的命名空间。它维护着文件系统树及整棵树内的所有文件和目录。这些信息以两个文件形式永久保存在本地磁盘上:命名空间镜像文件和编辑日志文件。namenode也记录着每个文件中各个块所在的数据节点信息,但是它并不会永久保存块的位置信息,因为这些信息会在系统启动时根据数据节点信息重建。datanode:datanode是文件系统的工作节点。它们根据需要存储并检检索数据块(受客户端或namenode调度),并且定期向namenode发送它们所存储的块的列表。

读取流程

客户端调用 FileSystem 对象的open()方法,其实获取的是一个分布式文件系统(DistributedFileSystem)实例;将所要读取文件的请求发送给 NameNode,然后 NameNode 返回文件数据块所在的 DataNode 列表(是按照 Client 距离 DataNode 网络拓扑的远近进行排序的),同时也会返回一个文件系统数据输入流(FSDataInputStream)对象;客户端调用 read() 方法,会找出最近的 DataNode 并连接;数据从 DataNode 源源不断地流向客户端。

写入流程

客户端通过调用分布式文件系统(DistributedFileSystem)的create()方法创建新文件;DistributedFileSystem 将文件写入请求发送给 NameNode,此时 NameNode 会做各种校验,比如文件是否存在,客户端有无权限去创建等;如果校验不通过则会抛出I/O异常。如果校验通过,NameNode 会将该操作写入到编辑日志中,并返回一个可写入的 DataNode 列表,同时,也会返回文件系统数据输出流(FSDataOutputStream)的对象;客户端在收到可写入列表之后,会调用 write() 方法将文件切分为固定大小的数据包,并排成数据队列;数据队列中的数据包会写入到第一个 DataNode,然后第一个 DataNode 会将数据包发送给第二个 DataNode,依此类推。DataNode 收到数据后会返回确认信息,等收到所有 DataNode 的确认信息之后,写入操作完成。

hdfs小文件危害和解决办法

小文件来源和危害:

源数据本身有很多小文件;动态分区会产生大量小文件;reduce个数越多,小文件越多;按分区插入数据的时候会产生大量的小文件。 从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能。HDFS存储大多小文件,会导致namenode元数据特别大,占用太多内存,制约了集群的扩展。

解决方法:

合并小文件: 使用Hadoop的MapReduce或Hive自带的输入格式(TextFile、 SequenceFile等)来合并小文件,将它们合并成一个或几个更大的文件。 压缩文件: 使用Hadoop的压缩算法(如gzip、snappy、Izo等)来压缩小文件,减小文件大小,提高I/O效率。 调整输入格式: 使用Hive的输入格式 (如ORC、Parquet等) 代替TextFile、 SequenceFile等格式,可以提高查询性能,同时可以减少小文件的数量。 使用分区:将数据根据一些共同属性(如日期、地区等)进行分区,可以将小文件转换成更大的块,提高性能。 合理设置参数: 调整MapReduce任务的参数 (如mapreduceinputfileinputformat.splitminsize等)可以让任务更适合处理小文件,提高效率。 使用动态资源分配: 可以通过设置Spark参数 (如spark.dynamicAllocation.enabled) 来使用动态资源分配,从而优化Spark集群资源的使用。

Yarn工作流程

1.用户向YARN中提交应用程序,其中包括ApplicationMaster程序,启动ApplicatioMaster的命令、用户程序

2.ResourceManager为该应用程序分配第一个container,并与对应的NodeManager通信,要求它在这个container中启动应用程序的ApplicationMaster

3.ApplicationMaster首先向ResourceManager注册。

4.ApplicationMaster采用轮询的方式通过RPC协议来向ResourceManager申请和领取资源

5.一旦ApplicationMaster申请到资源后,便于对应的NodeManager通信

6.NodeManager为任务设置好运行环境(包括环境变量,JAR包,二进制程序后,将任务启动命令写到一个脚本中)

7.各个任务通过RPC协议向ApplicationMaster向ResourceManager汇报自己的进度

ResourceManager(RM)

RM是一个全局的资源管理器,负责整个系统的资源管理的分配和管理。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Application Manager,ASM)

调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等) 将系统的资源分配给各个正在进行的应用程序。需要注意的是,该调度器是一个纯调度器,它不再从事任何与具体应用程序相关的工作。这些都交给ApplicationMaster完成。调度器仅根据各个应用程序的资源需求进行资源分配,而资源分配单位用一个抽象概念“资源容器”表示。

ApplicationMaster(AM)

用户提交的每个应用程序均包含一个AM,主要功能:

RM调度器协商以获取资源(用container表示)将得到的任务进一步分配给内部的任务与NM通信以启动、停止任务监控所有任务的状态,并在任务失败的时候重新为任务申请资源并重启任务

当前YARN自带了两个AM实现,一个用于演示AM编写方法的实例程序,它可以申请一定数目的Container以并行运行一个Shell命令或者Shell脚本。另一个是运行MapReduce应用程序的AM— MRAppMaster

NodeManager

Nm是每个节点上的资源和任务管理器,一方面,他会定时的向RM汇报本节点上的资源使用情况和各个Container的运行状态。另一方面,它接受并处理来自AM的Container启动、停止请求。

Container

Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等。当AM向RM申请资源时。RM为AM返回的资源是用Container表示的。YARN会表示为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。需要注意的是,Container不同于MRv1中的solt,它是一个动态资源划分单位,是根据应用程序动态生成的,YARN只支持CPU和内存两种资源,使用Cgroup进行隔离

Spark工作流程

Application:应用程序

构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请TaskTask Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给ExecutorTask在Executor上运行,运行完毕释放所有资源

SPARK SQL和HIVE SQL对比

Spark SQLHive查询语言Spark SQL和HQLHQL执行引擎Spark默认Mapreduce,可以自定义为Spark数据存储本身不提供数据存储,可指定到不同存储系统,如HDFS,hive,Hbase,MysqlHDFS元数据存储可选元数据存储必须指定元数据存储APIDataFrame/DateSet DSL 和SQLHQLSchemaschema自动关联使用DDL显示表明schema

对比方面\计算引擎sparkmapreduce备注计算方式内存计算IO读写迭代计算过程中,MR需要不断IO,而Spark引入了RDD和DAG,使计算过程基于内存完成,提升了处理性能任务调度task为线程级别task为进程级别Spark可以通过复用线程池中的线程减少启动,关闭task所需要的消耗。执行策略Spark在shuffle时只有部分场景需要排序,支持基于hash的分布式聚合,更加省时Mapreduce在shuffle前需要花费大量时间进行排序

Spark,hive on spark,spark on hive三者比较

Hive引擎包括:默认MR、tez、spark

Hive on Spark:Hive既作为存储元数据又负责SQL的解析优化,语法是HQL语法,执行引擎变成了Spark,Spark负责采用RDD执行。

Spark on Hive : Hive只作为存储元数据,Spark负责SQL解析优化,语法是Spark SQL语法,Spark负责采用RDD执行。

【spark on hive 】

hive只作为存储角色,spark 负责sql解析优化,底层运行的还是sparkRDD

具体可以理解为spark通过sparkSQL使用hive语句操作hive表,底层运行的还是sparkRDD,

步骤如下:

1.通过sparkSQL,加载Hive的配置文件,获取Hive的元数据信息

2.获取到Hive的元数据信息之后可以拿到Hive表的数据

3.通过sparkSQL来操作Hive表中的数据

【hive on spark】

hive既作为存储又负责sql的解析优化,spark负责执行

这里Hive的执行引擎变成了spark,不再是MR。

这个实现较为麻烦,必须重新编译spark并导入相关jar包

目前大部分使用spark on hive

优化举例

谓词下推

select a.*,b.* from

tb1 a join tb2 b

on a.id = b.id

where a.c1 > 20 and b.c2< 100

优化为

select a.*,b.* from

(select * from tb1 where c1>20) a

join

(select * from tb2 where c2<100) b

on a.id = b.id

减少后期执行过程中的join的shuffle数据量;

列裁剪

select a.name,b.salary from

(select * from tb1 where c1>20) a

join

(select * from tb2 where c2<100) b

on a.id = b.id

优化为

select a.name,b.salary from

(select id,name from tb1 where c1>20) a

join

(select id,salary from tb2 where c2<100) b

on a.id = b.id

执行前将不需要的列裁剪掉,减少数据量获取;

常量累加

select 1+1 as cnt from tb

select 2 as cnt from tb

如何使用sqoop

Sqoop是一个用于将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如:MySQL、Oracle、Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。下面是将数据从MySQL导入到HDFS的步骤:

在HDFS中创建目录,用于导入后存放数据。 使用sqoop import命令将MySQL中的表导入到HDFS中。

sqoop import \

--connect jdbc:mysql:/// \

--username \

--password \

--table \

--delete-target-dir \

--target-dir \

--m 1

其中,是MySQL主机名或IP地址,是要导入的数据库名,和是MySQL登录用户名和密码,是要导入的表名,是要存储数据的HDFS目录路径,–m 1表示使用一个Mapper任务执行导入操作。

要使用web调用MySQL中的数据进行前端展示,您需要使用一些技术和工具。以下是一些步骤:

使用Java Web框架(如Spring Boot)创建Web应用程序。在Web应用程序中使用JDBC连接到MySQL数据库。编写SQL查询以从MySQL数据库中检索数据。将检索到的数据存储在Java对象中。使用Java对象将数据传递到前端(HTML、CSS和JavaScript)。在前端使用JavaScript和AJAX技术将数据呈现给用户。

Zookeeper 如何保证分布式系统数据一致性

ZAB协议的消息广播,崩溃恢复和数据同步

消息广播

一个事务请求(Write)进来之后,Leader 节点会将写请求包装成 Proposal 事务,并添加一个全局唯一的 64 位递增事务 ID,也就是 Zxid(消息的先后顺序就是通过比较 Zxid);

Leader 节点向集群中其他节点广播 Proposal 事务,Leader 节点和 Follower 节点是解耦的,通信都会经过一个 FIFO 的消息队列,Leader 会为每一个 Follower 节点分配一个单独的 FIFO 队列,然后把 Proposal 发送到队列中;

Follower 节点收到对应的 Proposal 之后会把它持久到磁盘上,当完全写入之后,发一个 ACK 给 Leader;

当 Leader 节点收到超过半数 Follower 节点的 ACK 之后(Quorum 机制),会提交本地机器上的事务,同时开始广播 commit, Follower 节点收到 commit 之后,完成各自的事务提交。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9voQBzvj-1681778635762)(数仓面经.assets/v2-22d9ae5ce56af93dcbf4466e21b51381_1440w.webp)]

当 Leader(Server1) 发起一个事务 Proposal1 后就宕机了,导致 Follower 都没有 Proposal1。当 Leader 发起 Proposal2 后收到了半数以上的 Follower 的 ACK,但是还没来得及向 Follower 节点发送 Commit 消息就宕机了。

奔溃恢复

集群服务刚启动时进入崩溃恢复阶段选取 Leader 节点。

Leader 节点突然宕机或者由于网络原因导致 Leader 节点与过半的 Follower 失去了联系,集群也会进入崩溃恢复模式。

选举 Leader 节点

各个节点变为 Looking 状态

Leader 宕机后,余下的 Follower 节点都会将自己的状态变更为 Looking(注意 Observer 不参与选举),然后开始进入 Leader 选举过程。 各个 Server 节点都会发出一个投票,参与选举

在第一次投票中,所有的 Server 都会投自己,然后各自将投票发送给集群中所有机器。 集群接收来自各个服务器的投票,开始处理投票和选举

处理投票的过程就是对比 Zxid 的过程,假定 Server3 的 Zxid 最大,Server1 判断 Server3 可以成为 Leader,那么 Server1 就投票给 Server3,判断的依据如下:首先选举 epoch 最大的,如果 epoch 相等,则选 zxid 最大的,若 epoch 和 zxid 都相等,则选择 server id 最大的。 在选举过程中,如果有节点获得超过半数的投票数,则会成为 Leader 节点,反之则重新投票选举。epoch 是 ZooKeeper 中的一个概念,用于标识 ZooKeeper 的版本。每次集群中的 Leader 发生变化时,都会增加 epoch 的值。epoch 的值是一个 32 位的整数,它会被写入到事务日志中,以便在崩溃恢复时使用。当一个 Follower 节点从 Leader 节点同步数据时,如果发现自己的 epoch 比 Leader 节点的 epoch 小,则会拒绝同步数据。 选举成功,各节点的状态为 Leading 和 Following。

Zab 中的节点有三种状态,folloing(当前节点是 Follower 节点),leading(当前节点是 Leader 节点),looking/election(当前节点处于选举状态);伴随着的 Zab 协议消息广播和崩溃恢复两阶段之间的转换,节点状态也随之转换。

消息同步

崩溃恢复完成选举以后,接下来的工作就是数据同步,在选举过程中,通过投票已经确认 Leader 节点是最大 Zxid 的节点,同步阶段就是利用 Leader 前一阶段获得的最新 Proposal 历史同步集群中所有的副本

事实表分类

事务事实表:事务事实表是指以每个事务或事件为单位的表。但是在面对包括商品库存、账户余额等存量性指标还有多事务关联统计的表,逻辑复杂,效率低。周期快照事实表:周期快照事实表以具有规律性的、可预见的时间间隔来记录事实,主要用于分析一些存量型(如商品库存、账户余额)或者状态型(如空气温度、行驶速度)指标。累计快照事实表: 累积快照事实表是基于一个业务流程中的多个关键业务过程联合处理而构建的事实表,如交易流程中的下单、支付、发货、确认收货业务过程。

事实表的设计过程

例子:为交易事务设计事实表: (1) 业务分析:交易包括:下单,支付,发货,完结四个业务过程,其中业务过程可以概括为一个个不可拆分的行为事件。 (2)确定粒度:一个订单中可以有多个商品,因此每个商品都可以对应一个子订单。因此下单,支付,完结选择子订单粒度,发货选择父订单粒度。 (3)确定维度:卖家,买家,商品,商品类别,发货地,收货地。在确定维度时应尽可能多地选择与业务过程相关的环境信息 (4)确定事实:例如在下单过程中有下单金额,下单数量;支持过程中有支付金额,积分金额。事实”一词,指的是每个业务过程的度量(通常是可累加的数值类型的值,如次数、个数、件数、金额等 事实表设计原则

(1)尽量包含域业务过程相关的事实(2)只选择与业务过程相关的事实(3)分解不可加性事实为可加的组件:例如订单的优惠率,可以分解为订单原价金额与订单优惠金额两个事实存储在事实表中(4)在选择维度和事实之前必须先声明粒度:每个维度和事实必须与所定义的粒度保存一致,因为粒度用于确定事实表中一行所代表的业务的细节层次,决定维度模型的扩展性(5)在同一个事实表中不能有多种不同粒度的事实(6)事实的单位要保存一致,比如订单金额,订单优化金额和订单运费统一元或者分(7)对事实的null值用0代替(8)使用退化维度提高事实表的易用性

维度变化

维度属性通常不是静态的,而是随时间变化的,通常采用全量快照表或拉链表保存维度数据的历史状态。

全量快照表:离线数据仓库的计算周期通常为每日一次,所以可以每日保存一份全量的维度数据。拉链表:拉链表可以记录一条数据从开始到当前的所有历史信息,便于查询历史数据。适用于数据部分字段会发生变化,变化的比例不大且频率不高。 拉链表种有开始时间,结束时间还有dt和dp两个分区字段。其中dp有ACTIVE和EXPIRED两个分区,ACTIVE表示数据当前在线下使用,EXPIRED表示数据过期已变更。其中dt为数据转移到HISTORY的时间。

维度表的设计过程

确认维度:在构建事实表时,已经确定了与每张事实表相关的维度,理论上每个相关维度均需对应一张维度表。确定主维表和相关维表确认维度属性:维度属性可直接从主维表或相关维表中选择,也可通过进一步加工得到。维度属性是后续做分析统计时的查询约束条件、分组字段的基本来源,确保丰富性。

数据仓库开发包含几步

a)第一步,进行数据调研;包括了业务调研和需求调研,业务调研就是要弄清楚公司有哪些业务,以及每个业务有包括哪些业务线,一般每个业务会独自建设数据仓库。 b)第二步,进行架构设计;包括了数据域划分和构建总线矩阵,数据域就是指 将业务过程或者维度进行抽象的集合,在划分数据域的时候,应该尽可能保证当前划分的能够覆盖所有的业务需求,又能在新业务进入时无影响的被包含到已有的数据域中或者扩展新的数据域,国际化数仓里面的数据域包括司机域,乘客域,交易域,客服域,安全域等等,阿里巴巴就会有 商品域 会员域 店铺域 交易域 日志域等等。构建总线矩阵,就需要明确每个数据域下有哪些业务过程,业务过程与哪些维度相关。 c)第三步,明确统计指标,深入分析需求,构建指标体系。绝大多数的统计需求可以使用原子指标、派生指标及衍生指标这套标准来定义。公用的派生指标统一保存在数据仓库的DWS层中 d)第四步,维度模型设计。提到的业务总线矩阵即可。事实表存储在DWD层中,维度表存储在DIM层中 e)第五步,汇总模型设计,一张汇总表通常包含业务过程相同、日期限定相同、粒度限定相同的多个派生指标。 f)第五步,进行代码开发和上线生成调度任务,进行周期运行。

ETL在不同层工作

在ODS层不建议做过多的数据清洗操作,为了后续追溯数据问题在DWD数据明细层,该层的数据粒度和ODS层一致。但是该层会采用一些维度退化手段,将维度退化到事实表中。同时,还会做一些数据聚合,将相同主题汇聚到一张表中。在DWM数据中间层,对通用的核心维度进行聚合操作,算出相应的统计指标在DWS数据服务层,该层生成字段较多的宽表。在DWM层先计算出多个小的中间表,然后再拼成一张DWS表再数据应用层,数据一般存放在ES,PostgreSQL,Redis,Hive,Druid种共数据分析和数据挖掘使用。

拉链表,快照表,全量表,增量表

全量表:存储了全部数据的表,全量表没有分区的,所有数据都储存在一个分区中。全量表存储的是截至到目前最新状态的全部记录。适用于数量小,更新频率低的场景,比如维度表,基础数据表。 增量表:在原表中数据的基础上新增本周期内产生的新数据,适用于更新频率快的场景,比如日志表,交易表。增量表每次新产生的数据是以最新分区增加到表中的,例如按天更新的流量表,每天更新只新增一天内产生的新数据。 快照表:快照表就是截至过去某个时间点的所有数据。拉链表储存的是在快照表的基础上去除了重复状态的数据。 拉链表:可以记录数据从开始一直到当前所有变化的信息,适用于订单表和客户表

拉链表中有开始时间 (start time) 和结束时间 (end time) 两个字段,同时有dt和dp两个分区字段。dp: 一般有ACTIVE (线上)和EXPIRED (过期)两个分区。ACTIVE表示数据当前在线上使用,所以其end time为4712-12-31 (系统能处理的最大时间,表示一个达不到的无限向后延伸的时间,意味着该数据在线上永久有效):EXPIRED表示数据过期,已变更,为历史状态,其end time是数据变更时具体的时间。对于部分拉链表dp中还有HISTORY分区,此是由于有些拉链表数据量巨大,造成ACTIVE分区使用困难,因此将一部分业务上不再变更的数据转移到HISTORY分区。数据所在的时间分区,记录数据从ACTIVE转移到EXPIRED的日期,即数据发生变更的时间,大部分与end time一致;当dp中有HISTORY分区,且数据转移到HISTORY分区时,其dt为数据转移到HISTORY的时间。

参考文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: