目录

前言

Kafka Broker

1、工作流程

1.1、Zookeeper 存储的 Kafka 信息

1.2、Kafka Broker 的总体工作流程

1.3、Broke 重要参数

2、Kafka 副本

2.1、副本基本信息

2.2、Keader 选举流程

2.3、Leader 和 Follower 的故障处理细节

Follower 故障

Leader 故障:

2.4、分区副本分配

2.5、生产经验—手动调整分区副本存储

2.6、生产经验—Leader Partition 负载均衡

2.7、生产经验—增加副本因子

3、文件存储

3.1、文件存储机制

1)Topic 数据的存储机制

2)index 文件和log文件

3.2、文件清理策略

1)delete日志删除:将过期数据删除

2)compact 日志压缩

使用场景

4、高效读写数据(面试重点)

总结

前言

        今天学习 Kafka 的第二部分 Broker,相比较 Flink ,Kafka 的流式数据处理还是比较好理解的。

Kafka Broker

环境准备:

启动 Zookeeper启动 Kafka

1、工作流程

我们先看看 Zookeeper 中存储了 Kafka 的哪些信息,它俩是如何协调工作的,

1.1、Zookeeper 存储的 Kafka 信息

我们需要在任意节点启动 Zookeeper 客户端:

# 启动zookeeper客户端

bin/zkCli.sh

# 查看信息

ls /kafka/brokers/ids

/kafka/brokers/ids 

可以看到,我们当前启动了三个 Kafka broker节点。

/kafka/brokers/topics/

这个目录下我们可以看到指定主题的分区信息:

/kafka/consumers/

0.9 版本之前用于保存 offset 信息0.9版本之后 offset 存储在 kfaka 主题中

/kafka/controller

辅助节点选举,这个 controller 是谁取决于哪个节点先在 /kafka/controller 中注册的。在Kafka中,controller负责管理整个集群中所有分区和副本的状态。当检测到某个分区的ISR集合(ISR 就是所有 leader 和 follower 节点之间同步正常的节点集合)发生变化时,controller会负责通知所有broker更新其元数据信息。

1.2、Kafka Broker 的总体工作流程

broker 节点启动后会先去 zookeeper 注册(注册到 /kafka/brokers/ids)代表自己活着然后每个 broker 节点的 controller (每个 broker 节点都有一个 controller)会去zookeeper的 controller 注册一个临时节点(注册到 /kafka/controller),先注册的节点就是 controller ,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker 叫 Kafka Broker follower。选举出来的 controller 会负责管理整个集群中所有分区和副本的状态controller 同时负责 Leader 的选举,选举规则是:首先要成为leader必须是存在于 ISR 中节点,其次按照这些节点在 AR 中的排名(AR 是 Kafak 分区中所有副本的统称),排在前面的节点优先成为 leadercontroller 节点将分区状态信息(当前集群的leade、isr ...)上传到 zookeeper (比如上传到 /kafka/brokers/topics/like/partitions/0/state)其他 controller  去 zookeeper 拉取集群分区状态信息(防止 controller 挂了)假设 leader 挂了controller 会立即检测到这个变化controller 从 zookeeper 中拉取回来集群分区状态信息,继续进行选举选举的前提依然是必须存活于 ISR 队列,之后按照 AR 中的排列顺序,排在前面的优先成为 leader选举完之后,controller 再次更新 zookeeper 中的集分区状态信息

1.3、Broke 重要参数

参数名称 描述 replica.lag.time.max.ms ISR中,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值,默认30s。 auto.leader.rebalance.enable 默认是true。 自动Leader Partition 平衡。 leader.imbalance.per.broker.percentage 默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。 leader.imbalance.check.interval.seconds 默认值300秒。检查leader负载是否平衡的间隔时间。 log.segment.bytes Kafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G。 log.index.interval.bytes 默认4kb,kafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。 log.retention.hours Kafka中数据保存的时间,默认7天。 log.retention.minutes Kafka中数据保存的时间,分钟级别,默认关闭。 log.retention.ms Kafka中数据保存的时间,毫秒级别,默认关闭。 log.retention.check.interval.ms 检查数据是否保存超时的间隔,默认是5分钟。 log.retention.bytes 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment。 log.cleanup.policy 默认是delete,表示所有数据启用删除策略; 如果设置值为compact,表示所有数据启用压缩策略。 num.io.threads 默认是8。负责写磁盘的线程数。整个参数值要占总核数的50%。 num.replica.fetchers 副本拉取线程数,这个参数占总核数的50%的1/3 num.network.threads 默认是3。数据传输线程数,这个参数占总核数的50%的2/3 。 log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。 log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。

2、Kafka 副本

2.1、副本基本信息

Kafka副本作用:提高数据可靠性。Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。Kafka 中,生产者和消费者只能针对 Leader 进行操作。Kafka分区中的所有副本统称为AR(Assigned Repllicas)。

 AR = ISR + OSR

ISR,表示和Leader保持同步的Follower集合。如果 Follower 长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR(踢出到 OSR)。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从 ISR 中选举新的Leader。

OSR,表示Follower与Leader副本同步时,延迟过多的副本(特定条件下会恢复到 ISR)。

2.2、Keader 选举流程

        Kafka集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker的上下线,所有 topic 的分区副本分配和Leader选举等工作。

        Controller的信息同步工作是依赖于Zookeeper的。

接下来我们来验证一下:

1. 创建一个新的topic,3个分区,3个副本

./kafka-topics.sh --bootstrap-server hadoop102:9092 -create --topic buy --partitions 3 -replication-factor 3

2. 查看 Leader 分布情况

注意区分 AR 和 ISR: 

 

3. 停止掉hadoop103 的 kafka 进程,并查看Leader分区情况

上面的图中我们可以看到hadoop103上的副本一共有 3 个 ,分别存在节点 [2,0,1] 上,并且当前的副本 leader 是 2,现在我们把 hadoop103 节点的 kafka 进程停掉,相当于保存在分区 1 的副本全部故障,而我们分区 3 中的副本的 leader 刚好是节点1,所以要把副本的 leader 进行更新,要按照理论应该是 2 号节点继续上任副本 leader 的位置。

 

我们发现,我们分区 3 的副本 leader  原本存在分区2 中,但是现在分区 2 挂掉了,所以它会从自己的 AR 中选一个在前面的节点 [2,0],于是 分区2 中的 副本 leader 变成了节点 2。

4. 我们再恢复一下hadoop103

[lyh@hadoop103 bin] ./kafka-server-start.sh -daemon ../config/server.properties

2.3、Leader 和 Follower 的故障处理细节

概念:

LEO(Log End Offset):每个副本中最后一个 offset ,LEO 就是最新的 offset +1HW(High Watermark):所有副本中最小的 LEO

Follower 故障

Leader 故障:

注意:只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复!

比如上面,我们的 leader 一开始是节点0,但是 leader 挂掉之后,leader 变成了节点1,但是新的leader 显然比旧的 leader 的数据少了3条,所以这部分数据就丢失了(毕竟自己处理问题总不能让人家生产者再发送一遍)。

2.4、分区副本分配

        如果kafka服务器只有3个节点,那么设置kafka 的分区数大于服务器台数,在kafka底层如何分配存储副本呢?

        注意:分区数可以大于机器数,毕竟大不了一个机器上多存几个分区,但是副本数不能大于机器数,那样没有意义,而且 Kafka 会报错:

我有三台机器,上面试着创建了 5 个副本的一个 topic,报错不能超过机器数量。

1)创建12分区,3个副本

./kafka-topics.sh -bootstrap-server hadoop102:9092 --create --topic card --partitions 12 --replication-factor 3

2)查看分区和副本信息

我们可以看到,Kafka 使得我们每个副本的 Leader 都尽可能的不一样,这样很好地分担了读写压力,毕竟Kafka 生产者和消费者都是只对分区 leader 进行操作的。每4个分区对应一个副本,而且每一个副本的 AR 顺序 Kafka 都尽量使它们不一样,这样可以尽可能做到负载均衡。

2.5、生产经验—手动调整分区副本存储

        生产环境中,每台服务器的配置和性能都不一致,但是 Kafka 只会根据自己的代码创建对应的分区副本,就会导致个别服务器存储压力比较大。所以需要手动调整分区副本的存储。

        需求:创建一个新的 topic ,4个分区,两个副本。我们将该 topic 的所有副本都存储到 hadoop102 和 hadoop103 上,hadoop104 不存储任何数据。

1)创建一个新的 topic 叫做 card ,4个分区,2个副本

2)查看分区副本信息

3)创建副本存储计划(所有副本都指定存储在hadoop102、hadoop103中)。

4)执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-repliaction-factor.json --execute

5)验证副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-repliaction-factor.json --verify

6)查看分区副本存储情况

2.6、生产经验—Leader Partition 负载均衡

        正常情况下,Kafka 本身会自动把 Leader Partition 均匀分散在各个机器上,来保证每台节点的吞吐量都是均匀的。但是如果某些 broker 节点宕机,会导致 leader partition 过于集中在其他少部分几台 broker 节点上,这会导致少数几台 broker 节点的读写压力过高,而且即使这些宕机的 broker 再次恢复上线,也只是 follower partition ,不会再次恢复为 leader partition,造成集群负载不均衡。

auto.leader.rebalance.enable,more为 true。自动 Leader partition 平衡leader.imbalance.per.broker.percentag,默认是 10%,每个 broker 允许不平衡的leader的比率。如果超过这个值,控制器会触发 leader 的平衡。leader.imbalance.check.interval.seconds,默认300s,也就是每300s检查一次leader负载是否平衡。

在生产环境中,通常不建议开启自动平衡,因为这可能会影响性能。

2.7、生产经验—增加副本因子

        假设我们创建了一个主题并设置副本数为 1 ,但是后来我们发现这部分数据特别重要,于是想要增加副本数,怎么办呢?通过命令行是不行的,得像我们上面 2.5 手动调整分区副本 一样,通过 json 文件来修改。

手动增加副本存储

1)创建副本存储计划(所有副本都指定存储在hadoop102、hadoop103、hadoop104中)

vim increase-replication-factor.json

# 输入下面的内容

{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}

2)执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

3、文件存储

3.1、文件存储机制

1)Topic 数据的存储机制

        Topic是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应一个 log 文件,该 log 文件中存储的就是 producer 产生的数据。producer 产生的数据会被不断追加(追加是 Kafka 能够高效读写的一个重要原因)到该 log 文件末端,且每条数据都有自己的 offset 。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分区和索引机制,将每个 partition 分为多个 segment 。每个 segment 包括多个:“.index”文件、“.log”文件和 .timeindex 等文件(.timeindex 是一个时间戳索引文件,描述了文件保存时间,因为 Kafka 中的数据默认为保存 7 天才会自动删除,而判断文件日期是否达到7天就需要判断这个文件)。这些文件位于一个文件夹下,文件夹的命名规则为:topic名称+分区序号,例如:like-0。

# 000000~170409

00000000000000000000.index

00000000000000000000.log

# 170410~239429

00000000000000170410.index

00000000000000170410.log

# 239430~

00000000000000239430.index

00000000000000239430.log

验证:

1. 查看 kafka 的 data 目录下的 topic 数据

2. 这些文件因为都是经过序列化的所以都是乱码,需要使用 Kafka 提供的一个工具来看: 

# 查看 .index 文件

kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000015.index

# 查看 .log 文件

kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000015.log

2)index 文件和log文件

kafka 的 index 是稀疏索引,大约每往 .log 文件中写入 4KB 数据,会往 .index 文件写入一条索引。参数 log.index.interval.bytes 默认 4kb;index 文件中保存的 offset 是相对 offset,这样能确保 offset 的值所占空间不会太大,因此能将 offset 的值控制在固定大小。一个 segment 文件大小 1GB

现有索引文件 000000.index,0000522.index,000001005.index ,如何在 log 文件中定位到 offset=600 的record?

offset=600,offset 大于522 小于 1005,说明就要找的文件索引就在 522.index 文件中,所以去 00000522.index 文件中000001005.index 文件中的数据有多行,因为每4kb数据就会往这个文件写一条记录

可以看到 587 就是我们的 index 文件名中的基础offset+相对offset得到的,计算得到 587 后我们发现要找的 offset=600 是大于 587 小于 639 的,说明要找的 record 就在这一行。于是查看 position 得到 6410,接着查看 log文件。

找到 000000522.index 对应的 000000522.log 文件,这个 log 文件同样有一个 position 属性,我们要找的 6410 刚好在log文件中就有一条记录的position= 6410,这就找到了。如果我们要找的position=6415,那么我们就得找到介于这个值中间的数据,因为 6410 < 6415 <10090 所以我们要找的 position=6415的数据就在 6410这一行。

3.2、文件清理策略

Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。

log.retention.hours(int),最低优先级小时,默认7天。log.retention.minutes(int),分钟。log.retention.ms(long),最高优先级毫秒。log.retention.check.interval.ms,负责设置检查周期,默认5分钟。

这四个参数的优先级从上到下越来越高,也就是说 当我们设置了日志保存参数为 ms 级别时,前面设置的 hours 和 minutes 级别的参数就都失效了。

那么日志一旦超过了设置的时间,怎么处理呢?

Kafka中提供的日志清理策略有 delete 和 compact 两种。

1)delete日志删除:将过期数据删除

log.cleanup.policy = delete    所有数据启用删除策略

(1)基于时间:默认打开。以segment中所有记录中的最大时间戳作为该文件时间戳。

(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment。

log.retention.bytes(long),默认等于-1,表示无穷大。

思考:如果一个segment中有一部分数据过期,一部分没有过期,怎么处理?

 当然是以segment中所有记录中的最大时间戳作为该文件时间戳,所以即使数据有99.9%是旧的,只要有0.01%是新的数据,就得等它过期了才能删除。

2)compact 日志压缩

compact 日志压缩:对于相同的 key 的不同 value 只保留最后一个版本。

要使用这个功能,只需要修改配置

log.cleanup.policy=compact

使用场景

        这种压缩只能用于特定场景,比如消息的 key 是用户id,value是用户的资料,通过这个压缩,整个消息集里就保存了所有用户最新的信息。

4、高效读写数据(面试重点)

1)Kafka本身是分布式集群,可以采用分区技术,并行度高

2)读数据采用稀疏索引,可以快速定位要消费的数据

3)顺序写磁盘

        Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械结构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

4)页缓存 + 零拷贝技术

零拷贝:Kakfa 把对数据操作的步骤放到了 生产者和消费者当中(生产者和消费者可以在拦截器来对数据进行处理),所以 Kafka Broker 应用层并不关心数据的存储,所以数据都不需要走应用层,直接走网卡就可以传输费 消费者,传输效率高。

pageCache 页缓存:Kafka 重度依赖底层操作系统提供的 PageCache 功能。当上层有写操作时,操作系统只是将数据写入 pageCache。当读操作发生时,先从 pageCache 中查找,如果找不到,再去磁盘读取。实际上 pageCache 是尽可能把更多的空闲内存空间都当做了磁盘缓存来使用。

参数 描述 log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。 log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。

总结

        这一节用到的 prettyZoo 挺震撼我的,JavaFX 能开发出如此漂亮使用的一款软件 ,实在让我想不到,希望自己写的软件有一天也可以为百千人使用。

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: