你的位置:首页 > 信息动态 > 新闻中心
信息动态
联系我们

其他拓展应用(二十)

2021/12/31 8:59:24

死信队列和重试队列

       由于某些原因消息无法被正确地投递,为了确保消息不会被无故地丢弃,一般将其置于一个特殊角色的队列,这个队列一般称为死信队列。后续分析程序可以通过消费这个死信队列中的内容来分析当时遇到的异常情况,进而可以改善和优化系统。

      与死信队列对应的还有一个“回退队列”的概念,如果消费者在消费时发生了异常,那么就不会对这一次消费进行确认,进而发生回滚消息的操作之后,消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为了解决这个问题,可以为每个队列设置一个回退队列,它和死信队列都是为异常处理提供的一种机制保障。实际情况下,回退队列的角色可以由死信队列和重试队列来扮演。

       无论 RabbitMQ 中的队列,还是 Kafka 中的主题,其实质上都是消息的载体,换种角度看待问题可以让我们找到彼此的共通性。我们依然可以把 Kafka 中的主题看作“队列”,那么重试队列、死信队列的称谓就可以同延时队列一样沿用下来。 理解死信队列,关键是要理解死信。死信可以看作消费者不能处理收到的消息,也可以看作消费者不想处理收到的消息,还可以看作不符合处理要求的消息。比如消息内包含的消息内容无法被消费者解析,为了确保消息的可靠性而不被随意丢弃,故将其投递到死信队列中,这里的死信就可以看作消费者不能处理的消息。再比如超过既定的重试次数之后将消息投入死信队列,这里就可以将死信看作不符合处理要求的消息。

       至于死信队列到底怎么用,是从 broker 端存入死信队列,还是从消费端存入死信队列,需要先思考两个问题:死信有什么用?为什么用?从而引发怎么用。在 RabbitMQ 中,死信一般通过 broker 端存入,而在 Kafka 中原本并无死信的概念,所以当需要封装这一层概念的时候,就可以脱离既定思维的束缚,根据应用情况选择合适的实现方式,理解死信的本质进而懂得如何去实现死信队列的功能。

       重试队列其实可以看作一种回退队列,具体指消费端消费消息失败时,为了防止消息无故丢失而重新将消息回滚到 broker 中。与回退队列不同的是,重试队列一般分成多个重试等级,每个重试等级一般也会设置重新投递延时,重试次数越多投递延时就越大。举个例子:消息第一次消费失败入重试队列 Q1,Q1 的重新投递延时为5s,5s过后重新投递该消息;如果消息再次消费失败则入重试队列 Q2,Q2 的重新投递延时为10s,10s过后再次投递该消息。以此类推,重试越多次重新投递的时间就越久,为此还需要设置一个上限,超过投递次数就进入死信队列。重试队列与延时队列有相同的地方,都需要设置延时级别。它们的区别是:延时队列动作由内部触发,重试队列动作由外部消费端触发;延时队列作用一次,而重试队列的作用范围会向后传递。

消息路由

      消息路由是消息中间件中常见的一个概念,比如在典型的消息中间件 RabbitMQ 中就使用路由键 RoutingKey 来进行消息路由。如下图所示,RabbitMQ 中的生产者将消息发送到交换器 Exchange 中,然后由交换器根据指定的路由键来将消息路由到一个或多个队列中,消费者消费的是队列中的消息。从整体上而言,RabbitMQ 通过路由键将原本发往一个地方的消息做了区分,然后让不同的消息者消费到自己要关注的消息。

11-9

 

       Kafka 默认按照主题进行路由,也就是说,消息发往主题之后会被订阅的消费者全盘接收,这里没有类似消息路由的功能来将消息进行二级路由,这一点从逻辑概念上来说并无任何问题。从业务应用上而言,如果不同的业务流程复用相同的主题,就会出现消息接收时的混乱,这种问题可以从设计上进行屏蔽,如果需要消息路由,那么完全可以通过细粒度化切分主题来实现。

11-10

 

       除了设计缺陷,还有一些历史遗留的问题迫使我们期望 Kafka 具备一个消息路由的功能。如果原来的应用系统采用了类似 RabbitMQ 这种消息路由的生产消费模型,运行一段时间之后又需要更换为 Kafka,并且变更之后还需要保留原有系统的编程逻辑。对此,我们首先需要在这个整体架构中做一层关系映射,如上图所示。这里将 Kafka 中的消费组与 RabbitMQ 中的队列做了一层映射,可以根据特定的标识来将消息投递到对应的消费组中,按照 Kafka 中的术语来讲,消费组根据消息特定的标识来获取消息,其余的都可以被过滤。

11-11

 

       具体的实现方式可以在消息的 headers 字段中加入一个键为“routingkey”、值为特定业务标识的 Header,然后在消费端中使用拦截器挑选出特定业务标识的消息。Kafka 中消息路由的实现架构如上图所示,消费组 ConsumerGroup1 根据指定的 Header 标识 rk2 和 rk3 来消费主题 TopicA 和 TopicB 中所有对应的消息而忽略 Header 标识为 rk1 的消息,消费组 ConsumerGroup2 正好相反。

        这里只是演示作为消息中间件家族之一的 Kafka 如何实现消息路由的功能,不过消息路由在 Kafka 的使用场景中很少见,如无特殊需要,也不推荐刻意地使用它。

消息轨迹

       在使用消息中间件时,我们时常会遇到各种问题:消息发送成功了吗?为什么发送的消息在消费端消费不到?为什么消费端重复消费了消息?对于此类问题,我们可以引入消息轨迹来解决。消息轨迹指的是一条消息从生产者发出,经由 broker 存储,再到消费者消费的整个过程中,各个相关节点的状态、时间、地点等数据汇聚而成的完整链路信息。生产者、broker、消费者这3个角色在处理消息的过程中都会在链路中增加相应的信息,将这些信息汇聚、处理之后就可以查询任意消息的状态,进而为生产环境中的故障排除提供强有力的数据支持。

       对消息轨迹而言,最常见的实现方式是封装客户端,在保证正常生产消费的同时添加相应的轨迹信息埋点逻辑。无论生产,还是消费,在执行之后都会有相应的轨迹信息,我们需要将这些信息保存起来。这里可以参考 Kafka 中的做法,它将消费位移信息保存在主题 __consumer_offset 中。对应地,我们同样可以将轨迹信息保存到 Kafka 的某个主题中,比如下图中的主题 trace_topic。

11-12

 

      生产者在将消息正常发送到用户主题 real_topic 之后(或者消费者在拉取到消息消费之后)会将轨迹信息发送到主题 trace_topic 中。这里有两种发送方式:第一种是直接通过 KafkaProducer 发送,为了不对普通的消息发送造成影响,可以采取“低功耗”的(比如异步、acks=0 等)发送配置,不过有可能会造成轨迹信息的丢失。

        另一种方式是将轨迹信息保存到本地磁盘,然后通过某个传输工具(比如 Flume)来同步到 Kafka 中,这种方式对正常发送/消费逻辑的影响较小、可靠性也较高,但是需要引入额外的组件,增加了维护的风险。

       消息轨迹中包含生产者、broker 和消费者的消息,但是上图中只提及了生产者和消费者的轨迹信息的保存而并没有提及 broker 信息的保存。生产者在发送消息之后通过确认信息来得知是否已经发送成功,而在消费端就更容易辨别一条消息是消费成功了还是失败了,对此我们可以通过客户端的信息反推出 broker 的链路信息。当然我们也可以在 broker 中嵌入一个前置程序来获得更多的链路信息,比如消息流入时间、消息落盘时间等。不过在 broker 内嵌前置程序,如果有相关功能更新,难免需要重启服务,如果只通过客户端实现消息轨迹,则可以简化整体架构、灵活部署,本节针对后者做相关的讲解。

        一条消息对应的消息轨迹信息所包含的内容(包含生产者和消费者)如下表所示。

角 色信 息 项释 义
生产者
消息ID能够唯一标识一条消息,在查询检索页面可以根据这个消息ID进行精准检索
消息Key消息中的key字段
发送时间消息发送的时间,指生产者的本地时间
发送耗时消息发送的时长,从调用send()方法开始到服务端返回的总耗时
发送状态发送成功或发送失败
发送的目的地址Kafka集群地址,为broker准备的链路信息
消息的主题主题,为broker准备的链路信息
消息的分区分区,为broker准备的链路信息
生产者的IP生产者本地的IP地址
生产者的ID生产者的唯一标识,可以用client.id替代
用户自定义信息(Tags)用户自定义的一些附加属性,方便后期检索
消费者
消息ID能够唯一标识一条消息
消息Key消息中的key字段
接收时间拉取到消息的时间,指消费者本地的时间
消费耗时消息消费的时长,从拉取到消息到业务处理完这条消息的总耗时
消费状态消费成功或消费失败
重试次数第几次重试消费
消费的源地址Kafka集群地址,为broker准备的链路信息,便于链路的串成
消息的主题主题,为broker准备的链路信息,便于链路的串成
消息的分区分区,为broker准备的链路信息,便于链路的串成
消费组消费组的名称
消费者的IP消费者本地的IP地址
消费者的ID消费者的唯一标识,可以用client.id替代
用户自定义信息(tags)用户自定义的一些附加属性,方便后期检索

       轨迹信息保存到主题 trace_topic 之后,还需要通过一个专门的处理服务模块对消息轨迹进行索引和存储,方便有效地进行检索。在查询检索页面进行检索的时候可以根据具体的消息 ID 进行精确检索,也可以根据消息的 key、主题、发送/接收时间进行模糊检索,还可以根据用户自定义的 Tags 信息进行有针对性的检索,最终查询出消息的一条链路轨迹。下图中给出一个链路轨迹的示例,根据这个示例我们可以清楚地知道某条消息所处的状态。

11-13

 

消息审计

       消息审计是指在消息生产、存储和消费的整个过程之间对消息个数及延迟的审计,以此来检测是否有数据丢失、是否有数据重复、端到端的延迟又是多少等内容。

        目前与消息审计有关的产品也有多个,比如 Chaperone(Uber)、Confluent Control Center、Kafka Monitor(LinkedIn),它们主要通过在消息体(value 字段)或在消息头(headers 字段)中内嵌消息对应的时间戳 timestamp 或全局的唯一标识 ID(或者是两者兼备)来实现消息的审计功能。

       内嵌 timestamp 的方式主要是设置一个审计的时间间隔 time_bucket_interval(可以自定义设置几秒或几分钟),根据这个 time_bucket_interval 和消息所属的 timestamp 来计算相应的时间桶(time_bucket)。

算法1:timestamp – timestamp % time_bucket_interval(这个算法在时间轮里也有提及)
算法2:(long)Math.floor((timestamp/time_bucket_interval) * time_bucket_interval)

     根据上面的任意一种算法可以获得 time_bucket 的起始时间 time_bucket_start,那么这个 time_bucket 的时间区间可以记录为(time_bucket_start, time_bucket_start+time_bucket_interval),注意是左闭右开区间。每发送一条或消费一条消息,可以根据消息中内嵌的 timestamp 来计算并分配到相应的 time_bucket 中,然后对桶进行计数并存储,比如可以简单地存储到 Map<long time_bucket_start, long count> 中。

       内嵌 ID 的方式就更加容易理解了,对于每一条消息都会被分配一个全局唯一标识 ID,这个和消息轨迹中的消息 ID 是同一个东西。如果主题和相应的分区固定,则可以为每个分区设置一个全局的 ID。当有消息发送时,首先获取对应的 ID,然后内嵌到消息中,最后才将它发送到 broker 中。消费者进行消费审计时,可以判断出哪条消息丢失、哪条消息重复。

       如果还要计算端到端延迟,那么就需要在消息中内嵌 timestamp,也就是消息中同时含有 ID 和 timestamp,细心的读者可能注意到这两类信息在消息轨迹的功能中也都包含了进去。的确如此,我们可以将消息轨迹看作细粒度化的消息审计,而消息审计可以看作粗粒度化的消息轨迹。

11-14

 

       消息审计的实现模型也和消息轨迹的类似,同样是通过封装自定义的 SDK 来实现的。上图中展示的是 Confluent Control Center的消息审计的实现模型,它通过生产者客户端和消费者客户端的拦截器来实现审计信息的保存,这里的审计信息同样保存到 Kafka 中的某个主题中,最后通过 Confluent Control Center 进行最终的信息处理和展示。如果读者需要类似消息审计的功能,不妨参照此类的实现。