炎龙智能炎龙智能
AIOPS智能运维平台
AIOPS智能运维平台
  • 消息队列监控

消息队列监控

消息队列监控模块提供对各类消息中间件的全面监控能力,支持Kafka、RabbitMQ、RocketMQ、ActiveMQ等主流消息队列系统,帮助用户实时掌握消息队列的运行状态、性能指标和异常情况,保障消息系统的稳定运行和消息的可靠传递。

1. 功能概述

消息队列监控模块提供以下核心功能:

  • 多类型消息队列支持:支持Kafka、RabbitMQ、RocketMQ、ActiveMQ等主流消息队列系统
  • 实时性能监控:实时监控消息队列的吞吐量、延迟、队列长度等关键性能指标
  • 资源使用监控:监控消息队列服务器的CPU、内存、磁盘I/O等资源使用情况
  • 消息积压监控:监控消息队列的积压情况,及时发现处理瓶颈
  • 消费者组监控:监控消费者组的消费状态、消费延迟等情况
  • 连接与会话监控:监控客户端连接数、会话状态等信息
  • 异常告警:针对队列异常、消息丢失、连接中断等情况设置告警规则
  • 历史数据趋势分析:提供长期性能趋势分析,支持容量规划

2. 支持的消息队列类型

消息队列类型版本支持监控重点告警关键指标
Kafka0.10+吞吐量、延迟、分区状态、消费者组偏移量积压消息数、消费延迟、磁盘使用率
RabbitMQ3.6+队列长度、消息率、连接数、通道数未确认消息数、队列深度、内存使用率
RocketMQ4.0+吞吐量、延迟、消费者进度、消息积累量消息积压、消费延迟、队列状态
ActiveMQ5.10+队列深度、连接数、主题订阅数、消息率队列积压、连接断开、内存使用率

3. Kafka监控

3.1 关键监控指标

3.1.1 主题与分区指标

指标名称单位描述告警阈值建议
主题数量个Kafka集群中的主题总数监控趋势
分区数量个所有主题的分区总数监控趋势
副本数量个所有分区的副本总数监控趋势
ISR收缩数次/分钟ISR集合收缩次数>0
离线分区数个处于离线状态的分区数量>0
未同步分区数个未同步到所有副本的分区数量>0

3.1.2 生产者指标

指标名称单位描述告警阈值建议
发送消息速率条/秒生产者每秒发送的消息数量监控趋势
发送字节速率KB/秒生产者每秒发送的字节数监控趋势
消息大小KB平均消息大小监控趋势
请求延迟ms生产者请求的平均延迟时间>100ms
请求成功率%生产者请求的成功率<99.9%
重试次数次/秒生产者每秒重试的次数>10次/秒

3.1.3 消费者指标

指标名称单位描述告警阈值建议
消费消息速率条/秒消费者每秒消费的消息数量监控趋势
消费字节速率KB/秒消费者每秒消费的字节数监控趋势
消息延迟ms消息从生产到消费的平均延迟时间>1000ms
消费者组数量个Kafka集群中的消费者组总数监控趋势
活跃消费者数个消费者组中的活跃消费者数量监控趋势
消费者滞后量条消费者组的积压消息数量>10000条
消费者滞后时间秒消费者组的积压时间>300秒

3.1.4 服务器指标

指标名称单位描述告警阈值建议
Broker在线状态状态Broker是否在线非在线
CPU使用率%Broker CPU使用率>85%
内存使用率%Broker内存使用率>85%
磁盘使用率%日志分区磁盘使用率>85%
磁盘I/OMB/秒磁盘读写速率监控趋势
网络I/OMB/秒网络读写速率监控趋势
JVM堆使用率%JVM堆内存使用率>80%

3.1.5 控制器与ZooKeeper指标

指标名称单位描述告警阈值建议
控制器选举次数次/小时控制器选举发生的次数>0
ZooKeeper连接状态状态与ZooKeeper的连接状态非连接
ZooKeeper会话超时次/小时ZooKeeper会话超时次数>0

4. RabbitMQ监控

4.1 关键监控指标

4.1.1 队列指标

指标名称单位描述告警阈值建议
队列总数个RabbitMQ中的队列总数监控趋势
就绪消息数条队列中等待消费的消息数>1000条
未确认消息数条已投递但未确认的消息数>1000条
总消息数条队列中的总消息数监控趋势
消息入队速率条/秒每秒进入队列的消息数监控趋势
消息出队速率条/秒每秒离开队列的消息数监控趋势
消息确认速率条/秒每秒确认的消息数监控趋势

4.1.2 交换机指标

指标名称单位描述告警阈值建议
交换机总数个RabbitMQ中的交换机总数监控趋势
交换机入队速率条/秒每秒进入交换机的消息数监控趋势
交换机出队速率条/秒每秒离开交换机的消息数监控趋势
未路由消息数条/秒每秒未路由的消息数>0

4.1.3 连接与通道指标

指标名称单位描述告警阈值建议
连接总数个当前活跃的连接数>1000
通道总数个当前活跃的通道数>5000
连接关闭率个/秒每秒关闭的连接数突增
通道关闭率个/秒每秒关闭的通道数突增

4.1.4 消费者指标

指标名称单位描述告警阈值建议
消费者总数个当前活跃的消费者数监控趋势
消费者利用率%消费者的利用率>80%

4.1.5 内存与磁盘指标

指标名称单位描述告警阈值建议
内存使用率%RabbitMQ节点内存使用率>80%
磁盘空间使用率%RabbitMQ节点磁盘空间使用率>80%
磁盘告警阈值MB触发磁盘告警的剩余空间接近当前值

5. RocketMQ监控

5.1 关键监控指标

5.1.1 Broker指标

指标名称单位描述告警阈值建议
Broker在线状态状态Broker是否在线非在线
CPU使用率%Broker CPU使用率>85%
内存使用率%Broker内存使用率>85%
磁盘使用率%Broker磁盘使用率>85%
存储队列深度条存储队列中的消息数量监控趋势
发送TPS条/秒Broker每秒处理的发送请求数监控趋势
消费TPS条/秒Broker每秒处理的消费请求数监控趋势
消息大小KB平均消息大小监控趋势

5.1.2 生产者指标

指标名称单位描述告警阈值建议
发送延迟ms消息发送的平均延迟时间>100ms
发送成功率%消息发送成功率<99.9%
发送失败率%消息发送失败率>0.1%

5.1.3 消费者指标

指标名称单位描述告警阈值建议
消费延迟ms消息消费的平均延迟时间>1000ms
消费失败率%消息消费失败率>0.1%
重试队列消息数条重试队列中的消息数量>1000条
死信队列消息数条死信队列中的消息数量>100条
消费者数量个活跃的消费者数量监控趋势

5.1.4 主题与队列指标

指标名称单位描述告警阈值建议
主题数量个RocketMQ中的主题总数监控趋势
队列数量个所有主题的队列总数监控趋势
消息积累量条所有队列中的消息积累量>100000条

6. 部署与配置

6.1 数据源配置

  1. 在AIOPS平台界面中,点击"配置中心" > "数据源管理"
  2. 点击"新增数据源",选择对应的消息队列类型
  3. 填写以下基本信息:
    • 名称:消息队列实例名称
    • 类型:选择消息队列类型(Kafka、RabbitMQ等)
    • 地址:消息队列服务器地址(可以是集群地址列表)
    • 端口:消息队列服务端口
    • 用户名:具有监控权限的用户名(如有)
    • 密码:对应的密码(如有)
    • 采集间隔:数据采集间隔,建议10-30秒
    • 超时时间:连接超时时间,建议5秒
  4. 点击"测试连接"验证连接是否成功
  5. 点击"确定"保存配置

6.2 Kafka采集器配置

对于Kafka,可以通过以下两种方式采集数据:

  1. JMX方式:启用Kafka的JMX端口,通过JMX协议采集指标

    # 启动Kafka时开启JMX
    export JMX_PORT=9999
    bin/kafka-server-start.sh config/server.properties
    
  2. Prometheus Kafka Exporter(推荐):部署Prometheus Kafka Exporter采集指标

    # 使用Docker启动Kafka Exporter
    docker run -d --name kafka-exporter \
      -p 9308:9308 \
      danielqsj/kafka-exporter \
      --kafka.server=kafka1:9092,kafka2:9092
    
  3. 管理API:通过Kafka AdminClient API获取元数据和监控信息

6.3 RabbitMQ采集器配置

对于RabbitMQ,可以通过以下方式采集数据:

  1. HTTP API(推荐):使用RabbitMQ的HTTP API采集指标 需要确保启用了RabbitMQ Management插件:

    rabbitmq-plugins enable rabbitmq_management
    
  2. Prometheus RabbitMQ Exporter:部署Prometheus RabbitMQ Exporter采集指标

    # 使用Docker启动RabbitMQ Exporter
    docker run -d --name rabbitmq-exporter \
      -p 9419:9419 \
      -e RABBIT_URL=http://guest:guest@rabbitmq:15672 \
      kbudde/rabbitmq-exporter
    

6.4 RocketMQ采集器配置

对于RocketMQ,可以通过以下方式采集数据:

  1. 管理API:使用RocketMQ Admin API获取集群信息和监控指标
  2. Prometheus RocketMQ Exporter:部署Prometheus RocketMQ Exporter采集指标
  3. 自定义监控脚本:通过RocketMQ命令行工具结合脚本采集指标

7. 使用指南

7.1 消息队列列表

在AIOPS平台界面中,点击左侧菜单栏的"监控中心" > "消息队列监控" > "消息队列列表",进入消息队列列表页面:

  • 实例列表:展示所有已添加的消息队列实例的基本信息
  • 健康状态:显示消息队列实例的当前健康状态
  • 关键指标摘要:显示吞吐量、延迟、队列长度等关键指标的当前值
  • 筛选与搜索:支持按类型、状态等筛选消息队列实例
  • 操作:支持编辑、删除、查看详情等操作

7.2 实例详情

点击消息队列实例名称,进入实例详情页面:

  • 概览:显示消息队列实例的详细信息和核心指标
  • 性能监控:展示吞吐量、延迟等性能指标的趋势图
  • 资源使用:展示CPU、内存、磁盘等资源使用情况
  • 队列监控:展示各队列的状态、长度、消息速率等
  • 消费者监控:展示消费者组/消费者的消费状态和性能
  • 告警历史:展示该实例的历史告警信息

7.3 Kafka详情页特有功能

  • 主题监控:展示所有主题的状态和指标
  • 消费者组监控:展示消费者组的消费状态和滞后情况
  • 分区状态:展示各分区的ISR状态和副本同步情况
  • Broker状态:展示各Broker的状态和负载

7.4 RabbitMQ详情页特有功能

  • 队列详情:展示所有队列的详细信息和指标
  • 交换机监控:展示所有交换机的状态和指标
  • 连接监控:展示客户端连接和通道的状态
  • 集群状态:展示集群节点的状态和健康情况

7.5 RocketMQ详情页特有功能

  • Broker监控:展示所有Broker的状态和负载
  • 消费者组监控:展示消费者组的消费进度和延迟
  • 主题监控:展示所有主题的状态和队列分布
  • 生产者监控:展示生产者的发送性能和成功率

8. 告警配置

8.1 Kafka告警规则示例

- name: "Kafka消费者滞后量大"
  description: "Kafka消费者组的滞后量超过10000条消息"
  severity: "high"
  condition: "kafka_consumergroup_lag > 10000"
  duration: 300
  notifiers: ["email", "sms"]
  tags: ["kafka", "consumer"]

- name: "Kafka Broker离线"
  description: "Kafka Broker处于离线状态"
  severity: "critical"
  condition: "kafka_broker_online == 0"
  duration: 60
  notifiers: ["email", "sms", "wechat"]
  tags: ["kafka", "broker"]

- name: "Kafka磁盘使用率高"
  description: "Kafka Broker磁盘使用率超过85%"
  severity: "high"
  condition: "kafka_broker_disk_usage > 85"
  duration: 300
  notifiers: ["email", "sms"]
  tags: ["kafka", "resource"]

- name: "Kafka ISR收缩"
  description: "Kafka ISR集合发生收缩"
  severity: "high"
  condition: "changes(kafka_topic_partition_in_sync_replicas[5m]) < 0"
  duration: 60
  notifiers: ["email", "sms"]
  tags: ["kafka", "replication"]

8.2 RabbitMQ告警规则示例

- name: "RabbitMQ队列积压严重"
  description: "RabbitMQ队列就绪消息数超过1000条"
  severity: "high"
  condition: "rabbitmq_queue_messages_ready > 1000"
  duration: 300
  notifiers: ["email", "sms"]
  tags: ["rabbitmq", "queue"]

- name: "RabbitMQ连接数过多"
  description: "RabbitMQ连接数超过1000个"
  severity: "medium"
  condition: "rabbitmq_connections_total > 1000"
  duration: 300
  notifiers: ["email"]
  tags: ["rabbitmq", "connection"]

- name: "RabbitMQ内存使用率高"
  description: "RabbitMQ内存使用率超过80%"
  severity: "high"
  condition: "rabbitmq_memory_used_percent > 80"
  duration: 300
  notifiers: ["email", "sms"]
  tags: ["rabbitmq", "resource"]

- name: "RabbitMQ未确认消息过多"
  description: "RabbitMQ未确认消息数超过1000条"
  severity: "high"
  condition: "rabbitmq_queue_messages_unacknowledged > 1000"
  duration: 300
  notifiers: ["email", "sms"]
  tags: ["rabbitmq", "queue"]

8.3 通用告警规则示例

- name: "消息队列连接失败"
  description: "无法连接到消息队列"
  severity: "critical"
  condition: "mq_connection_status == 0"
  duration: 60
  notifiers: ["email", "sms", "wechat"]
  tags: ["message_queue", "connectivity"]

- name: "消息队列服务器CPU使用率高"
  description: "消息队列服务器CPU使用率超过85%"
  severity: "high"
  condition: "system_cpu_usage > 85"
  duration: 300
  notifiers: ["email", "sms"]
  tags: ["message_queue", "resource"]

- name: "消息生产速率异常下降"
  description: "消息生产速率下降超过50%"
  severity: "high"
  condition: "rate(mq_messages_in_total[5m]) < rate(mq_messages_in_total[1h] offset 1h) * 0.5"
  duration: 300
  notifiers: ["email", "sms"]
  tags: ["message_queue", "performance"]

9. 性能优化建议

9.1 Kafka性能优化

9.1.1 主题与分区设计

  • 合理设置分区数:分区数应基于预期吞吐量和消费者数量设置,一般每个分区的吞吐量约为10MB/s
  • 均匀分布分区:确保分区在集群中均匀分布,避免热点问题
  • 选择合适的副本因子:通常设置为3,平衡可用性和性能
  • 合理设置清理策略:根据数据保留需求选择合适的日志清理策略(delete或compact)

9.1.2 生产者优化

  • 批量发送:启用批量发送,设置合适的批次大小和延迟
    batch.size=16384
    linger.ms=5
    
  • 压缩消息:启用消息压缩,减少网络传输和存储开销
    compression.type=lz4
    
  • 选择合适的确认级别:根据可靠性需求选择确认级别
    acks=1  # 或0,all
    
  • 使用异步发送:使用回调函数处理发送结果,避免阻塞

9.1.3 消费者优化

  • 并行消费:增加消费者实例数量,但不超过分区数
  • 调整fetch大小:设置合适的fetch.max.bytes和fetch.max.wait.ms
    fetch.max.bytes=52428800
    fetch.max.wait.ms=500
    
  • 使用自动提交:对于非关键业务,可使用自动提交减少复杂度
    enable.auto.commit=true
    auto.commit.interval.ms=5000
    
  • 合理设置会话超时:确保消费者有足够时间处理消息
    session.timeout.ms=30000
    

9.1.4 Broker优化

  • 调整JVM参数:设置合适的堆大小和GC策略
    -Xms4g -Xmx4g -XX:+UseG1GC
    
  • 优化磁盘配置:使用SSD存储,或对日志目录使用单独的磁盘
  • 调整文件刷新策略:平衡性能和可靠性
    log.flush.interval.messages=10000
    log.flush.interval.ms=1000
    
  • 调整网络参数:增加网络线程数和缓冲区大小
    num.network.threads=8
    num.io.threads=16
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    

9.2 RabbitMQ性能优化

9.2.1 队列优化

  • 使用持久化:对于需要可靠传递的消息,启用队列和消息持久化
  • 设置TTL:为不需要长期保存的消息设置过期时间
  • 使用惰性队列:对于大量消息但低访问率的场景,使用惰性队列
    rabbitmqctl set_policy lazy-queues "^lazy-" '{"queue-mode":"lazy"}' --apply-to queues
    
  • 限制队列大小:设置队列长度限制,防止内存溢出
    x-max-length: 10000
    x-max-length-bytes: 1000000000
    

9.2.2 消费者优化

  • 使用批量确认:对于高吞吐量场景,使用批量确认而非单条确认
  • 设置适当的QoS:使用basic.qos限制未确认消息数
    channel.basicQos(100);  // 每次预取100条消息
    
  • 避免长时间运行的消费者:及时确认消息,避免长时间占用通道
  • 使用消费者优先级:为重要消费者设置更高优先级

9.2.3 服务器优化

  • 调整内存限制:设置合适的内存水线,默认40%
    vm_memory_high_watermark.absolute = 2GB
    
  • 调整文件描述符限制:增加系统文件描述符限制
  • 优化网络设置:调整TCP缓冲区大小和连接超时时间
  • 使用集群和镜像队列:提高可用性和吞吐量

9.3 RocketMQ性能优化

9.3.1 Broker优化

  • 调整存储参数:设置合适的刷盘策略和消息存储路径
    flushDiskType=ASYNC_FLUSH
    storePathRootDir=/data/rocketmq/store
    
  • 优化文件保留时间:根据需求设置消息保留时间
    fileReservedTime=72
    
  • 调整工作线程数:根据负载调整发送和接收线程数
    sendMessageThreadPoolNums=16
    pullMessageThreadPoolNums=16
    
  • 使用异步刷盘:对于性能优先场景,使用异步刷盘

9.3.2 生产者优化

  • 使用批量发送:合并多条消息批量发送,提高吞吐量
  • 压缩消息:对于大型消息,启用压缩
    producer.setCompressMsgBodyOverHowmuch(4096);
    
  • 选择适当的投递模式:根据可靠性要求选择同步、异步或单向发送
  • 设置超时时间:避免请求长时间阻塞
    producer.setSendMsgTimeout(3000);
    

9.3.3 消费者优化

  • 调整消费线程数:根据消息处理复杂度设置合适的线程数
    consumer.setConsumeThreadMin(20);
    consumer.setConsumeThreadMax(64);
    
  • 使用批量消费:对于高吞吐量场景,启用批量消费
  • 优化重试策略:设置合理的重试次数和间隔
    consumer.setMaxReconsumeTimes(3);
    
  • 使用顺序消费:对于需要保证顺序的消息,使用顺序消费模式

10. 常见问题排查

10.1 消息积压

症状:

  • 队列长度持续增长
  • 消费者滞后量不断增加
  • 消息处理延迟变长

排查步骤:

  1. 检查生产者发送速率和消费者消费速率,确认是否存在速率不匹配
  2. 查看消费者日志,确认是否有异常导致消费失败
  3. 检查消费者资源使用情况(CPU、内存等),确认是否存在资源瓶颈
  4. 检查网络延迟,确认是否存在网络问题
  5. 分析消息大小,确认是否有过大消息导致处理缓慢

解决方案:

  • 增加消费者实例数量
  • 优化消费者代码,提高处理效率
  • 增加消费者资源配置
  • 拆分大消息为多个小消息
  • 调整批处理大小和确认方式

10.2 消息丢失

症状:

  • 生产者发送成功,但消费者未收到消息
  • 消息数量不匹配(生产数大于消费数)
  • 业务数据不一致

排查步骤:

  1. 检查消息队列配置,确认是否启用了持久化
  2. 查看生产者确认机制,确认是否正确处理确认结果
  3. 检查消费者确认方式,确认是否正确提交偏移量
  4. 查看消息队列日志,寻找错误信息
  5. 检查集群状态,确认是否有节点故障

解决方案:

  • 启用队列和消息持久化
  • 使用同步发送并处理确认结果
  • 确保消费者正确确认消息
  • 配置合适的副本因子(Kafka、RocketMQ)
  • 使用事务消息或幂等性处理机制

10.3 连接断开

症状:

  • 连接数突降
  • 客户端报告连接错误
  • 发送/消费失败率增加

排查步骤:

  1. 检查消息队列服务器状态和负载
  2. 查看网络连接状态,确认是否存在网络问题
  3. 检查连接超时配置,确认是否设置过短
  4. 查看客户端连接池配置,确认是否合理
  5. 检查防火墙和安全组设置,确认是否有连接限制

解决方案:

  • 增加连接超时时间
  • 优化连接池配置
  • 检查并修复网络问题
  • 调整服务器资源,减轻负载
  • 检查并修改防火墙规则

10.4 性能下降

症状:

  • 消息延迟增加
  • 吞吐量下降
  • 服务器资源使用率高

排查步骤:

  1. 检查消息队列服务器CPU、内存、磁盘I/O、网络等资源使用情况
  2. 分析消息模式,确认是否有突发流量
  3. 查看日志,寻找错误或警告信息
  4. 检查是否有长时间运行的操作阻塞了处理
  5. 分析消费者处理时间,确认是否存在处理瓶颈

解决方案:

  • 增加服务器资源或扩展集群规模
  • 优化生产者和消费者代码
  • 调整消息队列配置参数
  • 实施流量控制或限流措施
  • 考虑分片或分区以分散负载

11. 最佳实践

11.1 监控覆盖

  • 全面监控:监控消息队列的各个层面,从服务器资源到队列内部指标
  • 关注端到端:监控消息从生产到消费的完整链路
  • 设置合理阈值:根据业务需求和消息系统性能特性设置合理的告警阈值
  • 关联分析:将消息队列指标与上下游系统性能关联分析

11.2 容量规划

  • 性能基线:建立正常运行时的性能基线,作为扩容参考
  • 压力测试:定期进行压力测试,了解系统极限和瓶颈
  • 预估增长:基于历史数据和业务增长预测,提前规划扩容
  • 弹性伸缩:对于云环境,配置自动伸缩策略

11.3 高可用设计

  • 集群部署:部署多节点集群,避免单点故障
  • 数据复制:配置适当的副本数量,确保数据可靠性
  • 负载均衡:合理分配消息和消费者,避免热点问题
  • 故障转移:测试故障转移机制,确保在节点故障时能快速恢复

11.4 性能与可靠性平衡

  • 权衡配置:根据业务需求平衡性能和可靠性配置
  • 差异化处理:对不同重要程度的消息采用不同的处理策略
  • 定期优化:定期审查和优化消息队列配置和代码
  • 监控指标相关性:分析不同指标之间的相关性,找出性能瓶颈

11.5 安全最佳实践

  • 认证与授权:启用消息队列的认证和授权机制
  • 加密通信:使用SSL/TLS加密客户端与服务器之间的通信
  • 定期审计:审计访问日志和操作记录,及时发现异常
  • 限制资源使用:设置连接数、消息大小等限制,防止资源滥用
Last Updated:: 11/28/25, 3:06 PM
Contributors: sunxiaokun