[C#/NATS] nats streaming server

C# 2023. 7. 10. 00:22
728x90
728x170

Nats 를 이용해서 Streaming Server 구성하는 방법입니다.

아래 사이트로 가서 nats-streaming-server.exe 파일을 다운받아 실행합니다.

https://github.com/nats-io/nats-streaming-server/releases

 

Releases · nats-io/nats-streaming-server

NATS Streaming System Server. Contribute to nats-io/nats-streaming-server development by creating an account on GitHub.

github.com

nats-streaming-server-v0.25.5-windows-amd64.zip

nats-streaming-server.exe 실행

위 로그로 4222 포트로 실행된걸 확인합니다.

이제 Pub/Sub 프로젝트를 구성합니다.

아래는 클라이언트(Pub)에서 메세지를 전달하고 서버(Sub)에서는 전달한 메세지를 받는 예제입니다.

 공통적으로 'STAN.Client' Nuget Package 를 설치합니다.

각 프로젝트에는 아래와 같이 코딩합니다.

- Nats.StreamTest.Sub

Program.cs

using System.Text;

using STAN.Client;

namespace Nats.StreamTest.Sub
{
    internal class Program
    {
        static void Main(string[] args)
        {
            string clientId = "ClientID" + Guid.NewGuid().ToString().Replace("-", "");

            var cf = new StanConnectionFactory();

            StanOptions options = StanOptions.GetDefaultOptions();
            options.NatsURL = "nats://localhost:4222";

            using (var c = cf.CreateConnection("test-cluster", clientId, options))
            {
                var opts = StanSubscriptionOptions.GetDefaultOptions();

                // 추가 옵션
                //opts.DeliverAllAvailable();
                //opts.StartAt(15);
                //opts.StartAt(TimeSpan.FromSeconds(10));
                //opts.StartAt(new DateTime(2019, 9, 18, 9, 0, 0));
                //opts.StartWithLastReceived();
                //opts.DurableName = "durable";

                var s = c.Subscribe("nats.streaming.channel", opts, (obj, args) =>
                {
                    // 메세지 수신
                    string message = Encoding.UTF8.GetString(args.Message.Data);
                    Console.WriteLine($"[#{args.Message.Sequence}] {message}");
                });

                Console.WriteLine($"Subcribe - client id '{clientId}'");
                Console.ReadKey(true);

                //s.Unsubscribe();
                c.Close();
            }
        }
    }
}

위 코드를 보면 알수 있듯이 메세지는 "nats.streaming.channel" 채널을 통해 수신처리가 되어있음을 알수 있습니다.

- Nats.StreamTest.Pub

Program.cs

using System.Text;

using STAN.Client;

namespace Nats.StreamTest.Pub
{
    internal class Program
    {
        static void Main(string[] args)
        {
            string clientId = $"pub-{Guid.NewGuid().ToString()}";

            var cf = new StanConnectionFactory();
            StanOptions options = StanOptions.GetDefaultOptions();
            options.NatsURL = "nats://localhost:4222";

            using (var c = cf.CreateConnection("test-cluster", clientId, options))
            {
                for (int i = 1; i <= 25; i++)
                {
                    string message = $"[{DateTime.Now.ToString("hh:mm:ss:fffffff")}] Message {i}";
                    Console.WriteLine($"Pubish - Sending {message}");

                    c.Publish("nats.streaming.channel", Encoding.UTF8.GetBytes(message));
                }
            }

            Console.ReadKey(true);
        }
    }
}

"nats.streaming.channel" 채널을 통해 메세지를 전달하고 있으며 단건이 아닌 여러건을 보내고 있습니다.


결과

체널을 구독하면 위와 같은 메세지가 찍히게 됩니다.

Pub -> Sub


참고
https://github.com/nats-io/nats-streaming-server

 

GitHub - nats-io/nats-streaming-server: NATS Streaming System Server

NATS Streaming System Server. Contribute to nats-io/nats-streaming-server development by creating an account on GitHub.

github.com

https://github.com/EdwinVW/nats-demos/tree/master/src/NATS-Streaming

 

728x90
그리드형
Posted by kjun
,