发布到流

{% tabs %} {% tab title="Go" %}

// ExampleJetStream connects to a NATS server on localhost, creates the
// "example-stream" stream bound to "example-subject", publishes one message
// synchronously and 500 messages asynchronously, then waits up to five
// seconds for the asynchronous publishes to complete.
//
// Any error returned by the client aborts the example via log.Fatal.
func ExampleJetStream() {
	nc, err := nats.Connect("localhost")
	if err != nil {
		log.Fatal(err)
	}
	// Release the connection when the example returns normally.
	defer nc.Close()

	// Use the JetStream context to produce and consume messages
	// that have been persisted.
	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		log.Fatal(err)
	}

	// Create the stream; do not keep going if it could not be created,
	// otherwise the publishes below have nowhere to be stored.
	if _, err := js.AddStream(&nats.StreamConfig{
		Name:     "example-stream",
		Subjects: []string{"example-subject"},
	}); err != nil {
		log.Fatal(err)
	}

	// Publish a single message and check the outcome.
	if _, err := js.Publish("example-subject", []byte("Hello JS!")); err != nil {
		log.Fatal(err)
	}

	// Publish messages asynchronously.
	for i := 0; i < 500; i++ {
		if _, err := js.PublishAsync("example-subject", []byte("Hello JS Async!")); err != nil {
			log.Fatal(err)
		}
	}

	// Wait for the asynchronous publishes to finish, but no longer than 5s.
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		fmt.Println("未在规定时间内完成")
	}
}

{% endtab %}

{% tab title="Java" %}

// Connects to a local NATS server, creates "example-stream", then publishes
// one message synchronously and one asynchronously. The connection is closed
// automatically when the try-with-resources block exits.
try (Connection nc = Nats.connect("localhost")) {
    JetStreamManagement jsm = nc.jetStreamManagement();

    // Describe the stream first, then ask the server to create it.
    StreamConfiguration streamConfig = StreamConfiguration.builder()
        .name("example-stream")
        .subjects("example-subject")
        .build();
    jsm.addStream(streamConfig);

    JetStream js = jsm.jetStream();

    // Synchronous publish
    byte[] syncPayload = "Hello JS Sync!".getBytes();
    PublishAck syncAck = js.publish("example-subject", syncPayload);
    System.out.println("Publish Sequence: " + syncAck.getSeqno());

    // Asynchronous publish
    byte[] asyncPayload = "Hello JS Async!".getBytes();
    CompletableFuture<PublishAck> pendingAck = js.publishAsync("example-subject", asyncPayload);

    try {
        // Wait at most one second for the publish acknowledgement.
        PublishAck asyncAck = pendingAck.get(1, TimeUnit.SECONDS);
        System.out.println("Publish Sequence: " + asyncAck.getSeqno());
    }
    catch (ExecutionException e) {
        // Possibly some expectation set on the publish was not met (advanced feature),
        // or the internal request timed out so the publish ack did not arrive in time.
    }
    catch (TimeoutException e) {
        // Timed out: the timeout used for the async publish request is shorter
        // than the timeout of the publish acknowledgement.
    }
    catch (InterruptedException e) {
        // The async publish thread was interrupted.
    }
}

{% endtab %}

{% tab title="JavaScript" %}

import { connect, Empty } from "../../src/mod.ts";

const nc = await connect();

// Create the stream through the JetStream manager.
const jsm = await nc.jetstreamManager();
const streamConfig = { name: "example-stream", subjects: ["example-subject"] };
await jsm.streams.add(streamConfig);

// Prints the stream, sequence and duplicate flag carried by a publish ack.
const reportAck = (ack) => {
  console.log(`${ack.stream}[${ack.seq}]: duplicate? ${ack.duplicate}`);
};

const js = await nc.jetstream();
// The jetstream client provides a publish method that returns an
// acknowledgement indicating the message was received and stored by the
// server. You can set various expectations on a publish to prevent
// duplicate messages. If the expectations are not met, the message is rejected.
const firstAck = await js.publish("example-subject", Empty, {
  msgID: "a",
  expect: { streamName: "example-stream" },
});
reportAck(firstAck);

// Publish again with the same msgID, this time expecting the last sequence to be 1.
const secondAck = await js.publish("example-subject", Empty, {
  msgID: "a",
  expect: { lastSequence: 1 },
});
reportAck(secondAck);

// Clean up: remove the stream and drain the connection.
await jsm.streams.delete("example-stream");
await nc.drain();

{% endtab %}

{% tab title="Python" %}

import asyncio

import nats
from nats.errors import TimeoutError


async def main():
    """Connect to NATS on localhost, create a stream and publish ten messages.

    Each publish acknowledgement is printed, then the connection is closed.
    """
    nc = await nats.connect("localhost")

    # Create the JetStream context.
    js = nc.jetstream()

    # Persist messages published on 'example-subject'.
    await js.add_stream(name="example-stream", subjects=["example-subject"])

    for index in range(10):
        payload = f"hello world: {index}".encode()
        ack = await js.publish("example-subject", payload)
        print(ack)

    await nc.close()


# Run the example only when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

{% endtab %}

{% tab title="C#" %}

// dotnet add package NATS.Net
using NATS.Net;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

// Connects with default options, creates "example-stream", publishes one
// message and then 500 messages concurrently, checking every acknowledgement.
await using var client = new NatsClient();

INatsJSContext js = client.CreateJetStreamContext();

// Create a stream
await js.CreateStreamAsync(new StreamConfig(name: "example-stream", subjects: ["example-subject"]));

// Publish a single message
PubAckResponse singleAck = await js.PublishAsync("example-subject", "Hello, JetStream!");
singleAck.EnsureSuccess();

// Publish messages concurrently: send everything first, collect the futures...
var pending = new List<NatsJSPublishConcurrentFuture>();
for (var n = 0; n < 500; n++)
{
    pending.Add(await js.PublishConcurrentAsync("example-subject", "Hello, JetStream 1!"));
}

// ...then await each acknowledgement, disposing the future afterwards.
foreach (var item in pending)
{
    await using (item)
    {
        PubAckResponse response = await item.GetResponseAsync();
        response.EnsureSuccess();
    }
}

{% endtab %}

{% tab title="C" %}

#include "examples.h"

// Help text for this example's command-line options; passed to parseArgs().
static const char *usage =
    "-stream        stream name (default is 'foo')\n"
    "-txt           text to send (default is 'hello')\n"
    "-count         number of messages to send\n"
    "-sync          publish synchronously (default is async)\n";

// Error handler for asynchronous publishes.
//
// Prints the error details and the payload of the message that failed, then
// increments the int counter that `closure` points to.
static void
_jsPubErr(jsCtx *js, jsPubAckErr *pae, void *closure)
{
    int         *errCount = (int*) closure;
    int         msgLen    = natsMsg_GetDataLength(pae->Msg);
    const char  *msgData  = natsMsg_GetData(pae->Msg);

    printf("Error: %u - Code: %u - Text: %s\n", pae->Err, pae->ErrCode, pae->ErrText);
    printf("Original message: %.*s\n", msgLen, msgData);

    (*errCount)++;

    // If we wanted to resend the original message, we could do:
    //
    // js_PublishMsgAsync(js, &(pae->Msg), NULL);
    //
    // Note that we use `&(pae->Msg)`, so that if the library takes ownership
    // it sets the pointer to NULL and will not destroy the message when this
    // callback returns.

    // Nothing to destroy here, everything is handled by the library.
}

// Publishes `total` copies of `payload` to `subj` through JetStream, either
// asynchronously or synchronously depending on the `async` flag (these
// settings come from the shared example globals filled in by parseArgs()).
//
// If the stream named `stream` does not exist yet, it is created with memory
// storage and deleted again before exiting. Stream statistics are printed
// before and after the run.
//
// Any failure is reported on stdout together with the library's error stack
// on stderr. The process always returns 0.
int main(int argc, char **argv)
{
    natsConnection      *conn  = NULL;
    natsStatistics      *stats = NULL;
    natsOptions         *opts  = NULL;
    jsCtx               *js    = NULL;
    jsOptions           jsOpts;
    jsErrCode           jerr   = 0;
    natsStatus          s;
    int                 dataLen=0;
    volatile int        errors = 0;
    bool                delStream = false;

    opts = parseArgs(argc, argv, usage);
    dataLen = (int) strlen(payload);

    s = natsConnection_Connect(&conn, opts);

    if (s == NATS_OK)
        s = jsOptions_Init(&jsOpts);

    if (s == NATS_OK)
    {
        if (async)
        {
            // Route asynchronous publish failures to _jsPubErr, which counts
            // them in `errors`.
            jsOpts.PublishAsync.ErrHandler           = _jsPubErr;
            jsOpts.PublishAsync.ErrHandlerClosure    = (void*) &errors;
        }
        s = natsConnection_JetStream(&js, conn, &jsOpts);
    }

    if (s == NATS_OK)
    {
        jsStreamInfo    *si = NULL;

        // First check whether the stream already exists.
        s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
        if (s == NATS_NOT_FOUND)
        {
            jsStreamConfig  cfg;

            // Since we are the one creating this stream, we can delete it at the end.
            delStream = true;

            // Initialize the configuration structure.
            jsStreamConfig_Init(&cfg);
            cfg.Name = stream;
            // Set the subject.
            cfg.Subjects = (const char*[1]){subj};
            cfg.SubjectsLen = 1;
            // Use memory storage.
            cfg.Storage = js_MemoryStorage;
            // Add the stream.
            s = js_AddStream(&si, js, &cfg, NULL, &jerr);
        }
        if (s == NATS_OK)
        {
            printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
                si->Config->Name, si->State.Msgs, si->State.Bytes);

            // The returned stream info object needs to be destroyed.
            jsStreamInfo_Destroy(si);
        }
    }

    if (s == NATS_OK)
        s = natsStatistics_Create(&stats);

    if (s == NATS_OK)
    {
        // Report the subject the messages are actually published to.
        printf("\nSending %" PRId64 " messages to subject '%s'\n", total, subj);
        start = nats_Now();
    }

    // Stop publishing at the first failure.
    for (count = 0; (s == NATS_OK) && (count < total); count++)
    {
        if (async)
            s = js_PublishAsync(js, subj, (const void*) payload, dataLen, NULL);
        else
        {
            jsPubAck *pa = NULL;

            s = js_Publish(&pa, js, subj, (const void*) payload, dataLen, NULL, &jerr);
            if (s == NATS_OK)
            {
                if (pa->Duplicate)
                    printf("Got a duplicate message! Sequence=%" PRIu64 "\n", pa->Sequence);

                jsPubAck_Destroy(pa);
            }
        }
    }

    if ((s == NATS_OK) && async)
    {
        jsPubOptions    jsPubOpts;

        jsPubOptions_Init(&jsPubOpts);
        // Set to 30 seconds; if a "timeout" error is reported, this value
        // may need to be increased based on the number of messages sent.
        jsPubOpts.MaxWait = 30000;
        s = js_PublishAsyncComplete(js, &jsPubOpts);
        if (s == NATS_TIMEOUT)
        {
            // Get the list of pending messages. We could resend them,
            // etc., but for now just destroy them.
            natsMsgList list;

            js_PublishAsyncGetPendingList(&list, js);
            natsMsgList_Destroy(&list);
        }
    }

    if (s == NATS_OK)
    {
        jsStreamInfo *si = NULL;

        elapsed = nats_Now() - start;
        printStats(STATS_OUT, conn, NULL, stats);
        printPerf("Sent");

        if (errors != 0)
            printf("There were %d asynchronous errors\n", errors);

        // Report some stats after the run.
        s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
        if (s == NATS_OK)
        {
            printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
                si->Config->Name, si->State.Msgs, si->State.Bytes);

            jsStreamInfo_Destroy(si);
        }
    }

    // Report a failure of the run itself now, before the cleanup below makes
    // another JetStream call: reusing `s` for the delete would otherwise hide
    // this error whenever the delete succeeds.
    if (s != NATS_OK)
    {
        printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
        nats_PrintLastErrorStack(stderr);
    }

    if (delStream && (js != NULL))
    {
        // Separate status so the outcome of the run stays in `s`.
        natsStatus  ds;

        printf("\nDeleting stream %s: ", stream);
        ds = js_DeleteStream(js, stream, NULL, &jerr);
        if (ds == NATS_OK)
            printf("OK!");
        printf("\n");
        if (ds != NATS_OK)
        {
            printf("Error: %u - %s - jerr=%u\n", ds, natsStatus_GetText(ds), jerr);
            nats_PrintLastErrorStack(stderr);
        }
    }

    // Destroy all objects to avoid memory leak reports.
    jsCtx_Destroy(js);
    natsStatistics_Destroy(stats);
    natsConnection_Destroy(conn);
    natsOptions_Destroy(opts);

    // To avoid "still reachable" memory reports when running under valgrind.
    nats_Close();

    return 0;
}

{% endtab %} {% endtabs %}