慢消费者

NATS 旨在快速地在服务器间递送消息。因此,NATS 依赖应用程序来考虑并应对不断变化的消息速率。服务器会进行一定程度的阻抗匹配,但如果客户端处理速度过慢,服务器最终将通过关闭连接来切断该客户端。这些被切断的连接被称为 慢速消费者

一些库通过为订阅缓冲传入消息的方式来处理突发性消息流量。例如,如果应用程序每秒可以处理 10 条消息,但有时每秒收到 20 条消息,那么库可能会保留额外的 10 条消息,以给应用程序时间追上。从服务器的角度来看,应用程序似乎正在处理消息,并且连接被认为是健康的。大多数客户端库会对应用程序通知 SlowConsumer 错误,并丢弃消息。

从服务器接收并丢弃消息可以保持与服务器的连接健康,但也会对应用程序提出要求。以下是几种常见的模式:

  • 使用请求-回复(request-reply)机制限制发送方,防止订阅者过载。

  • 使用队列(queue),让多个订阅者分担工作。

  • 使用类似 NATS 流式传输(Streaming)的功能持久化消息。

缓存传入消息的库可能提供两种控制方式,用于管理传入队列或待处理消息。这些功能在发布者突然发送了很多消息,而非持续地性能不匹配的情况下非常有用。在生产环境中禁用这些限制可能是危险的,尽管将这些限制设置为 0 可能有助于发现潜在问题,但在生产环境中这样做也是有风险的。

请查阅您所使用的库的文档,了解默认设置以及是否支持禁用这些限制。

传入缓存通常是每个订阅者的独立设定,但请再次检查您所使用客户端库的具体文档。

按消息数量和字节数限制传入/待处理消息

传入队列的第一个限制方式是按消息数量限制。第二个限制方式是按总大小限制。例如,要将传入缓存限制为最多 1,000 条消息或 5MB(以先达到为准):

nc, err := nats.Connect("demo.nats.io")
if err != nil {
    log.Fatal(err)
}
defer nc.Close()

// Subscribe
sub1, err := nc.Subscribe("updates", func(m *nats.Msg) {})
if err != nil {
    log.Fatal(err)
}

// 设置限制:1000 条消息或 5MB,以先达到为准
sub1.SetPendingLimits(1000, 5*1024*1024)

// Subscribe
sub2, err := nc.Subscribe("updates", func(m *nats.Msg) {})
if err != nil {
    log.Fatal(err)
}

// 为该订阅取消限制
sub2.SetPendingLimits(-1, -1)

// 关闭连接
nc.Close()

检测慢速消费者并检查丢弃的消息

当检测到慢速消费者且消息即将被丢弃时,库可能会通知应用程序。此过程可能类似于其他错误,也可能涉及自定义回调。

某些库,如 Java,不会在每次丢弃消息时都发送此通知,因为这可能会产生大量噪音。相反,通知可能会在订阅者落后时发送一次。库也可能提供一种获取已丢弃消息数的方法,以便应用程序能够至少检测到问题的发生。

最后更新于