流
流是消息存储库,每个流定义了消息的存储方式以及保留的限制(持续时间、大小、兴趣)。流消费普通的 NATS 主题,发布到这些主题上的任何消息都将被捕获到定义的存储系统中。您可以向该主题进行普通发布以实现无需确认的传递,不过最好改用 JetStream 的 publish 调用,因为 JetStream 服务器会回复一个确认,表明消息已成功存储(译者注:长话短说就是有 ack 机制确保消息成功被 NATS 服务器收到了)。

上图展示了将所有 ORDERS.* 存储在流中的概念,尽管存在多种与订单相关的消息类型。我们稍后将展示如何有选择地消费消息的子集。相对而言,流是最消耗资源的组件,因此能够以这种方式组合相关数据是需要考虑的重要因素。
一个流可以消费多个主题。例如这里的 ORDERS.*,但如果合理,我们也可以将 SHIPPING.state 消费到同一个流中。
流的限制和消息保留
流支持各种保留策略,这些策略定义了流中的消息何时可以被自动删除,例如当达到流限制时(如最大消息数量、大小或存在时间),或者还可以在限制之上应用更新颖的选项,例如基于兴趣的保留或工作队列语义(参见 保留策略)。
达到消息限制后,服务器将通过删除最旧的消息来为新消息腾出空间(DiscardOld),或者拒绝存储新消息(DiscardNew)来自动丢弃消息。更多详情,请参见 丢弃策略。
流支持使用 Nats-Msg-Id 头和一个用于跟踪重复消息的滑动窗口进行去重。参见 消息去重 部分。
关于如何使用您首选的 NATS 客户端配置流的示例,参见 NATS by Example。
配置
以下是可定义的流配置选项集合。Version 列表示引入该选项的服务器版本。Editable 列表示该选项在流创建后是否可以编辑。参见特定客户端的示例。
StorageType
存储类型包括:
File(默认)- 使用 基于文件 的存储方案来保存流数据。Memory- 使用 基于内存 的存储方案来保存流数据。
Subjects
注意:不可以为已 配置为镜像的流 配置一组主题。镜像会隐式获取 原始流 的子集(可选带过滤器),但不会订阅额外主题。
若未显式指定主题,则默认主题与流同名。可以指定多个主题并在未来更改。请注意,在某个主题上,如果消息被流存储,而该主题随后被从流配置中删除,只要消费者主题过滤器存在重叠,消费者仍将接收到这些消息。
RetentionPolicy
保留策略选项包括:
LimitsPolicy(默认)- 基于设置的各种限制进行保留,包括:MaxMsgs、MaxBytes、MaxAge和MaxMsgsPerSubject。若设置了任何这些限制,当达到其中任意限制时,系统将自动删除对应消息。详见完整代码示例。WorkQueuePolicy- 采用 典型 FIFO 队列(先入先出队列)行为的保留策略。每条消息只能被消费一次。这是通过 限制流中每个主题只允许创建一个消费者(即消费者的主题过滤器不能重叠)来实现的。一旦给定的消息被确认(ack’ed),它将被从流中删除。参见 完整代码示例。InterestPolicy- 基于消费者对流和消息的兴趣进行保留。常见场景是流未定义任何消费者。如果将消息发布到这种流,它们将立即被删除,因此不存在兴趣。这意味着消费者需在消息发布前绑定至该流。一旦给定的消息被所有筛选该主题的消费者确认,该消息就会被删除(与WorkQueuePolicy行为相同)。参见 完整代码示例。
{% hint style="warning" %}
如果为流选择了 InterestPolicy 或 WorkQueuePolicy,请注意,任何已定义的限制仍然有效。例如,给定一个工作队列流,如果设置了 MaxMsgs 并且丢弃策略为旧的(默认),则消息将被自动删除,即使消费者没有收到它们。
{% endhint %}
{% hint style="info" %}
WorkQueuePolicy 流只会在达到限制时或消息被其消费者成功 Ack’d 时删除消息。已达到消费者 MaxDeliver 尝试次数的重试消息将保留在流中,必须通过 JetStream API 手动删除。
{% endhint %}
DiscardPolicy
丢弃行为仅适用于至少定义了一个限制的流。选项包括:
DiscardOld(默认)- 此策略将删除最旧的消息以维持限制。例如,如果MaxAge设置为一分钟,服务器将自动删除超过一分钟的消息。DiscardNew- 此策略将拒绝将新消息附加到流中,如果这会超出其中一个限制。此策略的一个扩展是DiscardNewPerSubject,它将在流内基于每个主题应用此策略。
Placement
Placement 指的是流资产(数据)在 NATS 部署中的放置位置,无论是单个集群还是超级集群。给定的流,包括所有副本(非镜像),都绑定到单个集群。因此,在创建或移动流时,将选择一个集群来托管这些资产。
如果没有显式为流声明放置,默认情况下,流将在客户端连接到的集群中创建(假设该集群有足够的可用存储空间)。
通过声明流放置,可以显式控制这些资产的存放位置。这通常用于与最活跃的客户端(发布者或消费者)协同定位,或者可能由于数据主权原因而需要。
所有客户端 SDK 以及 CLI 都支持放置。例如,通过 CLI 添加一个流以将流放置在特定集群中如下所示:
为此,给定集群中的所有服务器都必须在 cluster 服务器配置块中定义 name 字段。
如果您有多个形成超级集群的集群,则每个集群都需要有不同的名称。
另一个放置选项是标签。每个服务器都可以有自己的一组标签,在配置中定义,通常描述地理、托管提供商、规模层级等属性。此外,标签通常与 jetstream.unique_tag 配置选项结合使用,以确保副本必须部署在具有不同标签值的服务器上。
例如,上述集群中的服务器 A、B、C 可能除部署的可用区外配置完全相同。
现在我们可以使用标签来创建一个流,例如指示我们希望在 us-east1 中有一个流。
如果我们在 Google Cloud 中有第二个集群、具有相同区域标签,则该流可以放置在 AWS 或 GCP 集群中。但是,unique_tag 约束确保每个副本将放置在通过标签隐式选择的集群中的、不同的 可用区 中。
虽然不太常见,但请注意集群 和 标签都可以用于放置。如果单个集群包含具有不同属性的服务器,则会使用此方法。
Sources 和 Mirrors
当一个流被配置了 source 或 mirror 时,它将自动且异步地从原始流复制消息。声明配置时有几个选项。
设置了 Sources 的流、镜像流(被配置了 mirror 的流)都可以有自己的保留策略、复制和存储类型。对这些流的更改,例如删除消息或发布,不会反映在原始流上。
{% hint style="info" %}
Sources 是 Mirror 的泛化形式,支持从一个或多个流并行获取数据。我们建议在新配置中使用 Sources。
如果您希望目标流充当只读副本,你可以:
- 配置没有监听任何主题的流或者
- 通过客户端授权临时禁用监听主题。 {% endhint %}
Stream sources
定义了 Sources 的流,可作为一种通用的复制机制,它允许同时从一个或多个流获取数据,并允许客户端直接写入/发布。本质上,来自 Sources 中的流的写入、和来自客户端的写入被交叉聚合到一个单一流中。
主题转换和过滤 为你解锁了强大的数据分发架构!
Mirrors
镜像流只能从一个流获取消息,并且客户端不能直接写入镜像。虽然客户端不能直接向镜像发布消息,但可以按需删除消息(超出保留策略),并且消费者享有常规流上所有可用的功能。
详见:
AllowRollup
如果启用 AllowRollup 流选项,系统将允许已发布的消息带有 Nats-Rollup header,指示应清除所有先前的消息。清除 的范围由头值定义,可以是 all 或 sub。
Nats-Rollup: all header 将清除流中所有先前的消息。而 sub 值将清除给定主题下的所有先前消息。
Rollup 的一个常见用例是状态快照,其中正在发布的消息已累积了来自先前消息的所有必要状态,相对于流或特定主题(译注:例如一个 游戏服务器 在广播自己的在线人数(状态),而接收方只需要知道它最新的在线人数,这种情况就适合用 AllowRollup)。
RePublish
如果启用 RePublish 流选项,系统将在成功写入接收到的消息后,立即自动将消息重新发布到不同的目标主题。
对于大规模需求,目前,一个专用的 JetStream 消费者 可能会增加过多开销,这种情况下,客户端就可以拿 RePublish 特性建立对目标主题的轻量的 Core NATS 订阅,并实时接收附加到流的消息。
配置 RePublish 的字段包括:
Source- 一个可选的主题模式,它是绑定到流的主题的子集。它默认为流中的所有消息,例如>。Destination- 消息将被重新发布到的目标主题。源和目标必须是有效的主题映射。HeadersOnly- 如果为 true,则消息数据将不会包含在重新发布的消息中,只会添加一个额外的头Nats-Msg-Size来指示消息的大小(字节)。
对于每条重新发布的消息,会自动添加一组 headers。
SubjectTransform
如果配置了 SubjectTransform,系统就会对流接收到的消息中,匹配到的主题执行一次主题转换(SubjectTransform),并在将消息存储到流中之前变换其主题。转换配置指定了 Source 和 Destination 字段,遵循主题转换的规则。
