这是一篇拖更很久的博客,不知不觉InitQ在nuget下载量已经过15K了,奈何胸无点墨也不晓得怎么写(懒),随便在github上挂了个md,现在好好唠唠如何在redis里使用队列
image
队列缓存分布式 异步调优堆配置 ——(来自某位不知名码友)

redis在项目中使用的越来越频繁,通常我们是用来做缓存,使用较多的就是String,Hash这两种类型,以及分布式锁,redis的List类型,就可以用于消息队列,使用起来更加简单,且速度更快,非常适合子服务内部之间的消息流转,创造灵感来自于杨老板的CAP(地址:https://www.cnblogs.com/tibos/p/11858095.html),采用注解的方式消费队列,让业务逻辑更加的清晰,方便维护

  • .net core版本:2.1
  • redis版本:3.0以上
  1. 1.通过注解的方式,订阅队列
  2. 2.可以设置消费消息的频次
  3. 3.支持消息广播
  4. 4.支持延迟队列
  • 1.获取initQ包

    方案A. install-package InitQ
    方案B. nuget包管理工具搜索 InitQ

  • 2.添加中间件(该中间件依赖 StackExchange.Redis)

    1. services.AddInitQ(m=>
    2. {
    3. m.SuspendTime = 1000;
    4. m.IntervalTime = 1000;
    5. m.ConnectionString = "127.0.0.1,connectTimeout=15000,syncTimeout=5000,password=123456";
    6. m.ListSubscribe = new List<Type>() { typeof(RedisSubscribeA), typeof(RedisSubscribeB) };
    7. m.ShowLog = false;
    8. });
  • 3.配置说明

    1. public class InitQOptions
    2. {
    3. /// <summary>
    4. /// redis连接字符串
    5. /// </summary>
    6. public string ConnectionString { get; set; }
    7. /// <summary>
    8. /// 没消息时挂起时长(毫秒)
    9. /// </summary>
    10. public int SuspendTime { get; set; }
    11. /// <summary>
    12. /// 每次消费消息间隔时间(毫秒)
    13. /// </summary>
    14. public int IntervalTime { get; set; }
    15. /// <summary>
    16. /// 是否显示日志
    17. /// </summary>
    18. public bool ShowLog { get; set; }
    19. /// <summary>
    20. /// 需要注入的类型
    21. /// </summary>
    22. public IList<Type> ListSubscribe { get; set; }
    23. public InitQOptions()
    24. {
    25. ConnectionString = "";
    26. IntervalTime = 0;
    27. SuspendTime = 1000;
    28. ShowLog = false;
    29. }
    30. }

消息的发布/订阅是最基础的功能,这里做了几个优化

  1. 采用的是长轮询模式,可以控制消息消费的频次,以及轮询空消息的间隔,避免资源浪费
  2. 支持多个类订阅消息,可以很方便的根据业务进行分类,前提是这些类 必须注册
  3. 支持多线程消费消息(在执行耗时任务的时候,非常有用)

示例如下(Thread.Sleep):

  1. public class RedisSubscribeA: IRedisSubscribe
  2. {
  3. [Subscribe("tibos_test_1")]
  4. private async Task SubRedisTest(string msg)
  5. {
  6. Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
  7. Thread.Sleep(3000); //使用堵塞线程模式,同步延时
  8. Console.WriteLine($"A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
  9. }
  10. }

image

  1. public class RedisSubscribeA: IRedisSubscribe
  2. {
  3. [Subscribe("tibos_test_1")]
  4. private async Task SubRedisTest(string msg)
  5. {
  6. Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
  7. Thread.Sleep(3000); //使用堵塞线程模式,同步延时
  8. Console.WriteLine($"A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
  9. }
  10. [Subscribe("tibos_test_1")]
  11. private async Task SubRedisTest2(string msg)
  12. {
  13. Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
  14. Thread.Sleep(3000); //使用堵塞线程模式,同步延时
  15. Console.WriteLine($"A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
  16. }
  17. }

image

示例如下(Task.Delay):

  1. [Subscribe("tibos_test_1")]
  2. private async Task SubRedisTest(string msg)
  3. {
  4. Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
  5. await Task.Delay(3000); //使用非堵塞线程模式,异步延时
  6. Console.WriteLine($"A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
  7. }

image

根据业务情况,合理的选择堵塞模式

  • 1.订阅发布者
    1. using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
    2. {
    3. //redis对象
    4. var _redis = scope.ServiceProvider.GetService<ICacheService>();
    5. //循环向 tibos_test_1 队列发送消息
    6. for (int i = 0; i < 1000; i++)
    7. {
    8. await _redis.ListRightPushAsync("tibos_test_1", $"我是消息{i + 1}号");
    9. }
    10. }
  • 2.定义消费者类 RedisSubscribeA
    1. public class RedisSubscribeA: IRedisSubscribe
    2. {
    3. [Subscribe("tibos_test_1")]
    4. private async Task SubRedisTest(string msg)
    5. {
    6. Console.WriteLine($"A类--->订阅者A消息消息:{msg}");
    7. }
    8. [Subscribe("tibos_test_1")]
    9. private async Task SubRedisTest1(string msg)
    10. {
    11. Console.WriteLine($"A类--->订阅者A1消息消息:{msg}");
    12. }
    13. [Subscribe("tibos_test_1")]
    14. private async Task SubRedisTest2(string msg)
    15. {
    16. Console.WriteLine($"A类--->订阅者A2消息消息:{msg}");
    17. }
    18. [Subscribe("tibos_test_1")]
    19. private async Task SubRedisTest3(string msg)
    20. {
    21. Console.WriteLine($"A类--->订阅者A3消息消息:{msg}");
    22. }
    23. }
  • 3.定义消费者类 RedisSubscribeB
    1. public class RedisSubscribeB : IRedisSubscribe
    2. {
    3. /// <summary>
    4. /// 测试
    5. /// </summary>
    6. /// <param name="msg"></param>
    7. /// <returns></returns>
    8. [Subscribe("tibos_test_1")]
    9. private async Task SubRedisTest(string msg)
    10. {
    11. Console.WriteLine($"B类--->订阅者B消费消息:{msg}");
    12. }
    13. }

消息广播是StackExchange.Redis已经封装好的,我们只用起个线程监听即可,只要监听了这个key的线程,都会收到消息

  • 1.订阅消息通道,订阅者需要在程序初始化的时候启动一个线程侦听通道,这里使用HostedService来实现,并注册到容器
    1. public class ChannelSubscribeA : IHostedService, IDisposable
    2. {
    3. private readonly IServiceProvider _provider;
    4. private readonly ILogger _logger;
    5. public ChannelSubscribeA(ILogger<TestMain> logger, IServiceProvider provider)
    6. {
    7. _logger = logger;
    8. _provider = provider;
    9. }
    10. public void Dispose()
    11. {
    12. _logger.LogInformation("退出");
    13. }
    14. public Task StartAsync(CancellationToken cancellationToken)
    15. {
    16. _logger.LogInformation("程序启动");
    17. Task.Run(async () =>
    18. {
    19. using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
    20. {
    21. //redis对象
    22. var _redis = scope.ServiceProvider.GetService<ICacheService>();
    23. await _redis.SubscribeAsync("test_channel", new Action<RedisChannel, RedisValue>((channel, message) =>
    24. {
    25. Console.WriteLine("test_channel" + " 订阅服务A收到消息:" + message);
    26. }));
    27. }
    28. });
    29. return Task.CompletedTask;
    30. }
    31. public Task StopAsync(CancellationToken cancellationToken)
    32. {
    33. _logger.LogInformation("结束");
    34. return Task.CompletedTask;
    35. }
    36. }
    1. public class ChannelSubscribeB : IHostedService, IDisposable
    2. {
    3. private readonly IServiceProvider _provider;
    4. private readonly ILogger _logger;
    5. public ChannelSubscribeB(ILogger<TestMain> logger, IServiceProvider provider)
    6. {
    7. _logger = logger;
    8. _provider = provider;
    9. }
    10. public void Dispose()
    11. {
    12. _logger.LogInformation("退出");
    13. }
    14. public Task StartAsync(CancellationToken cancellationToken)
    15. {
    16. _logger.LogInformation("程序启动");
    17. Task.Run(async () =>
    18. {
    19. using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
    20. {
    21. //redis对象
    22. var _redis = scope.ServiceProvider.GetService<ICacheService>();
    23. await _redis.SubscribeAsync("test_channel", new Action<RedisChannel, RedisValue>((channel, message) =>
    24. {
    25. Console.WriteLine("test_channel" + " 订阅服务B收到消息:" + message);
    26. }));
    27. }
    28. });
    29. return Task.CompletedTask;
    30. }
    31. public Task StopAsync(CancellationToken cancellationToken)
    32. {
    33. _logger.LogInformation("结束");
    34. return Task.CompletedTask;
    35. }
    36. }
  • 2.将HostedService类注入到容器
    1. services.AddHostedService<ChannelSubscribeA>();
    2. services.AddHostedService<ChannelSubscribeB>();
  • 3.广播消息
    1. using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
    2. {
    3. //redis对象
    4. var _redis = scope.ServiceProvider.GetService<ICacheService>();
    5. for (int i = 0; i < 1000; i++)
    6. {
    7. await _redis.PublishAsync("test_channel", $"往通道发送第{i}条消息");
    8. }
    9. }

延迟消息非常适用处理一些定时任务的场景,如订单15分钟未付款,自动取消, xxx天后,自动续费…… 这里使用zset+redis锁来实现,这里的操作方式,跟发布/定义非常类似
写入延迟消息:SortedSetAddAsync
注解使用:SubscribeDelay

  • 1.定义发布者

    1. Task.Run(async () =>
    2. {
    3. using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
    4. {
    5. //redis对象
    6. var _redis = scope.ServiceProvider.GetService<ICacheService>();
    7. for (int i = 0; i < 100; i++)
    8. {
    9. var dt = DateTime.Now.AddSeconds(3 * (i + 1));
    10. //key:redis里的key,唯一
    11. //msg:任务
    12. //time:延时执行的时间
    13. await _redis.SortedSetAddAsync("test_0625", $"延迟任务,第{i + 1}个元素,执行时间:{dt.ToString("yyyy-MM-dd HH:mm:ss")}", dt);
    14. }
    15. }
    16. });
  • 2.定义消费者

    1. //延迟队列
    2. [SubscribeDelay("test_0625")]
    3. private async Task SubRedisTest1(string msg)
    4. {
    5. Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者延迟队列消息开始--->{msg}");
    6. //模拟任务执行耗时
    7. await Task.Delay(TimeSpan.FromSeconds(3));
    8. Console.WriteLine($"A类--->{msg} 结束<---");
    9. }

image

版本

  • V1.0 更新时间:2019-12-30

版本库:

作者:提伯斯

版权声明:本文为tibos原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/tibos/p/14944832.html