.Net Core下使用RabbitMQ比较完备两种方案(虽然代码有点惨淡,不过我会完善)
一、前言
上篇说给大家来写C#和Java的方案,最近工作也比较忙,迟到了一些,我先给大家补上C#的方案。
二、使用的插件
HangFire
一个开源的.NET任务调度框架,最大特点在于内置提供集成化的控制台,方便后台查看及监控,支持多种存储方式;在方案中主要使用定时任务做补偿机制,后期可能会封装一些,能通过页面的形式直接添加任务;
NLog
日志记录框架,方案中使用记录日志,后期可能回集成多个日志框架;
Autofac
依赖注入的框架,应该不用做过多介绍;
SqlSugar
ORM框架,这个从刚开始我就在使用了,在现在公司没有推行起来,不过在上两家公司都留下的遗产,据说还用的可以,当然我还是最佩服作者;
Polly
容错服务框架,类似于Java下的Hystrix,主要是为了解决分布式系统中,系统之间相互依赖,可能会因为多种因素导致服务不可用的而产生的一套框架,支持服务的超时重试、限流、熔断器等等;
RabbitMQ.Client
官方提供的C#连接RabbitMQ的SDK;
三、方案
模拟一个简单订单下单的场景,没有进行具体的实现。同时建议下游服务不要写在web端,最好以服务的形式奔跑,代码中是Web端实现的,大家不要这么搞。整体上还是实现了之前提到的两种方案:一是入库打标,二是延时队列(这块没有进行很好的测试,但是估计也没有很大的问题);当然也是有一些特点:RabbitMQ宕机情况下无需重启服务,网络异常的情况下也可以进行断线重连。接下来聊下代码和各方插件在系统中的具体应用:
项目结构:
RabbitMQExtensions:
采用Autofac按照单例的形式注入,采用Polly进行断线重连,也开启了自身断线重连和心跳检测机制,配置方面采用最简单的URI规范进行配置,有兴趣参考下官方,整体上这块代码还相对比较规范,以后可能也不会有太多调整;
1 /// <summary> 2 /// rabbitmq持久化连接 3 /// </summary> 4 public interface IRabbitMQPersistentConnection 5 { 6 bool IsConnected { get; } 7 8 bool TryConnect(); 9 10 IModel CreateModel(); 11 } 12 /// <summary> 13 /// rabbitmq持久化连接实现 14 /// </summary> 15 public class DefaultRabbitMQPersistentConnection : IRabbitMQPersistentConnection 16 { 17 private readonly IConnectionFactory connectionFactory; 18 private readonly ILogger<DefaultRabbitMQPersistentConnection> logger; 19 20 private IConnection connection; 21 22 private const int RETTRYCOUNT = 6; 23 24 private static readonly object lockObj = new object(); 25 public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger) 26 { 27 this.connectionFactory = connectionFactory; 28 this.logger = logger; 29 } 30 31 public bool IsConnected 32 { 33 get 34 { 35 return connection != null && connection.IsOpen; 36 } 37 } 38 39 public void Cleanup() 40 { 41 try 42 { 43 connection.Dispose(); 44 connection.Close(); 45 connection = null; 46 47 } 48 catch (IOException ex) 49 { 50 logger.LogCritical(ex.ToString()); 51 } 52 } 53 54 public IModel CreateModel() 55 { 56 if (!IsConnected) 57 { 58 connection.Close(); 59 throw new InvalidOperationException("连接不到rabbitmq"); 60 } 61 return connection.CreateModel(); 62 } 63 64 public bool TryConnect() 65 { 66 logger.LogInformation("RabbitMQ客户端尝试连接"); 67 68 lock (lockObj) 69 { 70 if (connection == null) 71 { 72 var policy = RetryPolicy.Handle<SocketException>() 73 .Or<BrokerUnreachableException>() 74 .WaitAndRetry(RETTRYCOUNT, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => 75 { 76 logger.LogWarning(ex.ToString()); 77 }); 78 79 policy.Execute(() => 80 { 81 connection = connectionFactory.CreateConnection(); 82 }); 83 } 84 85 86 87 if (IsConnected) 88 { 89 connection.ConnectionShutdown += OnConnectionShutdown; 90 connection.CallbackException += OnCallbackException; 91 connection.ConnectionBlocked += OnConnectionBlocked; 92 93 logger.LogInformation($"RabbitMQ{connection.Endpoint.HostName}获取了连接"); 94 95 return true; 96 } 97 else 98 { 99 logger.LogCritical("无法创建和打开RabbitMQ连接"); 100 101 return false; 102 } 103 } 104 } 105 106 107 private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) 108 { 109 110 logger.LogWarning("RabbitMQ连接异常,尝试重连..."); 111 112 Cleanup(); 113 TryConnect(); 114 } 115 116 private void OnCallbackException(object sender, CallbackExceptionEventArgs e) 117 { 118 119 logger.LogWarning("RabbitMQ连接异常,尝试重连..."); 120 121 Cleanup(); 122 TryConnect(); 123 } 124 125 private void OnConnectionShutdown(object sender, ShutdownEventArgs reason) 126 { 127 128 logger.LogWarning("RabbitMQ连接异常,尝试重连..."); 129 130 Cleanup(); 131 TryConnect(); 132 } 133 }
View Code
OrderDal
SqlSugar的一些简单封装,有些小特点:大家可以可以通过配置来实现读写分离,采用仓储设计。如果不太喜欢这么写,也可以参考杰哥的做法
public interface IBaseDal<T> where T:class,new() { DbSqlSugarClient DbContext { get; } IBaseDal<T> UserDb(string dbName); IInsertable<T> AsInsertable(T t); IInsertable<T> AsInsertable(T[] t); IInsertable<T> AsInsertable(List<T> t); IUpdateable<T> AsUpdateable(T t); IUpdateable<T> AsUpdateable(T[] t); IUpdateable<T> AsUpdateable(List<T> t); IDeleteable<T> AsDeleteable(); List<T> GetList(); Task<List<T>> GetListAnsync(); List<T> GetList(Expression<Func<T,bool>> whereExpression); Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression); List<T> GetList(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc); Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc); List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page); Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page); List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc); Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc); int Count(Expression<Func<T, bool>> whereExpression); Task<int> CountAsync(Expression<Func<T, bool>> whereExpression); T GetById(dynamic id); T GetSingle(Expression<Func<T, bool>> whereExpression); Task<T> GetSingleAsync(Expression<Func<T, bool>> whereExpression); T GetFirst(Expression<Func<T, bool>> whereExpression); Task<T> GetFirstAsync(Expression<Func<T, bool>> whereExpression); bool IsAny(Expression<Func<T, bool>> whereExpression); Task<bool> IsAnyAsync(Expression<Func<T, bool>> whereExpression); bool Insert(T t); Task<bool> InsertAsync(T t); bool InsertRange(List<T> t); Task<bool> InsertRangeAsync(List<T> t); bool InsertRange(T[] t); Task<bool> InsertRangeAsync(T[] t); int InsertReturnIdentity(T t); Task<long> InsertReturnIdentityAsync(T t); bool Delete(Expression<Func<T, bool>> whereExpression); Task<bool> DeleteAsync(Expression<Func<T, bool>> whereExpression); bool Delete(T t); Task<bool> DeleteAsync(T t); bool DeleteById(dynamic id); Task<bool> DeleteByIdAsync(dynamic id); bool DeleteByIds(dynamic[] ids); Task<bool> DeleteByIdsAsync(dynamic[] ids); bool Update(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression); Task<bool> UpdateAsync(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression); bool Update(T t); Task<bool> UpdateAsync(T t); bool UpdateRange(T[] t); Task<bool> UpdateRangeAsync(T[] t); void BeginTran(); void CommitTran(); void RollbackTran(); } public class BaseDal<T> : IBaseDal<T> where T : class, new() { private readonly IEnumerable<DbSqlSugarClient> clients; public BaseDal(IEnumerable<DbSqlSugarClient> clients) { this.clients = clients; DbContext = this.clients.FirstOrDefault(x => x.Default); } public DbSqlSugarClient DbContext { get; set; } public IDeleteable<T> AsDeleteable() { return DbContext.Deleteable<T>(); } public IInsertable<T> AsInsertable(T t) { return DbContext.Insertable<T>(t); } public IInsertable<T> AsInsertable(T[] t) { return DbContext.Insertable<T>(t); } public IInsertable<T> AsInsertable(List<T> t) { return DbContext.Insertable<T>(t); } public IUpdateable<T> AsUpdateable(T t) { return DbContext.Updateable<T>(t); } public IUpdateable<T> AsUpdateable(T[] t) { return DbContext.Updateable<T>(t); } public IUpdateable<T> AsUpdateable(List<T> t) { return DbContext.Updateable(t); } public void BeginTran() { DbContext.Ado.BeginTran(); } public void CommitTran() { DbContext.Ado.CommitTran(); } public int Count(Expression<Func<T, bool>> whereExpression) { return DbContext.Queryable<T>().Count(whereExpression); } public Task<int> CountAsync(Expression<Func<T, bool>> whereExpression) { return DbContext.Queryable<T>().CountAsync(whereExpression); } public bool Delete(Expression<Func<T, bool>> whereExpression) { return DbContext.Deleteable<T>().Where(whereExpression).ExecuteCommand() > 0; } public bool Delete(T t) { return DbContext.Deleteable<T>().ExecuteCommand() > 0; } public async Task<bool> DeleteAsync(Expression<Func<T, bool>> whereExpression) { return await DbContext.Deleteable<T>().Where(whereExpression).ExecuteCommandAsync() > 0; } public async Task<bool> DeleteAsync(T t) { return await DbContext.Deleteable(t).ExecuteCommandAsync() > 0; } public bool DeleteById(dynamic id) { return DbContext.Deleteable<T>().In(id).ExecuteCommand() > 0; } public async Task<bool> DeleteByIdAsync(dynamic id) { return await DbContext.Deleteable<T>().In(id).ExecuteCommandAsync() > 0; } public bool DeleteByIds(dynamic[] ids) { return DbContext.Deleteable<T>().In(ids).ExecuteCommand() > 0; } public async Task<bool> DeleteByIdsAsync(dynamic[] ids) { return await DbContext.Deleteable<T>().In(ids).ExecuteCommandAsync() > 0; } public T GetById(dynamic id) { return DbContext.Queryable<T>().InSingle(id); } public T GetFirst(Expression<Func<T, bool>> whereExpression) { return DbContext.Queryable<T>().First(whereExpression); } public async Task<T> GetFirstAsync(Expression<Func<T, bool>> whereExpression) { return await DbContext.Queryable<T>().FirstAsync(whereExpression); } public List<T> GetList() { return DbContext.Queryable<T>().ToList(); } public List<T> GetList(Expression<Func<T, bool>> whereExpression) { return DbContext.Queryable<T>().Where(whereExpression).ToList(); } public List<T> GetList(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc) { return DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderExpression != null, orderExpression, orderByType).Where(whereExpression).ToList(); } public async Task<List<T>> GetListAnsync() { return await DbContext.Queryable<T>().ToListAsync(); } public async Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression) { return await DbContext.Queryable<T>().Where(whereExpression).ToListAsync(); } public async Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc) { return await DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderExpression != null, orderExpression, orderByType).Where(whereExpression).ToListAsync(); } public List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page) { return DbContext.Queryable<T>().Where(whereExpression).ToPageList(page.PageIndex,page.PageSize); } public List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc) { return DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderByExpression != null, orderByExpression, orderByType).Where(whereExpression).ToPageList(page.PageIndex, page.PageSize); } public async Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page) { return await DbContext.Queryable<T>().Where(whereExpression).ToPageListAsync(page.PageIndex, page.PageSize); } public async Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc) { return await DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderByExpression != null, orderByExpression, orderByType).Where(whereExpression).ToPageListAsync(page.PageIndex, page.PageSize); } public T GetSingle(Expression<Func<T, bool>> whereExpression) { return DbContext.Queryable<T>().Single(whereExpression); } public async Task<T> GetSingleAsync(Expression<Func<T, bool>> whereExpression) { return await DbContext.Queryable<T>().SingleAsync(whereExpression); } public bool Insert(T t) { return DbContext.Insertable(t).ExecuteCommand() > 0; } public async Task<bool> InsertAsync(T t) { return await DbContext.Insertable(t).ExecuteCommandAsync() > 0; } public bool InsertRange(List<T> t) { return DbContext.Insertable(t).ExecuteCommand() > 0; } public bool InsertRange(T[] t) { return DbContext.Insertable(t).ExecuteCommand() > 0; } public async Task<bool> InsertRangeAsync(List<T> t) { return await DbContext.Insertable(t).ExecuteCommandAsync() > 0; } public async Task<bool> InsertRangeAsync(T[] t) { return await DbContext.Insertable(t).ExecuteCommandAsync() > 0; } public int InsertReturnIdentity(T t) { return DbContext.Insertable(t).ExecuteReturnIdentity(); } public async Task<long> InsertReturnIdentityAsync(T t) { return await DbContext.Insertable(t).ExecuteReturnBigIdentityAsync(); } public bool IsAny(Expression<Func<T, bool>> whereExpression) { return DbContext.Queryable<T>().Any(whereExpression); } public async Task<bool> IsAnyAsync(Expression<Func<T, bool>> whereExpression) { return await DbContext.Queryable<T>().AnyAsync(whereExpression); } public void RollbackTran() { DbContext.Ado.RollbackTran(); } public bool Update(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression) { return DbContext.Updateable<T>().UpdateColumns(columns).Where(whereExpression).ExecuteCommand() > 0; } public bool Update(T t) { return DbContext.Updateable(t).ExecuteCommand() > 0; } public async Task<bool> UpdateAsync(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression) { return await DbContext.Updateable<T>().UpdateColumns(columns).Where(whereExpression).ExecuteCommandAsync() > 0; } public async Task<bool> UpdateAsync(T t) { return await DbContext.Updateable(t).ExecuteCommandAsync() > 0; } public bool UpdateRange(T[] t) { return DbContext.Updateable(t).ExecuteCommand() > 0; } public async Task<bool> UpdateRangeAsync(T[] t) { return await DbContext.Updateable(t).ExecuteCommandAsync() > 0; } public IBaseDal<T> UserDb(string dbName) { DbContext = this.clients.FirstOrDefault(it => it.DbName == dbName); return this; } }
View Code
OrderCommon
定义全局异常的中间件,还有包含一些用到的实体等等,这部分代码还可优化拆分一下;
OrderService
生产者和消费者的具体实现,这块我还想在改造一番,将消费和业务分割开,现在写的很凌乱,不建议这么写,先把代码放出来,看看大家赞同不赞同我的这些用法,可以讨论,也欢迎争论,虽然这块代码写的不好,但是其实里面涉及一些RabbitMQ回调函数的用法,也是比较重要的,没有这些函数也就实现不了我上面说那两个特点;
//RabbitMQ宕机以后回调 //客户端这块大家不要采用递归调用恢复链接 //具体为什么大家可以测试下,这里留点小疑问哈哈 connection.ConnectionShutdown += OnConnectionShutdown; //消费端异常以后回调 consumerchannel.CallbackException += OnOnConsumerMessageAndWriteMessageLogException;
Order
具体的调用者,大家应该根据方法名字就能区分出我上面提到的两种方案的设计;
HangfireExtensions
Hangfire定时框架,采用Mysql作为持久层的存储,写的也比较清晰,后期就是针对这些进行扩展,实现在界面就能添加定时任务;
四、结束
生产端和消费端这段代码写的凌乱,希望大家不要介意这一点,是有原因的,这里我就不说了。希望大家看到闪光点,不要在一点上纠结;下次会加入Elasticsearch和监控部分的时候我会把这块代码改掉,还大家一片整洁的世界;
Github地址:https://github.com/wangtongzhou520/rabbitmq.git 有什么问题大家可以问我;
欢迎大家加群438836709!欢迎大家关注我!