- Broker 端参数:也称为静态参数,Kafka 的配置文件 server.properties 中进行配置的参数,任何修改必须重启 Broker 进程才生效。
- 主题级别参数:Kafka 提供了专门的 kafka-configs 命令来修改。
- JVM 和操作系统级别参数:通用化介绍一些标准的配置参数。
Broker 参数
存储信息
log.dirs
指定 Broker 需要使用若干个文件目录路径。log.dir
只能表示单个路径(旧配置项,建议弃用)。
只要配置 log.dirs
,生产环境必须配置多个路径,格式是逗号分隔的多个路径。最好保证多个目录挂在到不同磁盘上,这么做的好处:
- 提升读写性能:多块物理磁盘比单盘有更高的吞吐量。
- 能够用实现故障转移:即 Failover ,Kafka 1.1 引入的功能。 之前版本中 Broker 使用的任何一块磁盘挂掉,整个 Broker 进程都会关闭。自 1.1 开始,这种情况被修正,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能很正常工作。(没有这种 Failover 的话,需要 RAID 来提供保障)
Zookeeper 相关
Zookeeper 是一个分布式协调框架,负责协调管理并保存 Kafka 集群的所有原数据。
zookeeper.connect
指定 zk servers 的值, 例如: zk1:2181,zk2:2181,zk3:2181
如果有两个 Kafka 集群想使用同一套 Zookeeper 集群,那么要用上 chroot 。
两套集群的 zookeeper.connect 配置分别可以是 zk1:2181,zk2:2181,zk3:2181/kafka1 和 zk1:2181,zk2:2181,zk3:2181/kafka2。切记 chroot 只需要写一次,而且是加到最后的。
Broker 连接相关
客户端或其他 Broker 与该 Broker 如何通信
listeners
监听器,开放 Kafka 服务的地址端口advertised.listeners
Broker 对外发布的地址
监听器是若干个逗号分隔的三元组,每个三元组的格式为 协议名://主机名:端口号
协议名可以是标准名称:
- PLAINTEXT: 明文传输
- SSL: 使用 SSL/TLS 加密传输等
也可以是自定义的协议名称,例如 EXPO://localhost:9092 ;
一旦自己定义了协议名,还必须指定
listener.security.protocol.map
比如: listener.security.protocol.map=EXPO:PLAINTEXT 表示 EXPO 这个自定义协议底层使用明文不加密传输数据。
Topic 管理相关
auto.create.topics.enable
是否允许自动创建 Topic 。建议 false ,不允许自动给创建 topic ,线上环境应该统一管理 topic 。unclean.leader.election.enable
是否允许 Unclean Leader 选举。 必需 false ,坚决不能让落后太多的副本竞选 Leader 。auto.leader.rebalance.enable
是否定期选举 Leader 。必须 false ,这个参数的行为是换 Leader ,换 Leader 代价很高,原本向 A 发送请求的客户端都要切换到 B ,而且换 Leader 本质上没有任何性能收益。
数据留存相关
-
log.retention.{hours|minutes|ms}
三个参数,控制一条消息被保存多长时间,优先级 ms > minutes > hours -
log.retention.bytes
指定 Broker 为消息保存的总磁盘容量大小,默认 -1,表示不限制。这个参数真正发挥作用的场景是云上构建多租户的 Kafka 集群,为了避免“恶意”租户食用过多的磁盘空间。 -
message.max.bytes
控制 Broker 能够接收的最大消息大小。默认是 1000012,不到 1MB ,太小了。它只是一个标尺,衡量 Broker 能处理的最大消息大小,即使设置大一点也不会耗费磁盘空间。- 对应调整 Broker 端
replica.fetch.max.bytes
保证复制正常 - 消费端
fetch.message.max.bytes
- 对应调整 Broker 端
Topic 级别的参数
Topic 级别参数优先级高于 Broker 参数。
从保存消息的维度,下面参数很重要:
retention.ms
:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。retention.bytes
:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。
从能处理消息的大小:
max.message.bytes
它决定 Broker 能够正常接收该 Topic 的最大消息大小。
Topic 级别参数的设置
有两种方式:
- 创建 Topic 时设置
- 修改 Topic 时设置
创建 Topic 时设置参数
设想你的部门需要将交易数据发送 Kafka 进行处理,需要保存最近半年的交易数据,(单条消息)数据大小不超过 5MB :
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic transaction \
--partitions 1 \
--replication-factor 1 \
--config retention.ms=15552000000 \
--config max.message.bytes=5242880
假设要将发送最大值调整为 10MB ,用 kafka-configs 命令来修改:
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name transaction \
--alter \
--add-config max.message.bytes=10485760
统一使用 kafka-configs 脚本来调整 topic 级别参数。
JVM 参数
KAFKA_HEAP_OPTS
制定堆大小
以后讨论调优 Kafka 性能问题。目前业界比较公认的一个合理值, JVM 堆大小 6GB 。
KAFKA_JVM_PERFORMANCE_OPTS
制定 GC 参数
Java 7 根据以下规则设置:
- Broker 所在机器 CPU 充裕,建议使用 CMS 收集器:
-XX:+UseCurrentMarkSweepGC
- 否则,使用吞吐量收集器:
-XX:+UseParallelGC
Java 8 : 使用 G1 回收器:-XX:+UseG1GC
举例
在启动 Kafka 之前配置环境变量:
export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
bin/kafka-server-start.sh config/server.properties
操作系统参数
需要关注的:
- 文件描述符限制
- 文件系统类型
- Swappiness
- 提交时间
ulimit -n
通常情况下将它设置成一个超大的值是合理的做法,比如 ulimit -n 1000000
FS
根据官网的测试报告,XFS 的性能要强于 ext4,所以生产环境最好还是使用 XFS。
最近有个 Kafka 使用 ZFS 的数据报告,貌似性能更加强劲,有条件的话不妨一试。
swap
不要设置成 0 比较好,我们可以设置成一个较小的值。 因为一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。 但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。 建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。
Flush
提交时间或者说是 Flush 落盘时间。 向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。 这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。 如果在页缓存中的数据在写入到磁盘前机器宕机了,数据就丢失了。但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。