Note for kafka

【缺陷详情】

  1. kafka的内置__consumer_offsets 的topic默认配置下可能会出现文件大小一直增长,不会减小的问题。
    时间长了,可能达到几百GB,影响kafka的使用。
  2. 调整kafka的启动参数,打开jmx的port,便于jmx export采集监控指标
  3. 调整systemd service的stop超时,防止被kill -9
  4. 一些kafka server.properties配置参数优化建议。

【影响范围】

__consumer_offsets的topic配置和broker的offsets.retention.minutes不符合以下修复方案的版本
kafka jmxport未开启的环境
kafka 使用systemctl管理的环境

【修复方案】

由于这些修复方案都需要kafka broker滚动重启,建议一起合并实施。
滚动重启的方式:

  1. systemctl stop kafka
  2. systemctl start kafka
  3. 运行:java -jar /tmp/cmdline-jmxclient.jar - 127.0.0.1:9999 kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager Value value为0,才可以继续下一台。

__consumer_offsets问题的修复方案

  1. 修改该topic的配置,cleanup.policy=compact 把清理策略修改为 delete,增加retention.ms=604800000 配置

    1
    ./kafka-topics.sh --zookeeper ZOOKEEPER地址 --alter --topic __consumer_offsets  --config cleanup.policy=delete --config retention.ms=604800000
  2. 修改server.properties,增加offsets.retention.minutes=20160,并滚动重启各个broker。

开启jmx port

修改 /etc/sysconfig/kafka 文件,在KAFKA_JMX_OPTS=中,增加 -Dcom.sun.management.jmxremote.port=9999
用sed可以这样替换。

1
2
3
sed -i.bak '/^KAFKA_JMX_OPTS=/s/$/ -Dcom.sun.management.jmxremote.port=9999/' /etc/sysconfig/kafka
# diff 确认下
diff /etc/sysconfig/kafka /etc/sysconfig/kafka.bak

确认生效,需要在重启kafka进程后,观察是否监听了9999端口。

调整kafka.service增大timeout时长

增大kafka.service的TimeoutStopSec,因为停止kafka的时候,可能由于流量太大,停止时间过长,超过systemd的默认值(/etc/systemd/system.conf中DefaultTimeoutStopSec的值,默认是5s)会直接发送kill -9信号,可能会导致kafka的磁盘数据问题。

1
2
sed -i '/ExecStart/a TimeoutStopSec=300s' /usr/lib/systemd/system/kafka.service
systemctl daemon-reload

server.properties配置参数优化

kafka broker如果承担较大的流量,且单独部署,可以参考以下配置参数调整,这些参数是在服务器配置为32核CPU、125GB内存下的生产环境验证过的,不可生搬硬套,需要仔细调试。

配置项的默认值,和用途说明可以参考官方文档:Apache Kafka

kafka的配置参数调整建议:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# The number of threads handling network requests
num.network.threads=32
# The number of threads doing disk I/O
num.io.threads=32
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=67108864
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=67108864
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=32
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=100000
# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=3000
# Number of fetcher threads used to replicate messages from a source broker.
# Increasing this value can increase the degree of I/O parallelism in the follower broker
num.replica.fetchers=9
# The number of queued requests allowed before blocking the network threads; default=500
queued.max.requests=2000

offsets.retention.minutes=20160

# kafka的server.properties的配置里最大message 调为10M:
message.max.bytes=10485760
replica.fetch.max.bytes=10485760

# 调整默认的kafka日志保留时间为72小时(三天),并新增offset的过期时间配置(和日志保持一致)
log.retention.hours=72
offsets.retention.minutes=4320
作者

Sony Dog

发布于

2022-07-31

更新于

2023-12-26

许可协议

CC BY-NC-SA 4.0