使用RedisMQ 做一次分布式改造
引言
熟悉TPL Dataflow博文的朋友可能记得这是个单体程序,使用TPL Dataflow 处理工作流任务, 在使用Docker部署的过程中, 有一个问题一直无法回避:
在单体程序部署的瞬间会有少量 流量无法处理;更糟糕的情况下,迭代部署的这个版本有问题,上线后无法运作, 更多的流量没有得到处理。
背负神圣使命(巨大压力)的程序猿心生一计, 为何不将单体程序改成分布式:服务A只接受数据,服务B只处理数据。
知识储备:
消息队列和订阅发布作为老生常谈的两个知识点被反复提及,按照JMS的规范, 官方称为点对点(point to point, queue) 和 订阅发布(publish/subscribe,topic ),
点对点:
消息生产者生产消息发送到queue中,然后消费者从queue中取出并且消费消息。
消息被消费以后,queue中不再有存储,所以消费者不可能消费到已经被消费的消息。
Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
当没有消费者可用时,这个消息会被保存直到有 一个可用的消费者。
发布/订阅
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
这里要注意: 发布者将消息发布到通道中,而不用知道订阅者是谁(不关注是否存在订阅者);订阅者可收听自己感兴趣的多个通道, 也不需要知道发布者是谁(不关注是哪个发布者)。
故如果没有消费者,发布的消息将得不到处理;
头脑风暴
本次采用的消息队列模型:
- 解耦业务: 新建Receiver程序作为生产者,专注于接收并发送到队列;原有的webapp作为消费者专注数据处理。
- 起到削峰填谷的作用, 若建立多个消费者webapp容器,还能形成负载均衡的效果。
Redis 原生支持发布/订阅 模型,内置的List数据结构亦能形成轻量级MQ的效果。
需要关注Redis 两个命令( 左进右出,右进左出同理):
LPUSH & RPOP/BRPOP
Brpop 中的B 表示 “Block”, 是一个rpop命令的阻塞版本:若指定List没有新元素,在给定时间内,该命令会阻塞当前redis客户端连接,直到超时返回nil
编程实践
本次使用 AspNetCore 完成RedisMQ的实践。引入Redis国产第三方开源库 CSRedisCore.
不使用著名的StackExchange.Redis 组件库的原因:
之前一直使用StackExchange.Redis, 参考了很多资料,做了很多优化,并未完全解决 RedisTimeoutException 问题
StackExchange.Redis基于其多路复用机制,不支持阻塞式命令, 故采用了 CSRedisCore,该库强调了API 与Redis官方命令一致,很容易上手。
生产者Receiver:
------------------截取自Startup.cs------------------------------ public void ConfigureServices(IServiceCollection services) { var csredis = new CSRedisClient(Configuration.GetConnectionString("redis")); RedisHelper.Initialization(csredis); services.AddMvc(); } ---------------------截取自数据接收Controller------------------- [Route("batch")] [HttpPost] public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs) { if (!ModelState.IsValid) throw new ArgumentException("Http Body Payload Error."); var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}"; eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs); if (eqidPairs != null && eqidPairs.Any()) RedisHelper.LPush(redisKey, eqidPairs.ToArray()); await Task.CompletedTask; }
消费者webapp:
根据RedisMQ的事件推送方式,需要轮询Redis List 数据结构,这里使用AspNetCore内置的BackgroundService 实现了 后台轮询任务。
public class BackgroundJob : BackgroundService { private readonly IEqidPairHandler _eqidPairHandler; private readonly ILogger _logger; public BackgroundJob(IEqidPairHandler eqidPairHandler, ILoggerFactory loggerFactory) { _eqidPairHandler = eqidPairHandler; _logger = loggerFactory.CreateLogger(nameof(BackgroundJob)); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("Service starting"); while (!stoppingToken.IsCancellationRequested) { var key = $"eqidpair:{DateTime.Now.ToString("yyyyMMdd")}"; var eqidpair = RedisHelper.BRPop(5, key); if (eqidpair != null) await _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair)); else await Task.Delay(1000, stoppingToken); } _logger.LogInformation("Service stopping"); } }
var redis = new CSRedisClient[16]; //定义成单例 for (var a = 0; a < redis.Length; a++) redis[a] = new CSRedisClient(Configuration.GetConnectionString("redis") + ",defualtDatabase=" + a); services.AddSingleton<CSRedisClient[]>(redis); RedisHelper.Initialization(redis[0]);
注册CSRedisCore服务
最后依照引言中的部署原理图,将Nginx,Receiver, WebApp dockerize, 并且让 webapp 依赖于Nginx, Receiver
-------------------截取自docker-compose.yml文件---------------------- app: build: context: ./app dockerfile: Dockerfile expose: - "80" extra_hosts: - "dockerhost:172.18.0.1" environment: TZ: Asia/Shanghai volumes: - type: bind source: /mnt/eqidmanager/eqidlogs target: /app/eqidlogs - type: bind source: /mnt/eqidmanager/applogs target: /app/logs - type: bind source: /home/huangjun/eqidmanager/EqidManager.db target: /app/EqidManager.db healthcheck: test: ['CMD','curl','-f','http://localhost/healthcheck'] interval: 1m30s timeout: 10s retries: 3 depends_on: - receiver - proxy logging: options: max-size: "200k" max-file: "10" privileged: true
-
根据官方文档对于 depends_on 指令的说明,该指定决定了容器启动和停止的顺序,因此引言中需要 【暂存流量的】刚性需求可以得到满足
-
根据docker-compsoe up 命令的用法,若Receiver容器正在运行,且服务配置并未改变,该容器不会被停止。