注意事项
- 本指南记录了从 0.10.2.1 更新到 3.1.0,其他版本推荐阅读官方指南。
- 更新期间,持续观察 kafka 消费情况。
- 每次更新一个 Kafka broker 服务。
错误处理
任何一步出现 broker 启动失败的问题,立即停止更新!
只要所有 topic 没有单副本,就不用担心。
一般粗暴的解决办法是,将 kafka-logs 目录清空,还原出问题前的配置内容, broker 即可启动。
参照 检查 Kafka 集群可用性状态 确保集群状态可用,才可以继续操作。
流程一
检查当前 Kafka 实例
**# /opt/jdk/bin/jps -m | grep Kafka**
56863 Kafka /opt/kafka/config/server.properties
**# ls -al /opt**
total 16
drwxr-xr-x. 4 root root 4096 Nov 26 2020 .
dr-xr-xr-x. 19 root root 4096 Nov 27 2020 ..
lrwxrwxrwx 1 root root 17 Nov 26 2020 jdk -> /opt/jdk1.8.0_181
drwxr-xr-x 7 10 143 4096 Jul 7 2018 jdk1.8.0_181
lrwxrwxrwx 1 root root 24 Nov 26 2020 kafka -> /opt/kafka_2.11-0.10.2.1
drwxr-xr-x 6 root root 4096 Apr 22 2017 kafka_2.11-0.10.2.1
准备 Kafka 3.1.0 目录
curl -O https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
tar xf kafka_2.13-3.1.0.tgz -C /opt/
停止当前 Kafka 服务
- 如果使用 supervisor 管理 Kafka 进程:
supervisorctl stop kafka
- 一般方式:
cd /opt/kafka
bin/kafka-server-stop.sh
再次检查 Kafka 是否还在运行
/opt/jdk/bin/jps -m | grep Kafka
没有输出内容,即表示 Kafka 已经停止。
更新 Java 11 (可选)
https://kafka.apache.org/31/documentation.html#java
Java 8 和 Java 11 都是支持的,推荐用 Java 11 ,因为新版有安全方面的改进,以及一定的性能提升。
curl -O https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
tar xf openjdk-11.0.2_linux-x64_bin.tar.gz -C /opt/
rm /opt/jdk
ln -s /opt/jdk-11.0.2 /opt/jdk
export PATH=/opt/jdk/bin/:$PATH
迁移配置文件到新 Kafka 目录
cp /opt/kafka/config/server.properties /opt/kafka_2.13-3.1.0/config/server.properties
cp /opt/kafka/config/log4j.properties /opt/kafka_2.13-3.1.0/config/log4j.properties
修改 server.properties
/opt/kafka_2.13-3.1.0/config/server.properties
文件最后加上如下两行:
inter.broker.protocol.version=0.10.2
log.message.format.version=0.10.2
尝试启动新版本 Kafka 服务
cd `/opt/kafka_2.13-3.1.0`
bin/kafka-server-start.sh -daemon config/server.properties
tail -f logs/server.log
此时运行一会儿,确保一切正常没有影响业务,如有任何问题,都可以回退。
链接到新 Kafka 目录
rm /opt/kafka
ln -s `/opt/kafka_2.13-3.1.0` /opt/kafka
更新集群中下一台 broker
重复流程一的所有步骤,直到集群中所有 broker 都升级到最新,才能进入下一流程。
如果集群中还有旧 broker 在运行,进入下一阶段,就会出现 broker 无法启动的问题!
流程二
正式启用新版本 Kafka
修改配置文件 /opt/kafka_2.13-3.1.0/config/server.properties
inter.broker.protocol.version=3.1
重启新版本 Kafka
# 先停止 kafka 服务
cd `/opt/kafka`
bin/kafka-server-stop.sh
# supervisor 方式启动
supervisorctl start kafka
此时启用了最新版本,已无法降级。 原因: If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1. (https://kafka.apache.org/31/documentation.html#upgrade) 尝试降级会碰到如下报错:
[2022-03-17 17:59:17,129] ERROR Closing socket for 192.168.64.9:9092-192.168.64.11:58490 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: 6 and apiVersion: 7
Caused by: java.lang.IllegalArgumentException: Invalid version for API key 6: 7
at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31)
at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:50)
at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:66)
at org.apache.kafka.common.requests.UpdateMetadataRequest.parse(UpdateMetadataRequest.java:306)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:109)
at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:94)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:89)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:513)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:505)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.processCompletedReceives(SocketServer.scala:505)
at kafka.network.Processor.run(SocketServer.scala:433)
at java.lang.Thread.run(Thread.java:748)
更新集群中下一台 broker
重复流程二的所有步骤,直到集群中所有 broker 都重启完成且无报错,才能进入下一流程。
流程三
启用新的消息格式
修改配置文件 /opt/kafka_2.13-3.1.0/config/server.properties
log.message.format.version=3.1
然后再次重启 Kafka 。
更新集群中下一台 broker
重复流程三的所有步骤,直到集群中所有 broker 都重启完成且无报错,即完成升级。