断开连接前排空消息

Drain 功能的能力在于先“排空”连接或订阅关系,随后再关闭连接。关闭连接(通过close()函数)或取消订阅通常被库视为需要立即响应的请求。当应用程序关闭连接或取消订阅时,库会停止为订阅者处理任何待处理的队列或缓存消息。而通过 Drain 释放订阅/连接时,库会先处理所有正在传输的(inflight)消息及缓存/待处理的消息,再执行关闭操作。

Drain功能 为使用队列订阅的客户端提供了一种在不丢失任何消息的情况下关闭应用程序的方法。客户端可以先启动新的队列成员,再排空并关闭旧的队列成员,整个过程不会丢失发送到旧客户端的消息。如果没有"Drain"功能,可能会由于交付时机而导致消息丢失。

库可以为 连接/订阅 提供排空功能,或者同时支持两者。

对于连接,排空 过程本质上是:

  1. 排空所有订阅

  2. 停止新消息的发布

  3. 刷新所有剩余的已发布消息

  4. 关闭

通常可使用 API Drain 代替 close

以下是排空连接的示例:

wg := sync.WaitGroup{}
wg.Add(1)

errCh := make(chan error, 1)

// To simulate a timeout, you would set the DrainTimeout()
// to a value less than the time spent in the message callback,
// so say: nats.DrainTimeout(10*time.Millisecond).

nc, err := nats.Connect("demo.nats.io",
    nats.DrainTimeout(10*time.Second),
    nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
        errCh <- err
    }),
    nats.ClosedHandler(func(_ *nats.Conn) {
        wg.Done()
    }))
if err != nil {
    log.Fatal(err)
}

// Just to not collide using the demo server with other users.
subject := nats.NewInbox()

// Subscribe, but add some delay while processing.
if _, err := nc.Subscribe(subject, func(_ *nats.Msg) {
    time.Sleep(200 * time.Millisecond)
}); err != nil {
    log.Fatal(err)
}

// Publish a message
if err := nc.Publish(subject, []byte("hello")); err != nil {
    log.Fatal(err)
}

// Drain the connection, which will close it when done.
if err := nc.Drain(); err != nil {
    log.Fatal(err)
}

// Wait for the connection to be closed.
wg.Wait()

// Check if there was an error
select {
case e := <-errCh:
    log.Fatal(e)
default:
}

The mechanics of drain for a subscription are simpler:

  1. Unsubscribe

  2. Process all cached or inflight messages

  3. Clean up

The API for drain can generally be used instead of unsubscribe:

Because draining can involve messages flowing to the server, for a flush and asynchronous message processing, the timeout for drain should generally be higher than the timeout for a simple message request-reply or similar.

最后更新于