本地Docker部署Pulsar消息代理实现消息发布和消息订阅

 

Apache Pulsar 介绍

 

相关概念,后面有时间再花时间整理下。

 

1.使用dokcer本地部署pulsar

  1. docker run -it \
  2. -p 6650:6650 \
  3. -p 8080:8080 \
  4. --mount source=pulsardata,target=/pulsar/data \
  5. --mount source=pulsarconf,target=/pulsar/conf \
  6. apachepulsar/pulsar:2.7.1 \
  7. bin/pulsar standalone

  

2.docker ps -a 查看pulsar运行是否正常,可以看到下图已经部署成功

pulsar连接地址:http://localhost:8080

         pulsar://localhost:6650

3.使用C#客户端Publish Message到pulsar broker中

(1)为了演示,我这里创建了一个C#控制台项目

 

(2)我们使用官网推荐的C# pulsar客户端包,添加安装DotPulsar nuget包

(3)创建client

  1. //1。创建pulsar客户端
  2. var client = PulsarClient.Builder()
  3. .ServiceUrl(new Uri("pulsar://localhost:6650"))
  4. .RetryInterval(new TimeSpan(3))
  5. .Build();

 

(4)创建生产者,发送消息

  1. //2、创建Pulsar Producer(生产者)
  2. var producer = client.NewProducer()
  3. .Topic("persistent://public/default/mytopic")
  4. .Create();
  5. var data = Encoding.UTF8.GetBytes("Hello Pulsar");
  6. await producer.Send(data);

上图可见显示创建producer成功。

(5)下面再创建一个客户端来消费发送者发送的消息(“Hello Pulsar”)。

  1. //2、创建Pulsar Producer(消费者)
  2. var consumer = client.NewConsumer()
  3. .SubscriptionName("MySubscription")
  4. .Topic("persistent://public/default/mytopic")
  5. .Create();
  6. //3.消费消息
  7. await foreach (var message in consumer.Messages())
  8. {
  9. Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
  10. }

见上图,发布者发送消息成功被订阅者消费。

4.代码示例

  1. //PublisherClient
  2. static async Task Main(string[] args)
  3. {
  4. Console.WriteLine("Hello Pulsar");
  5. //1。创建pulsar客户端
  6. var client = PulsarClient.Builder()
  7. .ServiceUrl(new Uri("pulsar://localhost:6650"))
  8. .RetryInterval(new TimeSpan(3))
  9. .Build();
  10. //2、创建Pulsar Producer(生产者)
  11. var producer = client.NewProducer()
  12. .Topic("persistent://public/default/mytopic")
  13. .Create();
  14. for (int i = 0; i < 5; i++)
  15. {
  16. var data = Encoding.UTF8.GetBytes($"Hello Pulsar {i}");
  17. await producer.Send(data);
  18. Console.WriteLine($"发送消息成功");
  19. }
  20. Console.ReadKey();
  21. }
  22. //SubscriberClient
  23. static async Task Main(string[] args)
  24. {
  25. //1。创建pulsar客户端
  26. var client = PulsarClient.Builder()
  27. .ServiceUrl(new Uri("pulsar://localhost:6650"))
  28. .RetryInterval(new TimeSpan(3))
  29. .Build();
  30. //2、创建Pulsar Producer(消费者)
  31. var consumer = client.NewConsumer()
  32. .SubscriptionName("MySubscription")
  33. .Topic("persistent://public/default/mytopic")
  34. .Create();
  35. //3.消费消息
  36. await foreach (var message in consumer.Messages())
  37. {
  38. Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
  39. }
  40. Console.ReadKey();
  41. }

版权声明:本文为jaciots原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/jaciots/p/14770054.html