旧项目中主要使用的是 confluent-kafka-go ,它是由 Kafka 背后的技术公司 confluent.inc 维护的 Go 客户端,但是其最主要的问题是,它基于 kafka c/c++ 库 librdkafka 构建而成,这就意味着要是用它就必须使用 cgo 。容器镜像的构建也会相对繁琐。
几款主流的 Go kafka 客户端:
- Shopify/sarama (后文简称 sarama)
- confluentinc/confluent-kafka-go(后文简称 ckafka)
- segmentio/kafka-go (后文简称 kafkago)
准备
使用 docker compose 搭建一个本地 kafka 实验环境。
version: "3.9"
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
restart: unless-stopped
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
restart: unless-stopped
# 启动 compose 环境
docker compose up -d
# 使用 kafka 自带的生产者工具,向 test 主题发消息
docker compose exec kafka \
kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic test
# 使用 kafka 自带的消费者工具,订阅 test 主题
docker compose exec kafka \
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic test \
--group 1 \
--from-beginning
sarama
目前该库在社区里 star 数量最多。
Producer
先介绍最基本的例子。
SyncProducer
_func Example_producerSyncSarama_() {
_var _(
brokers = []string{"127.0.0.1:29092"}
topic = "test"
)
_// setup config_
_ _config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(brokers, config)
_if _err != _nil _{
log.Fatalf("new producer: %v", err)
}
_// send messages_
_ for _i := 0; i < 10; i++ {
s := fmt.Sprint("msg", i)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(s),
}
p, o, err := producer.SendMessage(msg)
_if _err != _nil _{
log.Printf("ERR: send partation %d offset %d: %v", p, o, err)
}
}
_// shut down producer_
_ if _err := producer.Close(); err != _nil _{
log.Printf("ERR: close producer: %v", err)
}
}
需要注意的是,在 SyncProducer 中 Producer.Return.Successes 需要显式配置为 true 。
通过调用 producer 的 SendMessage 方法,将封装好的 sarama.ProducerMessage 发出,因为是同步请求,如果遇到发送错误会直接返回。
如果要提高吞吐量,让发送不被阻塞,我们可以使用 AsyncProducer 。
AsyncProducer
_func Example_producerAsyncSarama_() {
_var _(
brokers = []string{"localhost:29092"}
topic = "test"
)
_// setup config_
_ _config := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer(brokers, config)
_if _err != _nil _{
log.Fatalf("new producer: %v", err)
}
_// handle producer errors in another goroutine_
_ go func_() {
_for _err := _range _producer.Errors() {
log.Printf("ERR: produce: %v", err)
}
}()
_// send messages_
_ for _i := 0; i < 10; i++ {
s := fmt.Sprint("msg", i)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(s),
}
producer.Input() <- msg
}
_// shut down producer, it will flush all pending messages_
_ if _err := producer.Close(); err != _nil _{
log.Printf("ERR: close producer: %v", err)
}
}
异步发送的消息,报错信息可以通过 producer.Errors 方法读取到,它的返回值是一个 unbuffered channel 。
需要注意的是初始化的配置对象中,Producer.Return.Errors 默认值是 true ,这意味着我们必须读取 producer.Errors 来处理错误,否则会导致发送消息阻塞。
Consumer
sarama 的消费者客户端使用起来稍微有一点点繁琐,需要用户自己实现 sarama.ConsumerGroupHandler 接口。
我们可以参考官方提供的例子,得到一个最精简的实现:
_type _consumeHandler _struct_{}
_func _(consumeHandler) _Setup_(_ sarama.ConsumerGroupSession) error { _return nil _}
_func _(consumeHandler) _Cleanup_(_ sarama.ConsumerGroupSession) error { _return nil _}
_func _(h consumeHandler) _ConsumeClaim_(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
_for _msg := _range _claim.Messages() {
log.Printf("msg:%s topic:%q partition:%d offset:%d",
msg.Value, msg.Topic, msg.Partition, msg.Offset)
sess.MarkMessage(msg, "")
}
_return nil_
}
ConsumerGroup
_func Example_consumerSarama_() {
_var _(
brokers = []string{"127.0.0.1:29092"}
topics = []string{"test"}
group = "sarama"
)
config := sarama.NewConfig()
consumerGroup, err := sarama.NewConsumerGroup(brokers, group, config)
_if _err != _nil _{
log.Fatalf("new consumer: %v", err)
}
_defer func_() { _ = consumerGroup.Close() }()
ctx := context.Background()
_for _{
handler := consumeHandler{}
_// `Consume` should be called inside an infinite loop, when a_
_ // server-side rebalance happens, the consumer session will need to be_
_ // recreated to get the new claims_
_ _err := consumerGroup.Consume(ctx, topics, handler)
_if _err != _nil _{
log.Fatalf("consume: %v", err)
}
}
}
需要注意的是,consumerGroup.Consume 方法需要在循环中调用,这考虑到服务端发起 rebalance 的时候,消费者客户端需要重建会话。
其他
sarama 提供了 mock 测试包,可以作为依赖 sarama 的 go 包的自测。
ckafka
该库由 kafka 背后的技术公司 confluent.inc 维护。它基于 librdkafka 构建,所以如果要跨平台编译是个问题,除此之外之外使用还是挺简单地。
Producer
_func Example_producerCKafka_() {
_var _(
brokers = []string{"127.0.0.1:29092"}
topic = "test"
)
_// setup config_
_ _config := &ckafka.ConfigMap{
"bootstrap.servers": strings.Join(brokers, ","),
}
producer, err := ckafka.NewProducer(config)
_if _err != _nil _{
log.Fatalf("new producer: %v", err)
}
_// handle producer errors in another goroutine_
_ go func_() {
_for _e := _range _producer.Events() {
_switch _ev := e.(_type_) {
_case _*ckafka.Message:
_if _err := ev.TopicPartition.Error; err != _nil _{
log.Printf("ERR: produce: %v", err)
}
_default_:
log.Printf("DBG: ignore: %v", e)
_continue_
_ _}
}
}()
_// send messages_
_ for _i := 0; i < 10; i++ {
s := fmt.Sprint("msg", i)
msg := &ckafka.Message{
TopicPartition: ckafka.TopicPartition{
Topic: &topic,
},
Value: []byte(s),
}
_if _err := producer.Produce(msg, _nil_); err != _nil _{
log.Printf("ERR: produce: %v", err)
}
}
_// shut down producer_
_ _producer.Close()
}
需要注意的是 ckafka.ConfigMap 的底层数据类型是 map[string]interface{} 需要查阅文档,不要填错了值的数据类型。
默认提供的 producer.Produce 方法就是异步的,所以发送之后的返回结果也需要异步监听事件。
Consumer
_func Example_consumerCKafka_() {
_var _(
brokers = []string{"127.0.0.1:29092"}
topics = []string{"test"}
group = "ckafka"
)
config := &ckafka.ConfigMap{
"bootstrap.servers": strings.Join(brokers, ","),
"group.id": group,
}
consumer, err := ckafka.NewConsumer(config)
_if _err != _nil _{
log.Fatalf("new consumer: %v", err)
}
_defer func_() { _ = consumer.Close() }()
_if _err := consumer.SubscribeTopics(topics, _nil_); err != _nil _{
log.Fatalf("subscribe topics: %v", err)
}
_for _{
msg, err := consumer.ReadMessage(-1)
_if _err != _nil _{
_if _e, ok := err.(ckafka.Error); ok {
log.Printf("ERR: consume: %v", e)
}
_continue_
_ _}
log.Printf("msg:%s topic:%q partition:%d offset:%d",
msg.Value,
*msg.TopicPartition.Topic,
msg.TopicPartition.Partition,
msg.TopicPartition.Offset,
)
}
}
需要注意的是,consumer.ReadMessage 方法可以指定超时时间,设置成 -1 时,就是始终等待。
kafkago
目前社区中很活跃的 Kafka Go 客户端。
Producer
_func Example_producerKafkaGo_() {
_var _(
brokers = []string{"127.0.0.1:29092"}
topic = "test"
)
_// setup config_
_ _producer := kafkago.Writer{
Addr: kafkago.TCP(brokers...),
Async: true,
}
_// send messages_
_ _ctx := context.Background()
_for _i := 0; i < 10; i++ {
s := fmt.Sprint("msg", i)
msg := kafkago.Message{
Topic: topic,
Value: []byte(s),
}
_if _err := producer.WriteMessages(ctx, msg); err != _nil _{
log.Printf("ERR: produce: %v", err)
}
}
_// shut down producer_
_ if _err := producer.Close(); err != _nil _{
log.Printf("ERR: close producer: %v", err)
}
}
当前例子里是异步生产者。
kafkago 也支持初始化为同步生产者,只需要将 kafkago.Writer 中的 Async 设置为 false 即可,由此可见默认配置下 kafkago 会是同步生产者。
kafkago.Writer 中的 Async 设置为 true 时,producer.WriteMessages 方法将不会阻塞,但是与 sarama 和 ckafka 不同的是,kafkago 的异步生产者对于发送失败的错误,没有好的处理办法。
官方也建议,仅在你不介意消息是否确保投递到 kafka 的场景才开启 Async 。
那么如何提高同步生产者的吞吐量呢?那就要开多个 goroutine 去调用 producer.WriteMessages 方法。
Consumer
_func Example_consumerKafkaGo_() {
_var _(
brokers = []string{"127.0.0.1:29092"}
topics = []string{"test"}
group = "kafkago"
)
config := kafkago.ReaderConfig{
Brokers: brokers,
GroupID: group,
GroupTopics: topics,
CommitInterval: 2 * time._Second_,
}
consumer := kafkago.NewReader(config)
_defer func_() { _ = consumer.Close() }()
ctx := context.Background()
_for _{
msg, err := consumer.ReadMessage(ctx)
_if _err != _nil _{
log.Printf("ERR: consume: %v", err)
_continue_
_ _}
log.Printf("msg:%s topic:%q partition:%d offset:%d",
msg.Value,
msg.Topic,
msg.Partition,
msg.Offset,
)
}
}
kafkago 作为消费者客户端,其 API 非常简洁易用。
需要注意的是,当 kafkago.ReaderConfig 中 CommitInterval 不指定时,将采用同步 commit 的方式。其消费者的性能也会大打折扣。
配置参数
上面介绍了三种客户端库的基本使用。
但是直接把 demo 的配置直接用于生产环境肯定是不可取的。有些参数是需要我们根据实际情况进行调整的。
生产者配置
bootstrap.servers
即初始化时连接的 broker 地址列表。
并不推荐把集群里所有的 broker 都填进去,因为从任何一台 broker 都可以获取到整个集群的元数据,一般填写 3~4 台 broker 即可。
客户端启动时,会跟每一个配置里的 broker 都建立 TCP 连接。
批量请求
要提高生产者端的发送效率,批量发送一组消息是必不可少的(无论是否为异步生产者)
单次请求中的消息数
sarama | Producer.Flush.MaxMessages |
ckafka | max.in.flight.requests.per.connection |
kafkago | BatchSize |
单次请求中的消息大小
sarama | Producer.Flush.Bytes |
ckafka | batch.size |
kafkago | BatchBytes |
单次批请求的间隔时间
sarama | Producer.Flush.Frequency |
ckafka | linger.ms |
kafkago | BatchTimeout |
RequiredAcks
这个表示消息发给 broker 之后要达到什么条件才算发送成功。
以 sarama 为例:
- RequireNone (0) fire-and-forget
- RequireOne (1) wait for the leader to acknowledge the writes
- RequireAll (-1) wait for the full ISR to acknowledge the writes
消费者配置
着重介绍 消费者组 的配置,下面以 ckafka 的配置项为例,其他客户端也提供了对应的配置项。
bootstrap.servers
同上
group.id
众所周知,同一个消费者组的消费者们是共享消费的。
不同的应用服务/业务 应该使用不同的消费组名称。
auto.commit.interval.ms
消费者每消费一条消息,就需要更新消费偏移量,异步提交消费偏移量可以提高消费效率。
auto.offset.reset
在消费者组创建的时候,决定从最新的 offset 还是最早的 offset 开始消费。
fetch.wait.max.ms
消费者提高吞吐量也是攒一定量消息一起处理,所以需要配置等待处理批量消息的最大等待时间。
fetch.min.bytes
代表 consumer 接受的最小批量消息的大小,一般就给 1 字节就好了。
fetch.max.bytes
代表 consumer 接受的最大批量消息的大小,确保要比你的单条消息更大。
Benchmark
我写了一个基准测试,分别对比了这三个 kafka 客户端库作为生产者和消费者时的效率。
https://github.com/sko00o/benchmark-kafka-go-clients
建议
从 API 的易用性上,我很推荐使用 segmentio/kafka-go ,从基准测试来看,其作为消费者客户端的性能完全够用,开启异步消费时性能最强。
但是用作生产者,segmentio/kafka-go 在内存管理上有一些问题,我推荐使用 sarama,虽然它的 API 有一些复杂(或者说功能全面),但是考虑到它作为生产者客户端的性能及稳定性,以及它没有 cgo 依赖。