
Kafka Common Commands Collection


Contents

1. Preface
2. Broker default port
3. Installing Kafka
4. Starting Kafka
5. Creating a topic
6. Listing all topics
7. Deleting a topic
8. Describing a topic
9. Increasing a topic's partition count
10. Producing messages
11. Consuming messages
12. Listing consumer groups
13. Viewing new consumer details
14. Describing a group
15. Deleting a group
16. Setting a consumer group's offset
17. RdKafka bundled examples
18. Balancing leaders
19. Bundled benchmarking tool
20. Getting a partition's maximum or minimum offset
21. Viewing __consumer_offsets
22. Getting offset information for a consumer group
23. Viewing Kafka's ZooKeeper data
24. How to increase the replica count of __consumer_offsets?
25. Problems
Appendix 1: process monitoring tool process_monitor.sh
Appendix 2: batch operation tools
Appendix 2.1: batch command tool mooon_ssh
Appendix 2.2: batch file upload tool mooon_upload
Appendix 2.3: usage examples
Appendix 3: batch tool for setting broker.id and listeners
Appendix 4: batch tool for setting hostname
Appendix 5: Kafka monitoring tool kafka-manager
Appendix 6: installing Kafka
Appendix 7: __consumer_offsets

1. Preface

This article draws on two sources: material shared online, and my own working notes. Accumulated over time, it collects assorted Kafka commands and will be updated continuously.

Kafka 0.9.0.0 introduced a couple of notable changes: a GroupCoordinator role was added on the server side, and topic offset information, previously stored in ZooKeeper, moved to a special topic (__consumer_offsets).

Kafka's bottleneck tends to be the network card rather than CPU, memory, or disk, so log compression is worth considering.


2. Broker default port

9092. At installation time it is advisable to give Kafka its own root directory in ZooKeeper, such as "/kafka", rather than using "/" directly, so that multiple Kafka clusters can share one ZooKeeper ensemble.
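A minimal server.properties sketch of this layout, assuming placeholder ZooKeeper host names zk1 to zk3:

# zk1..zk3 are placeholder hosts; the "/kafka" chroot is appended once, at the end of the list
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka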

3. Installing Kafka

Kafka depends on ZooKeeper. It ships with a bundled ZooKeeper, but installing ZooKeeper separately is recommended.
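A hedged install sketch, assuming Kafka 2.8.2 from the Apache archive and /usr/local as the install prefix; substitute the release you actually use:

# download and unpack a Kafka release (the version here is an assumption)
wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
tar xzf kafka_2.13-2.8.2.tgz -C /usr/local
ln -s /usr/local/kafka_2.13-2.8.2 /usr/local/kafka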

4. Starting Kafka

kafka-server-start.sh config/server.properties

To keep it running in the background as a daemon, add the "-daemon" flag, e.g.:

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

5. Creating a topic

--topic gives the topic name, --partitions the partition count, and --replication-factor the replica count:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Note: if server.properties specifies a directory for Kafka within ZooKeeper, the command must include it as well, or it fails with no available brokers, e.g.:

kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test

6. Listing all topics

kafka-topics.sh --list --zookeeper localhost:2181

Note: if server.properties specifies a directory for Kafka within ZooKeeper, the command must include it as well, or it fails with no available brokers, e.g.:

kafka-topics.sh --list --zookeeper localhost:2181/kafka

Example output:

__consumer_offsets
my-replicated-topic
test

7. Deleting a topic

1) kafka-topics.sh --zookeeper localhost:2181 --topic test --delete
2) kafka-topics.sh --zookeeper localhost:2181/kafka --topic test --delete
3) kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test

8. Describing a topic

kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Note: if server.properties specifies a directory for Kafka within ZooKeeper, the command must include it as well, or it fails with no available brokers, e.g.:

kafka-topics.sh --describe --zookeeper localhost:2181/kafka --topic test

Example output:

Topic:test  PartitionCount:3  ReplicationFactor:2  Configs:
    Topic: test  Partition: 0  Leader: 140  Replicas: 140,214  Isr: 140,214
    Topic: test  Partition: 1  Leader: 214  Replicas: 214,215  Isr: 214,215
    Topic: test  Partition: 2  Leader: 215  Replicas: 215,138  Isr: 215,138

9. Increasing a topic's partition count

kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 5
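Note that the partition count can only grow, never shrink. To confirm the change took effect, describe the topic afterwards:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic test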

10. Producing messages

kafka-console-producer.sh --broker-list localhost:9092 --topic test
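The console producer reads one message per line from stdin, so a file can be piped in; messages.txt here is a hypothetical input file:

kafka-console-producer.sh --broker-list localhost:9092 --topic test < messages.txt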

11. Consuming messages

1) From the beginning:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2) From the tail:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest

3) From a specific partition:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 1

4) A fixed number of messages:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 1 --max-messages 1

5) New consumer (version >= 0.9):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties

12. Listing consumer groups

1) ZooKeeper way (old):

kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181/kafka --list

2) API way (new):

kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --list

Example output:

test
console-consumer-37602
console-consumer-75637
console-consumer-59893

13. Viewing new consumer details

Only supported when offsets are stored in ZooKeeper:

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test

14. Describing a group

kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --describe

Example output:

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

test 1 87 87 0 - - -

15. Deleting a group

The old ZooKeeper way can delete a group; newer versions delete groups automatically. Running:

kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --delete

prints the following:

Option '[delete]' is only valid with '[zookeeper]'.

Note that there's no need to delete group metadata for the new consumer as the group is deleted when the last committed offset for that group expires.

16. Setting a consumer group's offset

Run zkCli.sh to enter the ZooKeeper command line. To set the offset of group testgroup on partition 0 of topic test to 2018:

set /consumers/testgroup/offsets/test/0 2018

If Kafka's root directory in ZooKeeper is "/kafka" rather than "/":

set /kafka/consumers/testgroup/offsets/test/0 2018

Alternatively, Kafka's bundled tool kafka-run-class.sh kafka.tools.UpdateOffsetsInZK can modify offsets. Usage:

kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

As the usage shows, it can only move offsets to earliest or latest, which is less flexible than editing ZooKeeper directly.
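To confirm the new value, the znode can be read back from the same zkCli.sh session:

get /kafka/consumers/testgroup/offsets/test/0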

17. RdKafka bundled examples

rdkafka_consumer_example -b 127.0.0.1:9092 -g test test
rdkafka_consumer_example -e -b 127.0.0.1:9092 -g test test

18. Balancing leaders

kafka-preferred-replica-election.sh --zookeeper localhost:2181/chroot
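Alternatively, brokers can rebalance leadership themselves. A hedged server.properties sketch enabling periodic automatic preferred-leader election:

# let the controller move leadership back to preferred replicas automatically
auto.leader.rebalance.enable=true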

19. Bundled benchmarking tool

kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092
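A consumer-side counterpart also ships with Kafka; a hedged sketch (option names vary by version, and newer releases take --bootstrap-server instead of --broker-list):

kafka-consumer-perf-test.sh --broker-list localhost:9092 --topic test --messages 100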

20. Getting a partition's maximum or minimum offset

A time of -1 means the maximum, -2 the minimum:

kafka-run-class.sh kafka.tools.GetOffsetShell --topic hive-mdatabase-hostsltable --time -1 --broker-list 127.0.0.1:9092 --partitions 0
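Output lines take the form topic:partition:offset, so the two queries can be combined into a rough per-partition message count; a sketch assuming topic test, partition 0:

MAX=$(kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list 127.0.0.1:9092 --partitions 0 | awk -F: '{print $3}')
MIN=$(kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -2 --broker-list 127.0.0.1:9092 --partitions 0 | awk -F: '{print $3}')
echo $((MAX - MIN))   # number of messages currently retained in the partition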

21. Viewing __consumer_offsets

Requires exclude.internal.topics=false in consumer.properties.

1) Before 0.11.0.0:

kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

2) 0.11.0.0 and later:

kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

22. Getting offset information for a consumer group

Requires exclude.internal.topics=false in consumer.properties.

1) Before 0.11.0.0:

kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9091,localhost:9092,localhost:9093 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

2) 0.11.0.0 and later:

kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9091,localhost:9092,localhost:9093 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

23. Viewing Kafka's ZooKeeper data
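First connect with the ZooKeeper client; the "/kafka" chroot in the examples below assumes Kafka was installed under that root:

zkCli.sh -server localhost:2181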

1) View Kafka's root directory in ZooKeeper:

[zk: localhost:2181(CONNECTED) 0] ls /kafka

[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, config]

2) View brokers:

[zk: localhost:2181(CONNECTED) 1] ls /kafka/brokers
[ids, topics, seqid]

3) View which brokers exist (214, 215, etc. are broker.id values configured in server.properties):

[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[214, 215, 138, 139]

4) View broker 214; the data below shows this broker has no JMX_PORT set:

[zk: localhost:2181(CONNECTED) 4] get /kafka/brokers/ids/214
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://test-204:9092"],"jmx_port":-1,"host":"test-204",...}
cZxid = 0x200002400
ctime = Mon Jun 26 16:57:44 CST 2017
mZxid = 0x200002400
mtime = Mon Jun 26 16:57:44 CST 2017
pZxid = 0x200002400
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x45b9d9e841f0136
dataLength = 190
numChildren = 0

5) View the controller; the data below shows that broker 214 is the controller:

[zk: localhost:2181(CONNECTED) 9] get /kafka/controller
{"version":1,"brokerid":214,"timestamp":"1498467946988"}
cZxid = 0x200002438
ctime = Mon Jun 26 17:05:46 CST 2017
mZxid = 0x200002438
mtime = Mon Jun 26 17:05:46 CST 2017
pZxid = 0x200002438
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x45b9d9e841f0136
dataLength = 56
numChildren = 0

6) View the Kafka cluster id:

[zk: localhost:2181(CONNECTED) 13] get /kafka/cluster/id
{"version":"1","id":...}
cZxid = 0x2000023e7
ctime = Mon Jun 26 16:57:28 CST 2017
mZxid = 0x2000023e7
mtime = Mon Jun 26 16:57:28 CST 2017
pZxid = 0x2000023e7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 45
numChildren = 0

7) View which topics exist:

[zk: localhost:2181(CONNECTED) 16] ls /kafka/brokers/topics

[test, my-replicated-topic, test1, test2, test3, test123, __consumer_offsets, info]

8) View a topic's partitions:

[zk: localhost:2181(CONNECTED) 19] ls /kafka/brokers/topics/__consumer_offsets/partitions

[44, 45, 46, 47, 48, 49, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43]

9) View the state of partition 0:

[zk: localhost:2181(CONNECTED) 22] get /kafka/brokers/topics/__consumer_offsets/partitions/0/state
{"controller_epoch":2,"leader":215,"version":1,"leader_epoch":1,"isr":[215,214]}
cZxid = 0x2000024c6
ctime = Mon Jun 26 18:02:07 CST 2017
mZxid = 0x200bc4fc3
mtime = Mon Aug 27 18:58:10 CST 2018
pZxid = 0x2000024c6
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 80
numChildren = 0

24. How to increase the replica count of __consumer_offsets?

kafka-reassign-partitions.sh can increase the replica count of __consumer_offsets. Construct a JSON file reassign.json:

{
    "version":1,
    "partitions":[
        {"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]},
        {"topic":"__consumer_offsets","partition":1,"replicas":[1,2,3]},
        {"topic":"__consumer_offsets","partition":2,"replicas":[1,2,3]},
        ...
        {"topic":"__consumer_offsets","partition":49,"replicas":[1,2,3]}
    ]
}

Then run:

kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute

The numbers in "[1,2,3]" are broker.id values.
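The same tool can report progress: after --execute has been issued, a --verify run with the same JSON file shows whether each partition's reassignment has completed:

kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --verify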

If it fails with "Partitions reassignment failed due to Partition reassignment data file is empty", the reassign.json file format may be wrong, for example the entries were written one per line without trailing commas:

{"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]}
{"topic":"__consumer_offsets","partition":1,"replicas":[1,2,3]}
...

If it fails with the following error:

Partitions reassignment failed due to Partition replica lists may not contain duplicate entries: __consumer_offsets-16 contains multiple entries for 2. __consumer_offsets-39 contains multiple entries for 2. __consumer_offsets-40 contains multiple entries for 3. __consumer_offsets-44 contains multiple entries for 3

the cause is that two replicas of one partition were assigned to the same broker. Taking partition 16 as an example, two of its replicas landed on broker 2:

{"topic":"__consumer_offsets","partition":16,"replicas":[2,2,...]}

Output after a successful run:

$ ../bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.35.31:2181/kafka --reassignment-json-file __consumer_offsets.reassign --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"__consumer_offsets","partition":...,"replicas":[...]},...]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

25. Problems

1) -190, Local: Unknown partition

For example, a standalone deployment has only one partition, but the produce call specifies partition 1.

2) RdKafka program logs "delivery failed. errMsg:[Local: Message timed out]"

The same program hits this error on some machines yet works fine on others. Testing showed the machines running the Kafka application had no hosts entries for the Kafka broker machines.

Another fix is to configure listeners and advertised.listeners in server.properties, using IPs rather than hostnames as values.

3) Name or service not known (after 9020595078ms in state INIT)

event_callback: type(0), severity(3), (-193) kafka-204:9092/214: Failed to resolve 'kafka-204:9092': Name or service not known (after 9020595078ms in state INIT)

The cause is that the machine running the Kafka application (not Kafka itself) cannot resolve the hostname kafka-204, while the Kafka broker machines can. The fix is to configure listeners and advertised.listeners in server.properties, using IPs rather than hostnames as values.
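A hedged server.properties sketch of that fix, with 192.168.31.15 standing in for the broker's real IP:

# use the broker's IP, not its hostname, so clients without hosts entries can connect
listeners=PLAINTEXT://192.168.31.15:9092
advertised.listeners=PLAINTEXT://192.168.31.15:9092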

Appendix 1: process monitoring tool process_monitor.sh

process_monitor.sh is a shell script with detailed usage instructions and help built in. It is well suited to crontab: when it detects that a monitored process is gone, it restarts the process within about 3 seconds. It supports different users running the same program, and the same user running the same program with different arguments.

Usage example:

* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafkaServer" "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"

Since all Java programs run inside the JVM, their process name is java; "kafkaServer" restricts the match to Kafka. If the same user runs multiple Kafka instances, add the port number to tell them apart; the port must appear as a command-line argument, and it forms the match pattern together with "kafkaServer".

When the process is found missing, the restart command "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties" is executed.

Usage example 2, monitoring ZooKeeper:

* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java -Dzookeeper" "/data/zookeeper/bin/zkServer.sh start"

Appendix 2: batch operation tools

Suitable for batch-installing Kafka and for day-to-day operations.

The tools come in two versions: C++ and Go. The C++ version is currently the more mature; the Go version is fairly minimal. The C++ version depends on the C++ runtime library and must be compiled for each environment, while the Go version depends on neither the C nor the C++ runtime library, so it can be used across a wide range of Linux environments without compilation.

They are simple to use: run a command without arguments and it prints its usage.

Appendix 2.1: batch command tool mooon_ssh

Parameters:

-u (default: none): username; the environment variable U may be used instead
-p (default: none): password; the environment variable P may be used instead
-h (default: none): IP list; the environment variable H may be used instead
-P (default: 22; the source can be modified and compiled with another common port): SSH port; the environment variable PORT may be used instead
-c (default: none): the command to run on the remote machines; single-quoting the value is recommended, unless the command itself already contains conflicting single quotes. With double quotes, take care to escape, or the local shell interprets the contents
-v (default: 1): verbosity of the tool's output

Appendix 2.2: batch file upload tool mooon_upload

Parameters:

-u (default: none): username; the environment variable U may be used instead
-p (default: none): password; the environment variable P may be used instead
-h (default: none): IP list; the environment variable H may be used instead
-P (default: 22; the source can be modified and compiled with another common port): SSH port; the environment variable PORT may be used instead
-s (default: none): comma-separated list of local files to upload, with relative or absolute paths
-d (default: none): directory on the remote machines to upload into; must be a single directory

Appendix 2.3: usage examples

1) Example 1: upload /etc/hosts

mooon_upload -s=/etc/hosts -d=/etc

2) Example 2: check whether the /etc/profile files are identical

mooon_ssh -c='md5sum /etc/profile'

3) Example 3: view crontabs in batch

mooon_ssh -c='crontab -l'

4) Example 4: clear crontabs in batch

mooon_ssh -c='rm -f /tmp/crontab.empty; touch /tmp/crontab.empty'
mooon_ssh -c='crontab /tmp/crontab.empty'

5) Example 5: update crontabs in batch

mooon_ssh -c='crontab /tmp/crontab.online'

6) Example 6: get the remote machines' IPs

Because the awk program uses single quotes, the value of "-c" cannot be single-quoted, so its content must be escaped, which makes this one more complex than the others:

mooon_ssh -c="netstat -ie | awk -F[\\ :]+ 'BEGIN{ok=0;} {if (match(\$0, \"eth1\")) ok=1; if ((1==ok) && match(\$0,\"inet\")) { ok=0; if (7==NF) printf(\"%s\\n\", ...

The IP's position in "netstat -ie" output varies slightly across environments, hence the "7==NF" check in the awk program, though even that may not suit every environment. Characters that need escaping include double quotes, dollar signs, and backslashes.

7) Example 7: view Kafka processes in batch (environment-variable style)

$ export H=192.168.31.9,192.168.31.10,192.168.31.11,192.168.31.12,192.168.31.13
$ export U=kafka
$ export P='123456'

$ mooon_ssh -c='/usr/local/jdk/bin/jps -m'
[192.168.31.15]
50928 Kafka /data/kafka/config/server.properties
125735 Jps -m
[192.168.31.15] SUCCESS

[192.168.31.16]
147842 Jps -m
174902 Kafka /data/kafka/config/server.properties
[192.168.31.16] SUCCESS

[192.168.31.17]
51409 Kafka /data/kafka/config/server.properties
178771 Jps -m
[192.168.31.17] SUCCESS

[192.168.31.18]
73568 Jps -m
62314 Kafka /data/kafka/config/server.properties
[192.168.31.18] SUCCESS

[192.168.31.19]
123908 Jps -m
182845 Kafka /data/kafka/config/server.properties
[192.168.31.19] SUCCESS

================================
[192.168.31.15 SUCCESS] 0 seconds
[192.168.31.16 SUCCESS] 0 seconds
[192.168.31.17 SUCCESS] 0 seconds
[192.168.31.18 SUCCESS] 0 seconds
[192.168.31.19 SUCCESS] 0 seconds
SUCCESS: 5, FAILURE: 0

8) Example 8: stop Kafka processes in batch (parameter style)

$ mooon_ssh -c='/data/kafka/bin/kafka-server-stop.sh' -u=kafka -p='123456' -h=192.168.31.15,192.168.31.16,192.168.31.17,192.168.31.18,192.168.31.19
[192.168.31.15]
No kafka server to stop
command return 1

[192.168.31.16]
No kafka server to stop
command return 1

[192.168.31.17]
No kafka server to stop
command return 1

[192.168.31.18]
No kafka server to stop
command return 1

[192.168.31.19]
No kafka server to stop
command return 1

================================
[192.168.31.15 FAILURE] 0 seconds
[192.168.31.16 FAILURE] 0 seconds
[192.168.31.17 FAILURE] 0 seconds
[192.168.31.18 FAILURE] 0 seconds
[192.168.31.19 FAILURE] 0 seconds
SUCCESS: 0, FAILURE: 5

Appendix 3: batch tool for setting broker.id and listeners

A shell script with detailed usage instructions and help built in; depends on mooon_ssh and mooon_upload.

Appendix 4: batch tool for setting hostname

A shell script with detailed usage instructions and help built in; depends on mooon_ssh and mooon_upload.

Appendix 5: Kafka monitoring tool kafka-manager

kafka-manager's data comes mainly from two places: Kafka's ZooKeeper data and Kafka's JMX data.

kafka-manager requires JDK 1.8 or later. Building kafka-manager from source is relatively involved, but with the binary package in hand, you only need to set "kafka-manager.zkhosts" in application.conf to start kafka-manager. The "kafka-manager.zkhosts" value is not Kafka's ZooKeeper setting but the ZooKeeper used by kafka-manager itself, so the two can be different ZooKeeper ensembles. Note that the value must be enclosed in double quotes.

crontab startup example:

JMX_PORT=9999
* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafkaServer" "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"

Setting JMX_PORT is not mandatory, but it is recommended so kafka-manager can inspect the brokers in more detail. crontab example for starting kafka-manager (service port set to 8080; the default is 9000):

* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafka-manager" "/data/kafka/kafka-manager/bin/kafka-manager -Dconfig.file=/data/kafka/kafka-manager/conf/application.conf -Dhttp.port=8080 > /dev/null 2>&1"

Note that crontab runs jobs normally only while the user's password remains valid.

Appendix 6: installing Kafka

The two most basic configuration items in server.properties are:

1) broker.id
2) zookeeper.connect

broker.id must be different on every node. For zookeeper.connect, specify a subdirectory rather than placing Kafka directly under the ZooKeeper root. Setting listeners is also recommended; otherwise clients have to configure hostname-to-IP mappings.
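A minimal server.properties sketch of these items, with placeholder values:

# broker.id must be unique per node
broker.id=1
# put Kafka under a chroot instead of the ZooKeeper root
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka
# prefer an IP so clients need no hostname mapping
listeners=PLAINTEXT://192.168.31.15:9092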

crontab example for starting Kafka:

JMX_PORT=9999
* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafkaServer" "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"

Setting JMX_PORT makes it easier for kafka-manager to manage Kafka.

Appendix 7: __consumer_offsets

__consumer_offsets is Kafka's built-in topic. Since Kafka 0.9.0.0, topic offset information, previously stored in ZooKeeper, has been stored in the built-in __consumer_offsets topic instead.

The server.properties items num.partitions and default.replication.factor do not apply to __consumer_offsets; it is governed by offsets.topic.num.partitions and offsets.topic.replication.factor.
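A hedged server.properties sketch; these values take effect only if set before the first consumer triggers creation of __consumer_offsets (50 partitions is the stock default):

offsets.topic.num.partitions=50
offsets.topic.replication.factor=3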
