kafka-topics.sh的使用方式

一、kafka的基本操作1.1、创建topic1.2、查看topic1.3、查看topic属性1.4、发送消息1.5、消费消息

二、kafka-topics.sh 使用方式2.1、查看帮助2.2、副本数量规则2.3、创建主题2.4、查看broker上所有的主题2.5、查看指定主题 topic 的详细信息2.6、修改主题信息之增加主题分区数量2.7、删除主题

三、总结

一、kafka的基本操作

#mermaid-svg-gr8phPfuZkxCGui4 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-gr8phPfuZkxCGui4 .error-icon{fill:#552222;}#mermaid-svg-gr8phPfuZkxCGui4 .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-gr8phPfuZkxCGui4 .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-gr8phPfuZkxCGui4 .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-gr8phPfuZkxCGui4 .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-gr8phPfuZkxCGui4 .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-gr8phPfuZkxCGui4 .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-gr8phPfuZkxCGui4 .marker{fill:#333333;stroke:#333333;}#mermaid-svg-gr8phPfuZkxCGui4 .marker.cross{stroke:#333333;}#mermaid-svg-gr8phPfuZkxCGui4 svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-gr8phPfuZkxCGui4 .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-gr8phPfuZkxCGui4 .cluster-label text{fill:#333;}#mermaid-svg-gr8phPfuZkxCGui4 .cluster-label span{color:#333;}#mermaid-svg-gr8phPfuZkxCGui4 .label text,#mermaid-svg-gr8phPfuZkxCGui4 span{fill:#333;color:#333;}#mermaid-svg-gr8phPfuZkxCGui4 .node rect,#mermaid-svg-gr8phPfuZkxCGui4 .node circle,#mermaid-svg-gr8phPfuZkxCGui4 .node ellipse,#mermaid-svg-gr8phPfuZkxCGui4 .node polygon,#mermaid-svg-gr8phPfuZkxCGui4 .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-gr8phPfuZkxCGui4 .node .label{text-align:center;}#mermaid-svg-gr8phPfuZkxCGui4 .node.clickable{cursor:pointer;}#mermaid-svg-gr8phPfuZkxCGui4 .arrowheadPath{fill:#333333;}#mermaid-svg-gr8phPfuZkxCGui4 .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-gr8phPfuZkxCGui4 .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-gr8phPfuZkxCGui4 .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-gr8phPfuZkxCGui4 .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-gr8phPfuZkxCGui4 .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-gr8phPfuZkxCGui4 .cluster text{fill:#333;}#mermaid-svg-gr8phPfuZkxCGui4 .cluster span{color:#333;}#mermaid-svg-gr8phPfuZkxCGui4 div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-gr8phPfuZkxCGui4 :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}

kafka-topics.sh --create

kafka-console-producer.sh

kafka-console-consumer.sh

kafka-topics.sh --list

kafka-topics.sh --describe

创建topic

发送消息

消费消息

查看topic列表

查看topic详情

F

1.1、创建topic

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

参数说明:

–create 是创建主题的的动作指令。–zookeeper 指定kafka所连接的zookeeper服务地址。–replicator-factor 指定了副本因子(即副本数量); 表示该topic需要在不同的broker中保存几份,这里设置成1,表示在两个broker中保存两份Partitions分区数。–partitions 指定分区个数;多通道,类似车道。–topic 指定所要创建主题的名称,比如test。

成功则显示:

Created topic "test".

1.2、查看topic

sh kafka-topics.sh --list --zookeeper localhost:2181

显示:

test

1.3、查看topic属性

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

显示:

Topic:test PartitionCount:1 ReplicationFactor:1 Configs:

Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

1.4、发送消息

sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

发送端输入:

>hello

>where are you

>let’s go

1.5、消费消息

sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test

--from-beginning

消费端显示:

hello

where are you

let’s go

^CProcessed a total of 3 messages

二、kafka-topics.sh 使用方式

创建、修改、删除以及查看等功能。

2.1、查看帮助

/bin目录下的每一个脚本工具,都有着众多的参数选项,不可能所有命令都记得住,这些脚本都可以使用 --help 参数来打印列出其所需的参数信息。

$ sh kafka-topics.sh --help

Command must include exactly one action: --list, --describe, --create, --alter or --delete

Option Description

------ -----------

--alter Alter the number of partitions,

replica assignment, and/or

configuration for the topic.

--config A topic configuration override for the

topic being created or altered.The

following is a list of valid

configurations:

cleanup.policy

compression.type

delete.retention.ms

file.delete.delay.ms

flush.messages

flush.ms

follower.replication.throttled.

replicas

index.interval.bytes

leader.replication.throttled.replicas

max.message.bytes

message.downconversion.enable

message.format.version

message.timestamp.difference.max.ms

message.timestamp.type

min.cleanable.dirty.ratio

min.compaction.lag.ms

min.insync.replicas

preallocate

retention.bytes

retention.ms

segment.bytes

segment.index.bytes

segment.jitter.ms

segment.ms

unclean.leader.election.enable

See the Kafka documentation for full

details on the topic configs.

--create Create a new topic.

--delete Delete a topic

--delete-config A topic configuration override to be

removed for an existing topic (see

the list of configurations under the

--config option).

--describe List details for the given topics.

--disable-rack-aware Disable rack aware replica assignment

--force Suppress console prompts

--help Print usage information.

--if-exists if set when altering or deleting

topics, the action will only execute

if the topic exists

--if-not-exists if set when creating topics, the

action will only execute if the

topic does not already exist

--list List all available topics.

--partitions The number of partitions for the topic

being created or altered (WARNING:

If partitions are increased for a

topic that has a key, the partition

logic or ordering of the messages

will be affected

--replica-assignment

broker_id_for_part1_replica1 : assignments for the topic being

broker_id_for_part1_replica2 , created or altered.

broker_id_for_part2_replica1 :

broker_id_for_part2_replica2 , ...>

--replication-factor

replication factor> partition in the topic being created.

--topic The topic to be create, alter or

describe. Can also accept a regular

expression except for --create option

--topics-with-overrides if set when describing topics, only

show topics that have overridden

configs

--unavailable-partitions if set when describing topics, only

show partitions whose leader is not

available

--under-replicated-partitions if set when describing topics, only

show under replicated partitions

--zookeeper REQUIRED: The connection string for

the zookeeper connection in the form

host:port. Multiple hosts can be

given to allow fail-over.

2.2、副本数量规则

副本数量不能大于broker的数量。 kafka 创建主题的时候其副本数量不能大于broker的数量,否则创建主题 topic 失败。

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test1

报错:

Error while executing topic command : Replication factor: 2 larger than available brokers: 1.

[2022-11-24 14:08:18,745] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.

(kafka.admin.TopicCommand$)

注意:副本数量和分区数量的区别。

2.3、创建主题

创建主题时候,有3个参数是必填的:

–partitions(分区数量)、–topic(主题名) 、–replication-factor(复制系数),

同时还需使用 --create 参数表明本次操作是想要创建一个主题操作。

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

返回显示:

Created topic "test1".

另外在创建主题的时候,还可以附加以下两个选项:–if-not-exists 和 --if-exists . 第一个参数表明仅当该主题不存在时候,创建; 第二个参数表明当修改或删除这个主题时候,仅在该主题存在的时候去执行操作。

2.4、查看broker上所有的主题

–list。

sh kafka-topics.sh --list --zookeeper localhost:2181

结果显示:

__consumer_offsets

test

test1

2.5、查看指定主题 topic 的详细信息

–describe。

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

结果显示:

Topic:test1 PartitionCount:1 ReplicationFactor:1 Configs:

Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0

2.6、修改主题信息之增加主题分区数量

–alter。

sh kafka-topics.sh --zookeeper localhost:2181 --topic test1 --alter --partitions 2

结果显示:

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected

Adding partitions succeeded!

查看主题信息:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

可以看到已经成功的将主题的分区数量从1修改为了2。

Topic:test1 PartitionCount:2 ReplicationFactor:1 Configs:

Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Topic: test1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0

当去修改一个不存在的topic信息时(比如修改主题 test2,当前这主题是不存在的)。

sh kafka-topics.sh --zookeeper localhost:2181 --topic test2 --alter --partitions 2

会报错:

Error while executing topic command : Topic test2 does not exist on ZK path localhost:2181

[2022-11-24 14:21:33,564] ERROR java.lang.IllegalArgumentException: Topic test2 does not exist on ZK path localhost:2181

at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:123)

at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)

at kafka.admin.TopicCommand.main(TopicCommand.scala)

(kafka.admin.TopicCommand$)

注意:不要使用 --alter 去尝试减少分区的数量,如果非要减少分区的数量,只能删除整个主题 topic, 然后重新创建。

2.7、删除主题

–delete。

sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1

日志信息提示,主题 test1已经被标记删除状态,但是若delete.topic.enable 没有设置为 true , 则将不会有任何作用。

Topic test1 is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

可以测试一些:

# 一个终端启动生产者:

sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test1

# 另一个终端启动消费者:

sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test1--from-beginning

发现此时还是可以发送消息和接收消息。如果要支持能够删除主题的操作,则需要在 /bin 的同级目录 /config目录下的文件server.properties中,修改配置delete.topic.enable=true(如果置为false,则kafka broker 是不允许删除主题的)。

然后就重启kafka:

# 停止:

sh kafka-server-stop.sh -daemon ../config/server.properties

# 启动:

sh kafka-server-start.sh -daemon ../config/server.properties

再次删除就可以了。

sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1

三、总结

介绍Kafka的基本操作,包括创建topic、查看topic、查看topic属性、发送消息和消费消息等。重点介绍了kafka-topics.sh的使用方式,包括如何查看帮助、副本数量规则、创建主题、查看broker上所有的主题、查看指定主题的详细信息、修改主题信息之增加主题分区数量以及删除主题等。通过本文了解到如何使用kafka-topics.sh来有效地管理Kafka主题。

通过介绍Kafka的基本操作和kafka-topics.sh的使用方式,了解了Kafka主题的管理方法。从创建主题到查看和修改主题属性,再到删除主题,通过kafka-topics.sh完成各种主题管理操作。通过学习更加深入地了解Kafka主题管理的技巧。

推荐链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: 


大家都在找:

kafka:kafka菜鸟教程

分布式:分布式光伏发电项目管理暂行办法

linux:linux和windows的区别

运维:运维是什么意思

服务器:服务器的作用和功能

消息队列:消息队列遥测传输

中间件:中间件的主要作用和功能

大家都在看: