Kafka 滚动更新实践

2022/03/31

注意事项

错误处理

任何一步出现 broker 启动失败的问题,立即停止更新!

只要所有 topic 没有单副本,就不用担心。

一般粗暴的解决办法是,将 kafka-logs 目录清空,还原出问题前的配置内容, broker 即可启动。

参照 检查 Kafka 集群可用性状态 确保集群状态可用,才可以继续操作。

流程一

检查当前 Kafka 实例

**# /opt/jdk/bin/jps -m | grep Kafka**
56863 Kafka /opt/kafka/config/server.properties

**# ls -al /opt**
total 16
drwxr-xr-x.  4 root root 4096 Nov 26  2020 .
dr-xr-xr-x. 19 root root 4096 Nov 27  2020 ..
lrwxrwxrwx   1 root root   17 Nov 26  2020 jdk -> /opt/jdk1.8.0_181
drwxr-xr-x   7   10  143 4096 Jul  7  2018 jdk1.8.0_181
lrwxrwxrwx   1 root root   24 Nov 26  2020 kafka -> /opt/kafka_2.11-0.10.2.1
drwxr-xr-x   6 root root 4096 Apr 22  2017 kafka_2.11-0.10.2.1

准备 Kafka 3.1.0 目录

curl -O https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
tar xf kafka_2.13-3.1.0.tgz -C /opt/

停止当前 Kafka 服务

supervisorctl stop kafka
cd /opt/kafka
bin/kafka-server-stop.sh

再次检查 Kafka 是否还在运行

/opt/jdk/bin/jps -m | grep Kafka

没有输出内容,即表示 Kafka 已经停止。

更新 Java 11 (可选)

https://kafka.apache.org/31/documentation.html#java

Java 8 和 Java 11 都是支持的,推荐用 Java 11 ,因为新版有安全方面的改进,以及一定的性能提升。

curl -O https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
tar xf openjdk-11.0.2_linux-x64_bin.tar.gz -C /opt/
rm /opt/jdk
ln -s /opt/jdk-11.0.2 /opt/jdk
export PATH=/opt/jdk/bin/:$PATH

迁移配置文件到新 Kafka 目录

cp /opt/kafka/config/server.properties /opt/kafka_2.13-3.1.0/config/server.properties
cp /opt/kafka/config/log4j.properties /opt/kafka_2.13-3.1.0/config/log4j.properties

修改 server.properties

/opt/kafka_2.13-3.1.0/config/server.properties 文件最后加上如下两行:

inter.broker.protocol.version=0.10.2
log.message.format.version=0.10.2

尝试启动新版本 Kafka 服务

cd `/opt/kafka_2.13-3.1.0`
bin/kafka-server-start.sh -daemon config/server.properties
tail -f logs/server.log

此时运行一会儿,确保一切正常没有影响业务,如有任何问题,都可以回退。

链接到新 Kafka 目录

rm /opt/kafka
ln -s `/opt/kafka_2.13-3.1.0` /opt/kafka

更新集群中下一台 broker

重复流程一的所有步骤,直到集群中所有 broker 都升级到最新,才能进入下一流程。

如果集群中还有旧 broker 在运行,进入下一阶段,就会出现 broker 无法启动的问题!

流程二

正式启用新版本 Kafka

修改配置文件 /opt/kafka_2.13-3.1.0/config/server.properties

inter.broker.protocol.version=3.1

重启新版本 Kafka

# 先停止 kafka 服务
cd `/opt/kafka`
bin/kafka-server-stop.sh

# supervisor 方式启动
supervisorctl start kafka

此时启用了最新版本,已无法降级。 原因: If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1. (https://kafka.apache.org/31/documentation.html#upgrade 尝试降级会碰到如下报错:

[2022-03-17 17:59:17,129] ERROR Closing socket for 192.168.64.9:9092-192.168.64.11:58490 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: 6 and apiVersion: 7
Caused by: java.lang.IllegalArgumentException: Invalid version for API key 6: 7
        at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31)
        at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:50)
        at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:66)
        at org.apache.kafka.common.requests.UpdateMetadataRequest.parse(UpdateMetadataRequest.java:306)
        at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:109)
        at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:94)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:89)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:513)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:505)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.processCompletedReceives(SocketServer.scala:505)
        at kafka.network.Processor.run(SocketServer.scala:433)
        at java.lang.Thread.run(Thread.java:748)

更新集群中下一台 broker

重复流程二的所有步骤,直到集群中所有 broker 都重启完成且无报错,才能进入下一流程。

流程三

启用新的消息格式

修改配置文件 /opt/kafka_2.13-3.1.0/config/server.properties

log.message.format.version=3.1

然后再次重启 Kafka 。

更新集群中下一台 broker

重复流程三的所有步骤,直到集群中所有 broker 都重启完成且无报错,即完成升级。

参考