Yammer Metrics
Kafka uses Yammer Metrics for metrics reporting in the server.
Kafka 服务端采用 Yammer Metrics 报告指标。
在 Github 上搜索你目前可以找到这个项目 dropwizard/metrics ,其 License 中写道:
- 2013 年之前由 Coda Hale 开发维护
- 2014 年交给 Dropwizard 团队维护
我们当前线上 Kafka 版本是 3.1.0 ,但翻下源码发现其依赖的 com.yammer.metrics 包的版本其实还是 2.2.0 。
根据 Metrics 官网的介绍:
Metrics is a Java library which gives you unparalleled insight into what your code does in production.
这个 Java 库,可以让你从生产环境获得程序运行时的非常丰富的指标(metrics)数据。
我们可以使用 6 种指标:
- Gauges: an instantaneous measurement of a discrete value.
- Counters: a value that can be incremented and decremented. Can be used in queues to monitorize the remaining number of pending jobs.
- Meters: measure the rate of events over time. You can specify the rate unit, the scope of events or event type.
- Histograms: measure the statistical distribution of values in a stream of data.
- Timers: measure the amount of time it takes to execute a piece of code and the distribution of its duration.
- Healthy checks: as his name suggests, it centralize our service’s healthy checks of external systems.
Gauges: 离散值的瞬时测量。
Counters: 可以递增和递减的值。可以用于队列中监控剩余的挂起作业数量。
Meters: 测量随时间发生的时间发生率。可以指定费率单位,事件范围或事件类型。
Histograms: 测量数据流中值的统计分布。
Timers:测量执行一段代码所需的时间机器持续时间的分布。
Healthy checks: 顾名思义,服务健康度检查。
JMX
Java Management Extensions (JMX) 是一种技术,可用于对 Java 虚拟机进行监控和管理。Java 虚拟机内置的管理工具是开箱即用的,可以通过 JMX API 监控任何使用这些工具的应用程序。
Kafka 的服务端和其 Java 客户端都可以通过 JMX 导出监控指标,可以配置插件(pluggable stats reporters)来报告统计数据到外部监控系统。
Kafka 默认是将远程 JMX 端口禁用的。你可以通过设置 JMX_PORT
环境变量启用远程 JMX 端口。
另外注意,Kafka 中 JMX 端口的身份验证也是默认禁用的, 对于生产环境部署,可以通过设置 KAFKA_JMX_OPTS
来覆盖其安全配置开启身份验证。
最简单的查看可用指标的方式是用 jconsole 。
演示使用 jconsole 查看 Kafka 监控指标
新建 docker-compose.yml
启动服务
docker compose up -d
使用 jconsole 连接 JMX 端口查看
jconsole localhost:9999
展示图表
所有的指标
Kafka 通过 JMX API 可以获取的指标非常多。
官方文档中有介绍 https://kafka.apache.org/documentation/#remote_jmx
但 Kafka 官方文档中并没有详细记录所有的指标,最全的指标列表还得以 jconsole 中看到的为主。
Kafka Metrics
完整的 Kafka 指标包含三类:
- Kafka 服务端(broker)指标
- 生产端(producer)指标
- 消费端(consumer)指标
当然,Kafka 集群如果依赖 Zookeeper ,那么 Zookeeper 的监控指标也很值得关注。
Broker Metrics
服务端指标也可以分为三类:
- Kafka 排放(Kafka-emitted)的指标
- 主机级别的指标
- JVM GC 指标
下面介绍,每个分类下需要重点关注的指标。
Kafka-emitted Metrics
Name | MBean name | Description | Metric type |
UnderReplicatedPartitions | kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions | Number of unreplicated partitions | Resource: Availability |
IsrShrinksPerSec/IsrExpandsPerSec | kafka.server:type=ReplicaManager,name=IsrShrinksPerSec kafka.server:type=ReplicaManager,name=IsrExpandsPerSec | Rate at which the pool of in-sync replicas (ISRs) shrinks/expands | Resource: Availability |
ActiveControllerCount | kafka.controller:type=KafkaController,name=ActiveControllerCount | Number of active controllers in cluster | Resource: Error |
OfflinePartitionsCount | kafka.controller:type=KafkaController,name=OfflinePartitionsCount | Number of offline partitions | Resource: Availability |
LeaderElectionRateAndTimeMs | kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs | Leader election rate and latency | Other |
UncleanLeaderElectionsPerSec | kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec | Number of “unclean” elections per second | Resource: Error |
TotalTimeMs | kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce|FetchConsumer|FetchFollower} | Total time (in ms) to serve the specified request (Produce/Fetch) | Work: Performance |
PurgatorySize | kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation={Produce|Fetch} | Number of requests waiting in producer purgatory/Number of requests waiting in fetch purgatory | Other |
BytesInPerSec/BytesOutPerSec | kafka.server:type=BrokerTopicMetrics,name={BytesInPerSec|BytesOutPerSec} | Aggregate incoming/outgoing byte rate | Work: Throughput |
RequestsPerSecond | kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower},version={0|1|2|3|…} | Number of (producer|consumer|follower) requests per second | Work: Throughput |
UnderReplicatedPartitions
在健康的 Kafka 集群中,处于同步的副本(ISR)数量应该恰好等于总的副本数量。
如果某分区的副本落后于 leader 太多,这个 follower 分区就会从 ISR 池中被移除。(注:这是我们常说的分区掉出 ISR )那么你可以在 IsrShrinksPerSec
指标中看到相应的增长。
如果一个 broker 变成不可用状态,UnderReplicatedPartitions
指标的值会急剧增长。
缺少副本就无法满足 Kafka 的高可用保证,如果这个指标长时间大于 0 ,就需要排查问题了。
IsrShrinksPerSec/IsrExpandsPerSec
特定分区的同步副本(ISR)数量应保持相对静态,除非你在集群中扩充新的 broker 或者删除分区。
为了维持高可用,一个健康的 Kafka 集群需要最低数量的 ISR 用于故障转移。
如果副本有一段时间(通过 replica.socket.timeout.ms
可配置这个时间长度)没有连上 leader ,可能被从 ISR 池中删除。任何在 IsrShrinksPerSec
增加时,短时间内没有 IsrExpandsPerSec
相应的增加,你需要调查这些指标的变化。
ActiveControllerCount
Kafka 集群中第一个启动的节点会自动成为 controller ,而且仅有一个。 controller 负责维护分区 leader 列表,并在分区 leader 不可用的情况下协调 leader 过渡。
如果有必要更换 leader , zookeeper 会从 broker 池中随机选择一个新的 controller 。
所以 ActiveControllerCount
的总和应该始终为 1 ,任何不为 1 的值持续时间超过一秒都应该发出告警。
OfflinePartitionsCount (仅限 controller )
该指标报告了没有活动的 leader 分区的数量。
因为所有的读写操作都发生在分区 leader 上,所以该指标为非 0 值时应该发出警报,以防服务中断。
任何没有活动 leader 的分区都是完全无法访问的,在 leader 变得可用之前,该分区的消费者和生产者都会被阻塞。
LeaderElectionRateAndTimeMs
当分区 leader 挂掉时,会触发新 leader 的选举。
如果分区 leader 无法维持和 zookeeper 的会话,则该分区 leader 被视为“死亡”。与 zookeeper 的 Zab 不同, Kafka 不采用多数共识算法进行 leader 选举。
相反, Kafka 的法定人数(quorum)由特定分区的所有同步副本(ISR)的集合组成。如果 follower 副本追赶上了 leader ,则被认为是同步的,这意味着 ISR 中任何副本都可以提升为 leader 。
LeaderElectionRateAndTimeMs
报告了 leader 选举的速率(每秒)以及集群缺失 leader 的总时间(毫秒为单位)。
虽然这个指标不如 UncleanLeaderElectionsPerSec
报告的指标严重,你还是需要关注它。
如上所述,让与当前 leader 失去联系,就会触发 leader 选举,这也有可能选到一个离线的 broker 。
UncleanLeaderElectionsPerSec
当 Kafka 集群中没有合格的分区 leader 时,就会发生“脏选举”(unclean leader election)。
通常,作为分区 leader 的 broker 离线时,将从 ISR 集合中选举一个新的 leader 。在 Kafka 0.11 及更高版本中,“脏选举”是默认禁用的。这意味着如果分区没有任何 ISR 可以选举为新 leader 时,该分区将处于脱机状态。
如果 Kafka 配置为允许“脏选举”,则会从不同步的副本中选择一个 leader ,这样做的副作用是前 leader 丢失之前未同步的消息都会丢失。
本质上,“脏选举”是为了可用性牺牲了一致性。
需要为该指标发出警报,因为它表示数据丢失。
TotalTimeMs
TotalTimeMs
指标系列测量服务请求所需的总时间,这些服务请求包括:
- produce: 生产者发送数据的请求。
- fetch-consumer:消费者请求获取新数据。
- fetch-follower: 分区 follower 请求获取新数据。
TotalTimeMs
这个指标本身是下列四项指标的总和:
- queue: 请求队列中等待所花费的时间。
- local: leader 处理所花费的时间。
- remote: 等待 follower 响应所花费的时间。(仅当
request.required.acks=-1
) - response: 发送响应的时间。
通常情况下,这些值波动不大。如果发现异常,可以逐个检查速度减慢的确切请求段。
PurgatorySize
purgatory 译作炼狱。这里可以理解成为了等待满足请求条件的暂存区。每种类型的请求都有自己的参数来确定是否将其加入 purgatory :
- fetch: 如果没有足够的数据满足请求(消费端上的
fetch.min.bytes
),则把 fetch 请求加入 purgatory,直到达到fetch.wait.max.ms
指定的时间或有足够的数据。 - produce: 如果
request.requied.acks=-1
,所有的 produce 请求都将进入 purgatory ,直到分区 leader 收到所有 follower 的 ack 。
密切关注 purgatory 的大小有助于确定延迟的根本原因。例如,如果 purgatory 中的 fetch 请求数量增加,则可以解释消费端 fetch 请求时间的增加。
BytesInPerSec/BytesOutPerSec
一般来说,磁盘吞吐量往往是 Kafka 性能上的瓶颈。然而,并不是说网络不会成为瓶颈。
如果你需要跨数据中心发送消息,如果你的 topic 有大量消费者,或者有副本正在追赶其 leader ,网络吞吐量可能会影响 Kafka 的性能。
跟踪 broker 上的网络吞吐量有助于发现潜在瓶颈,并可以为决策提供信息,比如是否启用消息的端到端压缩。
RequestsPerSec
你应该监控来自生产者、消费者和 followers 的请求速率,以确保你的 Kafka 部署能够有效地进行通信。
随着生产者发送更多流量或者部署规模扩大,添加更多消费者或者 followers ,可以预见 Kafka 的请求率会上升。
如果 RequestsPerSec
仍然很高,就需要考虑增加生产者、消费者以及 broker 的批大小(batch size)。
通过减少请求数量,也就是减少不要的网络开销,可以提高 Kafka 集群的吞吐量。
Host-level broker metrics
Name | Description | Metric type |
Page cache reads ratio | Ratio of reads from page cache vs reads from disk | Resource: Saturation |
Disk usage | Disk space currently consumed vs. available | Resource: Utilization |
CPU usage | CPU use | Resource: Utilization |
Network bytes sent/received | Network traffic in/out | Resource: Utilization |
Page cache read ratio
Kafka 一开始就被设计成利用内核的页面缓存,来提供可靠和高性能的消息管道。
页面缓存读取率类似于数据库中的缓存命中率 —— 值越高,相当于更快的读取,也就是更好的性能。
如果副本正在追赶 leader (比如产生新的 broker 时),则该指标会短暂下降。但如果页面缓存读取率一直在 80% 以下,则可能需要添加 broker 来提高性能。
Disk usage
Kafka 将数据持久化到磁盘,所以有必要监控磁盘使用量。
CPU usage
Kafka 的 CPU 使用率一般不高,即使启用了 GZIP 压缩, CPU 也很少是性能问题的根源。
所以看到 CPU 利用率出现峰值,则需要调查。
Network bytes sent/received
如果 Kafka broker 的主机上还托管了其他网络服务时,你需要监控主机级别的网络吞吐量
高网络使用率可能是性能下降的症状。如果你发现网络使用率高,关联 TCP 重传和丢包错误可以帮助确定性能问题是否与网络相关。
JVM garbage collection metrics
Kafka 运行在 Java 虚拟机(JVM)中。垃圾回收会带来很高性能成本。
由垃圾回收而导致的长时间暂停,可能导致 Zookeeper 会话因为超时而废弃。
如果你看到 GC 过程中更有过多暂停,可以考虑升级 JDK 版本或者垃圾收集器,或者延长
zookeeper.session.timeout.ms
。另外还可以调整 Java runtime 减少垃圾收集。
JVM GC 采用分代回收(Generational GC),堆(Heap)被分成新生代(Young Generation)和老生代(Old/Tenured Generation)还有永久代(Permanent Generation)。
Name | MBean name | Description | Metric type |
CollectionCount | java.lang:type=GarbageCollector,name=G1 (Young|Old) Generation | The total count of young or old garbage collection processes executed by the JVM | Other |
CollectionTime | java.lang:type=GarbageCollector,name=G1 (Young|Old) Generation | The total amount of time (in milliseconds) the JVM has spent executing young or old garbage collection processes | Other |
Young generation garbage collection time
新生代垃圾收集发生得相对频繁,而且是 stop-the-world 垃圾收集,这意味着所有应用程序线程在执行时都会暂停。这个指标的任何显著增加都极大影响 Kafka 的性能。
Old generation garbage collection count/time
老生代垃圾收集释放老生代堆中未使用的内存。这是低暂停(low-pause)垃圾回收。
如果此过程需要几秒钟才能完成,或者发生频率增加,则表示集群的内存不够。
jmx_exporter + prometheus + grafana
Promethrus 基本上是 Open Monitoring 的最热门方案。
利用 jmx_exporter 代理 JMX 接口就可以为 prometheus 提供指标数据。
jmx_exporter 的文档介绍了,从 MBeans 到 pattern input 和 metric name 的默认转换规则:
## pattern input:
domain<beanpropertyName1=beanPropertyValue1, beanpropertyName2=beanPropertyValue2, ...><key1, key2, ...>attrName: value
## default name:
domain_beanPropertyValue1_key1_key2_...keyN_attrName{beanpropertyName2="beanPropertyValue2", ...}: value
当然大部分测量指标指标你需要指定 metric 类型,需要配置 rules ,官方仓库中提供的例子也够用了
https://github.com/prometheus/jmx_exporter/blob/main/example_configs/kafka-2_0_0.yml
当监控数据存入 prometheus ,你可以直接访问 prometheus 的 web 端口查看以及配置报警规则,也可以对接其他监控系统。
但是我们一般只是把 prometheus 作为时间数据库用,实际绘制 dashboard 和配置报警用到 grafana 。
参考
- https://kafka.apache.org/documentation/#monitoring
- https://metrics.dropwizard.io/
- https://www.javacodegeeks.com/2012/12/yammer-metrics-a-new-way-to-monitor-your-application.html
- https://docs.oracle.com/javase/8/docs/technotes/guides/management/agent.html
- https://www.datadoghq.com/blog/monitoring-kafka-performance-metrics
- https://www.oracle.com/webfolder/technetwork/tutorials/obe/java/gc01/index.html