Kafka 客户端选择(Go 相关)

2022/09/04

旧项目中主要使用的是 confluent-kafka-go ,它是由 Kafka 背后的技术公司 confluent.inc 维护的 Go 客户端,但是其最主要的问题是,它基于 kafka c/c++ 库 librdkafka 构建而成,这就意味着要是用它就必须使用 cgo 。容器镜像的构建也会相对繁琐。

几款主流的 Go kafka 客户端:

准备

使用 docker compose 搭建一个本地 kafka 实验环境。

version: "3.9"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    restart: unless-stopped

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    restart: unless-stopped
# 启动 compose 环境
docker compose up -d

# 使用 kafka 自带的生产者工具,向 test 主题发消息
docker compose exec kafka \
  kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic test

# 使用 kafka 自带的消费者工具,订阅 test 主题
docker compose exec kafka \
  kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic test \
  --group 1 \
  --from-beginning

sarama

目前该库在社区里 star 数量最多。

Producer

先介绍最基本的例子。

SyncProducer

_func Example_producerSyncSarama_() {
   _var _(
      brokers = []string{"127.0.0.1:29092"}
      topic   = "test"
   )

   _// setup config_
_   _config := sarama.NewConfig()
   config.Producer.Return.Successes = true
   producer, err := sarama.NewSyncProducer(brokers, config)
   _if _err != _nil _{
      log.Fatalf("new producer: %v", err)
   }

   _// send messages_
_   for _i := 0; i < 10; i++ {
      s := fmt.Sprint("msg", i)
      msg := &sarama.ProducerMessage{
         Topic: topic,
         Value: sarama.StringEncoder(s),
      }

      p, o, err := producer.SendMessage(msg)
      _if _err != _nil _{
         log.Printf("ERR: send partation %d offset %d: %v", p, o, err)
      }
   }

   _// shut down producer_
_   if _err := producer.Close(); err != _nil _{
      log.Printf("ERR: close producer: %v", err)
   }
}

需要注意的是,在 SyncProducer 中 Producer.Return.Successes 需要显式配置为 true 。

通过调用 producer 的 SendMessage 方法,将封装好的 sarama.ProducerMessage 发出,因为是同步请求,如果遇到发送错误会直接返回。

如果要提高吞吐量,让发送不被阻塞,我们可以使用 AsyncProducer 。

AsyncProducer

_func Example_producerAsyncSarama_() {
   _var _(
      brokers = []string{"localhost:29092"}
      topic   = "test"
   )

   _// setup config_
_   _config := sarama.NewConfig()
   producer, err := sarama.NewAsyncProducer(brokers, config)
   _if _err != _nil _{
      log.Fatalf("new producer: %v", err)
   }

   _// handle producer errors in another goroutine_
_   go func_() {
      _for _err := _range _producer.Errors() {
         log.Printf("ERR: produce: %v", err)
      }
   }()

   _// send messages_
_   for _i := 0; i < 10; i++ {
      s := fmt.Sprint("msg", i)
      msg := &sarama.ProducerMessage{
         Topic: topic,
         Value: sarama.StringEncoder(s),
      }
      producer.Input() <- msg
   }

   _// shut down producer, it will flush all pending messages_
_   if _err := producer.Close(); err != _nil _{
      log.Printf("ERR: close producer: %v", err)
   }
}

异步发送的消息,报错信息可以通过 producer.Errors 方法读取到,它的返回值是一个 unbuffered channel 。

需要注意的是初始化的配置对象中,Producer.Return.Errors 默认值是 true ,这意味着我们必须读取 producer.Errors 来处理错误,否则会导致发送消息阻塞。

Consumer

sarama 的消费者客户端使用起来稍微有一点点繁琐,需要用户自己实现 sarama.ConsumerGroupHandler 接口。

我们可以参考官方提供的例子,得到一个最精简的实现:

_type _consumeHandler _struct_{}

_func _(consumeHandler) _Setup_(_ sarama.ConsumerGroupSession) error   { _return nil _}
_func _(consumeHandler) _Cleanup_(_ sarama.ConsumerGroupSession) error { _return nil _}
_func _(h consumeHandler) _ConsumeClaim_(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   _for _msg := _range _claim.Messages() {
      log.Printf("msg:%s topic:%q partition:%d offset:%d",
         msg.Value, msg.Topic, msg.Partition, msg.Offset)
      sess.MarkMessage(msg, "")
   }
   _return nil_
}

ConsumerGroup

_func Example_consumerSarama_() {
   _var _(
      brokers = []string{"127.0.0.1:29092"}
      topics  = []string{"test"}
      group   = "sarama"
   )

   config := sarama.NewConfig()
   consumerGroup, err := sarama.NewConsumerGroup(brokers, group, config)
   _if _err != _nil _{
      log.Fatalf("new consumer: %v", err)
   }
   _defer func_() { _ = consumerGroup.Close() }()

   ctx := context.Background()
   _for _{
      handler := consumeHandler{}

      _// `Consume` should be called inside an infinite loop, when a_
_      // server-side rebalance happens, the consumer session will need to be_
_      // recreated to get the new claims_
_      _err := consumerGroup.Consume(ctx, topics, handler)
      _if _err != _nil _{
         log.Fatalf("consume: %v", err)
      }
   }
}

需要注意的是,consumerGroup.Consume 方法需要在循环中调用,这考虑到服务端发起 rebalance 的时候,消费者客户端需要重建会话。

其他

sarama 提供了 mock 测试包,可以作为依赖 sarama 的 go 包的自测。

ckafka

该库由 kafka 背后的技术公司 confluent.inc 维护。它基于 librdkafka 构建,所以如果要跨平台编译是个问题,除此之外之外使用还是挺简单地。

Producer

_func Example_producerCKafka_() {
   _var _(
      brokers = []string{"127.0.0.1:29092"}
      topic   = "test"
   )

   _// setup config_
_   _config := &ckafka.ConfigMap{
      "bootstrap.servers": strings.Join(brokers, ","),
   }
   producer, err := ckafka.NewProducer(config)
   _if _err != _nil _{
      log.Fatalf("new producer: %v", err)
   }

   _// handle producer errors in another goroutine_
_   go func_() {
      _for _e := _range _producer.Events() {
         _switch _ev := e.(_type_) {
         _case _*ckafka.Message:
            _if _err := ev.TopicPartition.Error; err != _nil _{
               log.Printf("ERR: produce: %v", err)
            }
         _default_:
            log.Printf("DBG: ignore: %v", e)
            _continue_
_         _}
      }
   }()

   _// send messages_
_   for _i := 0; i < 10; i++ {
      s := fmt.Sprint("msg", i)
      msg := &ckafka.Message{
         TopicPartition: ckafka.TopicPartition{
            Topic: &topic,
         },
         Value: []byte(s),
      }

      _if _err := producer.Produce(msg, _nil_); err != _nil _{
         log.Printf("ERR: produce: %v", err)
      }
   }

   _// shut down producer_
_   _producer.Close()
}

需要注意的是 ckafka.ConfigMap 的底层数据类型是 map[string]interface{} 需要查阅文档,不要填错了值的数据类型。

默认提供的 producer.Produce 方法就是异步的,所以发送之后的返回结果也需要异步监听事件。

Consumer

_func Example_consumerCKafka_() {
   _var _(
      brokers = []string{"127.0.0.1:29092"}
      topics  = []string{"test"}
      group   = "ckafka"
   )

   config := &ckafka.ConfigMap{
      "bootstrap.servers": strings.Join(brokers, ","),
      "group.id":          group,
   }
   consumer, err := ckafka.NewConsumer(config)
   _if _err != _nil _{
      log.Fatalf("new consumer: %v", err)
   }
   _defer func_() { _ = consumer.Close() }()

   _if _err := consumer.SubscribeTopics(topics, _nil_); err != _nil _{
      log.Fatalf("subscribe topics: %v", err)
   }

   _for _{
      msg, err := consumer.ReadMessage(-1)
      _if _err != _nil _{
         _if _e, ok := err.(ckafka.Error); ok {
            log.Printf("ERR: consume: %v", e)
         }
         _continue_
_      _}

      log.Printf("msg:%s topic:%q partition:%d offset:%d",
         msg.Value,
         *msg.TopicPartition.Topic,
         msg.TopicPartition.Partition,
         msg.TopicPartition.Offset,
      )
   }
}

需要注意的是,consumer.ReadMessage 方法可以指定超时时间,设置成 -1 时,就是始终等待。

kafkago

目前社区中很活跃的 Kafka Go 客户端。

Producer

_func Example_producerKafkaGo_() {
   _var _(
      brokers = []string{"127.0.0.1:29092"}
      topic   = "test"
   )

   _// setup config_
_   _producer := kafkago.Writer{
      Addr:  kafkago.TCP(brokers...),
      Async: true,
   }

   _// send messages_
_   _ctx := context.Background()
   _for _i := 0; i < 10; i++ {
      s := fmt.Sprint("msg", i)
      msg := kafkago.Message{
         Topic: topic,
         Value: []byte(s),
      }

      _if _err := producer.WriteMessages(ctx, msg); err != _nil _{
         log.Printf("ERR: produce: %v", err)
      }
   }

   _// shut down producer_
_   if _err := producer.Close(); err != _nil _{
      log.Printf("ERR: close producer: %v", err)
   }
}

当前例子里是异步生产者。

kafkago 也支持初始化为同步生产者,只需要将 kafkago.Writer 中的 Async 设置为 false 即可,由此可见默认配置下 kafkago 会是同步生产者。

kafkago.Writer 中的 Async 设置为 true 时,producer.WriteMessages 方法将不会阻塞,但是与 sarama 和 ckafka 不同的是,kafkago 的异步生产者对于发送失败的错误,没有好的处理办法。

官方也建议,仅在你不介意消息是否确保投递到 kafka 的场景才开启 Async 。

那么如何提高同步生产者的吞吐量呢?那就要开多个 goroutine 去调用 producer.WriteMessages 方法。

Consumer

_func Example_consumerKafkaGo_() {
   _var _(
      brokers = []string{"127.0.0.1:29092"}
      topics  = []string{"test"}
      group   = "kafkago"
   )

   config := kafkago.ReaderConfig{
      Brokers:     brokers,
      GroupID:     group,
      GroupTopics: topics,
      
      CommitInterval: 2 * time._Second_,
   }
   consumer := kafkago.NewReader(config)
   _defer func_() { _ = consumer.Close() }()

   ctx := context.Background()
   _for _{
      msg, err := consumer.ReadMessage(ctx)
      _if _err != _nil _{
         log.Printf("ERR: consume: %v", err)
         _continue_
_      _}

      log.Printf("msg:%s topic:%q partition:%d offset:%d",
         msg.Value,
         msg.Topic,
         msg.Partition,
         msg.Offset,
      )
   }
}

kafkago 作为消费者客户端,其 API 非常简洁易用。

需要注意的是,当 kafkago.ReaderConfig 中 CommitInterval 不指定时,将采用同步 commit 的方式。其消费者的性能也会大打折扣。

配置参数

上面介绍了三种客户端库的基本使用。

但是直接把 demo 的配置直接用于生产环境肯定是不可取的。有些参数是需要我们根据实际情况进行调整的。

生产者配置

bootstrap.servers

即初始化时连接的 broker 地址列表。

并不推荐把集群里所有的 broker 都填进去,因为从任何一台 broker 都可以获取到整个集群的元数据,一般填写 3~4 台 broker 即可。

客户端启动时,会跟每一个配置里的 broker 都建立 TCP 连接。

批量请求

要提高生产者端的发送效率,批量发送一组消息是必不可少的(无论是否为异步生产者)

单次请求中的消息数

sarama
Producer.Flush.MaxMessages
ckafka
max.in.flight.requests.per.connection
kafkago
BatchSize

单次请求中的消息大小

sarama
Producer.Flush.Bytes
ckafka
batch.size
kafkago
BatchBytes

单次批请求的间隔时间

sarama
Producer.Flush.Frequency
ckafka
linger.ms
kafkago
BatchTimeout

RequiredAcks

这个表示消息发给 broker 之后要达到什么条件才算发送成功。

以 sarama 为例:

消费者配置

着重介绍 消费者组 的配置,下面以 ckafka 的配置项为例,其他客户端也提供了对应的配置项。

bootstrap.servers

同上

group.id

众所周知,同一个消费者组的消费者们是共享消费的。

不同的应用服务/业务 应该使用不同的消费组名称。

auto.commit.interval.ms

消费者每消费一条消息,就需要更新消费偏移量,异步提交消费偏移量可以提高消费效率。

auto.offset.reset

在消费者组创建的时候,决定从最新的 offset 还是最早的 offset 开始消费。

fetch.wait.max.ms

消费者提高吞吐量也是攒一定量消息一起处理,所以需要配置等待处理批量消息的最大等待时间。

fetch.min.bytes

代表 consumer 接受的最小批量消息的大小,一般就给 1 字节就好了。

fetch.max.bytes

代表 consumer 接受的最大批量消息的大小,确保要比你的单条消息更大。

Benchmark

我写了一个基准测试,分别对比了这三个 kafka 客户端库作为生产者和消费者时的效率。

https://github.com/sko00o/benchmark-kafka-go-clients

建议

从 API 的易用性上,我很推荐使用 segmentio/kafka-go ,从基准测试来看,其作为消费者客户端的性能完全够用,开启异步消费时性能最强。

但是用作生产者,segmentio/kafka-go 在内存管理上有一些问题,我推荐使用 sarama,虽然它的 API 有一些复杂(或者说功能全面),但是考虑到它作为生产者客户端的性能及稳定性,以及它没有 cgo 依赖。

参考