nc,err:=nats.Connect("demo.nats.io",nats.ErrorHandler(func(nc*nats.Conn,s*nats.Subscription,errerror){ifs!=nil{log.Printf("Async error in %q/%q: %v",s.Subject,s.Queue,err)}else{log.Printf("Async error outside subscription: %v",err)}}))iferr!=nil{log.Fatal(err)}defernc.Close()ec,err:=nats.NewEncodedConn(nc,nats.JSON_ENCODER)iferr!=nil{log.Fatal(err)}deferec.Close()// Define the objecttypestockstruct{SymbolstringPriceint}wg:=sync.WaitGroup{}wg.Add(1)// Subscribe// Decoding errors will be passed to the function supplied via// nats.ErrorHandler above, and the callback supplied here will// not be invoked.if_,err:=ec.Subscribe("updates",func(s*stock){log.Printf("Stock: %s - Price: %v",s.Symbol,s.Price)wg.Done()});err!=nil{log.Fatal(err)}// Wait for a message to come inwg.Wait()
/**
 * Plain data holder for a stock quote.
 *
 * <p>The fields are public and mutable so that a reflection-based JSON
 * mapper can populate them directly.
 */
class StockForJsonSub {
    /** Ticker symbol of the stock. */
    public String symbol;

    /** Quoted price of the stock. */
    public float price;

    /**
     * Renders the quote as {@code "<symbol> is at <price>"}.
     *
     * @return the human-readable form of this quote
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(symbol);
        text.append(" is at ");
        text.append(price);
        return text.toString();
    }
}
/**
 * Subscribes to the {@code updates} subject, decodes every message payload
 * from JSON into a {@link StockForJsonSub}, prints it, and exits once ten
 * messages have been handled.
 */
public class SubscribeJSON {
    /**
     * Program entry point.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // try-with-resources closes the connection on every exit path,
        // including failures while subscribing or waiting.
        try (Connection nc = Nats.connect("nats://demo.nats.io:4222")) {
            // Use a latch to wait for 10 messages to arrive
            CountDownLatch latch = new CountDownLatch(10);

            // One Gson instance is reused for every message instead of
            // allocating a new one inside the handler.
            Gson gson = new Gson();

            // Create a dispatcher and inline message handler
            Dispatcher d = nc.createDispatcher((msg) -> {
                String json = new String(msg.getData(), StandardCharsets.UTF_8);
                StockForJsonSub stk = gson.fromJson(json, StockForJsonSub.class);

                // Use the object
                System.out.println(stk);
                latch.countDown();
            });

            // Subscribe
            d.subscribe("updates");

            // Wait for all 10 messages to come in
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
// dotnet add package NATS.Net
using NATS.Net;
// NATS .NET has a built-in serializer that does the 'unsurprising' thing
// for most types. Most primitive types are serialized as expected.
// For any other type, JSON serialization is used. You can also provide
// your own serializers by implementing the INatsSerializer and
// INatsSerializerRegistry interfaces. For more information, see:
// https://nats-io.github.io/nats.net/documentation/advanced/serialization.html
// Create the client; it is disposed asynchronously when the program ends.
await using var nc = new NatsClient();

// The token from this source is handed to every subscription below so they
// can all be stopped together. The source is disposable, so it is released
// when the program ends.
using var cts = new CancellationTokenSource();

// Subscribe for int, string, bytes, json
List<Task> tasks =
[
    Task.Run(async () =>
    {
        await foreach (var msg in nc.SubscribeAsync<int>("x.int", cancellationToken: cts.Token))
        {
            Console.WriteLine($"Received int: {msg.Data}");
        }
    }),
    Task.Run(async () =>
    {
        await foreach (var msg in nc.SubscribeAsync<string>("x.string", cancellationToken: cts.Token))
        {
            Console.WriteLine($"Received string: {msg.Data}");
        }
    }),
    Task.Run(async () =>
    {
        await foreach (var msg in nc.SubscribeAsync<byte[]>("x.bytes", cancellationToken: cts.Token))
        {
            // Nothing is printed for a message whose payload is null.
            if (msg.Data is null)
            {
                continue;
            }

            Console.Write("Received bytes: ");
            foreach (var b in msg.Data)
            {
                // Two-digit upper-case hex per byte, e.g. "0x41 ".
                Console.Write("0x{0:X2} ", b);
            }

            Console.WriteLine();
        }
    }),
    Task.Run(async () =>
    {
        await foreach (var msg in nc.SubscribeAsync<MyData>("x.json", cancellationToken: cts.Token))
        {
            Console.WriteLine($"Received data: {msg.Data}");
        }
    }),
];

// Give the subscriber tasks some time to subscribe
await Task.Delay(1000);

// Publish one message of each payload type to its matching subject.
await nc.PublishAsync<int>("x.int", 100);
await nc.PublishAsync<string>("x.string", "Hello, World!");
await nc.PublishAsync<byte[]>("x.bytes", new byte[] { 0x41, 0x42, 0x43 });
await nc.PublishAsync<MyData>("x.json", new MyData(30, "bar"));

// Signal the subscriptions to stop, then wait for every subscriber task.
await cts.CancelAsync();
await Task.WhenAll(tasks);
/// <summary>
/// Sample payload that is published to and received from the <c>x.json</c> subject.
/// </summary>
/// <param name="Id">Numeric identifier carried by the payload.</param>
/// <param name="Name">Text value carried by the payload.</param>
public record MyData(int Id, string Name);
// Output:
// Received int: 100
// Received bytes: 0x41 0x42 0x43
// Received string: Hello, World!
// Received data: MyData { Id = 30, Name = bar }
// For more information, see:
// https://nats-io.github.io/nats.net/documentation/advanced/serialization.html
require 'nats/client'
require 'json'
# Connect to the local NATS server and print every message received on the
# "updates" subject after parsing it as JSON.
NATS.start(servers: ["nats://127.0.0.1:4222"]) do |nc|
  nc.subscribe("updates") do |msg|
    begin
      m = JSON.parse(msg)
      # {"symbol"=>"GOOG", "price"=>12}
      p m
    rescue JSON::ParserError => e
      # A payload that is not valid JSON is reported and skipped so the
      # exception does not propagate out of the subscription callback.
      warn "Skipping message with invalid JSON: #{e.message}"
    end
  end
end
// Structured data is not configurable in C NATS Client.