Streams

第一步是为我们与 ORDERS 相关的消息设置存储,这些消息通过一组使用通配符的主题到达,全部流入同一个流,并保留 1 年。

创建

nats str add ORDERS
? Subjects to consume ORDERS.*
? Storage backend file
? Retention Policy Limits
? Discard Policy Old
? Message count limit -1
? Message size limit -1
? Maximum message age limit 1y
? Maximum individual message size [? for help] (-1) -1
Stream ORDERS was created

Information for Stream ORDERS

Configuration:

             Subjects: ORDERS.*
     Acknowledgements: true
            Retention: File - Limits
             Replicas: 1
     Maximum Messages: -1
        Maximum Bytes: -1
          Maximum Age: 8760h0m0s
 Maximum Message Size: -1
  Maximum Consumers: -1

Statistics:

            Messages: 0
               Bytes: 0 B
            FirstSeq: 0
             LastSeq: 0
    Active Consumers: 0

您可以像上面这样以交互方式补全缺失的信息,也可以在一个命令中完成所有操作。在 CLI 中按 ? 可以帮助您了解提示对应的 CLI 选项:

此外,可以将配置存储在 JSON 文件中,其格式与 $ nats str info ORDERS -j | jq .config 相同:

列出

我们可以确认我们的流已创建:

查询

可以查看关于流配置的信息,如果您没有像下面这样指定流,它会根据所有已知的流提示您:

大多数像上面这样显示数据的命令都支持 -j 选项,以 JSON 格式显示结果:

这是整个 nats 工具与 JetStream 相关操作的通用模式——提示输入必要的信息,但每个操作都可以非交互式运行,使其可用作一个 CLI API。所有像上面这样的信息输出都可以使用 -j 转换为 JSON。

复制

可以将一个流复制到另一个流,这也允许通过 CLI 标志调整新流的配置:

编辑

可以编辑流的配置,这允许通过 CLI 标志调整配置。这里我有一个错误创建的 ORDERS 流,我修复它:

更改流的主题

此外,可以将配置存储在 JSON 文件中,其格式与 $ nats str info ORDERS -j | jq .config 相同:

发布消息到流中

现在让我们向我们的流添加一些消息。您可以使用 nats pub 来添加消息,传递 --wait 标志可以看到返回的发布确认。

您可以在无 ACK 机制的情况下发布:

但如果您想确保您的消息已到达 JetStream 并被持久化,您可以发出一个请求:

在执行此操作时持续查看流的状态,您会看到其存储的消息数量增加。

在向流中放入一些临时数据后,我们可以清除所有数据——同时保持流处于活动状态:

删除所有数据

要删除流中的所有数据,请使用 purge

删除单条消息

可以从流中安全地删除单条消息:

删除流

最后,出于演示目的,您也可以删除整个流并重新创建它。然后我们就可以准备创建消费者了:

最后更新于