管理 Streams 与 Consumers

流和持久化消费者可以在应用程序之外进行管理(通常使用 NATS CLI 工具),在这种情况下,应用程序只需知道它想要使用的持久化消费者的已知名称即可。但您也可以以编程方式管理流和消费者。

常见的流管理操作包括:

  • Adding a stream - 添加流是一个幂等函数,这意味着如果流不存在,则会创建它;如果流已经存在,则仅当现有流与 “add” 调用中指定的属性完全匹配时,添加操作才会成功。

  • Delete a stream - 删除流。

  • Purge a stream - 清空流(删除存储在流中的所有消息)。

  • 通过序列号(sequence number)获取或移除流中的特定消息。

  • 添加、更新(或删除)消费者。

  • 获取 流/消费者/账户 的信息、统计信息。获取/移除 存储在流中的单个消息的信息。

func ExampleJetStreamManager() {
	nc, _ := nats.Connect("localhost")

	js, _ := nc.JetStream()

	// Create a stream
	js.AddStream(&nats.StreamConfig{
		Name:     "example-stream",
		Subjects: []string{"example-subject"},
		MaxBytes: 1024,
	})

	// Update a stream
	js.UpdateStream(&nats.StreamConfig{
		Name:     "example-stream",
		MaxBytes: 2048,
	})

	// Create a durable consumer
	js.AddConsumer("example-stream", &nats.ConsumerConfig{
		Durable: "example-consumer-name",
	})

	// Get information about all streams (with Context JSOpt)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	for info := range js.StreamsInfo(nats.Context(ctx)) {
		fmt.Println("stream name: ", info.Config.Name)
	}

	// Get information about all consumers (with MaxWait JSOpt)
	for info := range js.ConsumersInfo("example-stream", nats.MaxWait(10*time.Second)) {
		fmt.Println("consumer name: ", info.Name)
	}

	// Delete a consumer
	js.DeleteConsumer("example-stream", "example-consumer-name")

	// Delete a stream
	js.DeleteStream("example-stream")
}

最后更新于