一、前言

    上篇说给大家来写C#和Java的方案,最近工作也比较忙,迟到了一些,我先给大家补上C#的方案。

二、使用的插件

    HangFire

    一个开源的.NET任务调度框架,最大特点在于内置提供集成化的控制台,方便后台查看及监控,支持多种存储方式;在方案中主要使用定时任务做补偿机制,后期可能会封装一些,能通过页面的形式直接添加任务;

   NLog

   日志记录框架,方案中使用记录日志,后期可能回集成多个日志框架;

   Autofac

   依赖注入的框架,应该不用做过多介绍;

  SqlSugar

  ORM框架,这个从刚开始我就在使用了,在现在公司没有推行起来,不过在上两家公司都留下的遗产,据说还用的可以,当然我还是最佩服作者;

  Polly

  容错服务框架,类似于Java下的Hystrix,主要是为了解决分布式系统中,系统之间相互依赖,可能会因为多种因素导致服务不可用的而产生的一套框架,支持服务的超时重试限流熔断器等等;

  RabbitMQ.Client

  官方提供的C#连接RabbitMQ的SDK;

三、方案

  模拟一个简单订单下单的场景,没有进行具体的实现。同时建议下游服务不要写在web端,最好以服务的形式奔跑,代码中是Web端实现的,大家不要这么搞。整体上还是实现了之前提到的两种方案:一是入库打标,二是延时队列(这块没有进行很好的测试,但是估计也没有很大的问题);当然也是有一些特点:RabbitMQ宕机情况下无需重启服务,网络异常的情况下也可以进行断线重连。接下来聊下代码和各方插件在系统中的具体应用:

  项目结构:

  

  RabbitMQExtensions:

  采用Autofac按照单例的形式注入,采用Polly进行断线重连,也开启了自身断线重连和心跳检测机制,配置方面采用最简单的URI规范进行配置,有兴趣参考下官方,整体上这块代码还相对比较规范,以后可能也不会有太多调整;

  1     /// <summary>
  2     /// rabbitmq持久化连接
  3     /// </summary>
  4     public interface IRabbitMQPersistentConnection
  5     {
  6         bool IsConnected { get; }
  7 
  8         bool TryConnect();
  9 
 10         IModel CreateModel();
 11     }
 12     /// <summary>
 13     /// rabbitmq持久化连接实现
 14     /// </summary>
 15     public class DefaultRabbitMQPersistentConnection : IRabbitMQPersistentConnection
 16     {
 17         private readonly IConnectionFactory connectionFactory;
 18         private readonly ILogger<DefaultRabbitMQPersistentConnection> logger;
 19 
 20         private IConnection connection;
 21 
 22         private const int RETTRYCOUNT = 6;
 23 
 24         private static readonly object lockObj = new object();
 25         public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger)
 26         {
 27             this.connectionFactory = connectionFactory;
 28             this.logger = logger;
 29         }
 30 
 31         public bool IsConnected
 32         {
 33             get
 34             {
 35                 return connection != null && connection.IsOpen;
 36             }
 37         }
 38 
 39         public void Cleanup()
 40         {
 41             try
 42             {
 43                 connection.Dispose();
 44                 connection.Close();
 45                 connection = null;
 46 
 47             }
 48             catch (IOException ex)
 49             {
 50                 logger.LogCritical(ex.ToString());
 51             }
 52         }
 53 
 54         public IModel CreateModel()
 55         {
 56             if (!IsConnected)
 57             {
 58                 connection.Close();
 59                 throw new InvalidOperationException("连接不到rabbitmq");
 60             }
 61             return connection.CreateModel();
 62         }
 63 
 64         public bool TryConnect()
 65         {
 66             logger.LogInformation("RabbitMQ客户端尝试连接");
 67 
 68             lock (lockObj)
 69             {
 70                 if (connection == null)
 71                 {
 72                     var policy = RetryPolicy.Handle<SocketException>()
 73                         .Or<BrokerUnreachableException>()
 74                         .WaitAndRetry(RETTRYCOUNT, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>    
 75                         {
 76                             logger.LogWarning(ex.ToString());
 77                         });
 78 
 79                     policy.Execute(() =>
 80                     {
 81                         connection = connectionFactory.CreateConnection();
 82                     });
 83                 }
 84 
 85 
 86 
 87                 if (IsConnected)
 88                 {
 89                     connection.ConnectionShutdown += OnConnectionShutdown;
 90                     connection.CallbackException += OnCallbackException;
 91                     connection.ConnectionBlocked += OnConnectionBlocked;
 92 
 93                     logger.LogInformation($"RabbitMQ{connection.Endpoint.HostName}获取了连接");
 94 
 95                     return true;
 96                 }
 97                 else
 98                 {
 99                     logger.LogCritical("无法创建和打开RabbitMQ连接");
100 
101                     return false;
102                 }
103             }
104         }
105 
106 
107         private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
108         {
109 
110             logger.LogWarning("RabbitMQ连接异常,尝试重连...");
111 
112             Cleanup();
113             TryConnect();
114         }
115 
116         private void OnCallbackException(object sender, CallbackExceptionEventArgs e)
117         {
118 
119             logger.LogWarning("RabbitMQ连接异常,尝试重连...");
120 
121             Cleanup();
122             TryConnect();
123         }
124 
125         private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
126         {
127 
128             logger.LogWarning("RabbitMQ连接异常,尝试重连...");
129 
130             Cleanup();
131             TryConnect();
132         }
133     }

View Code

  OrderDal

  SqlSugar的一些简单封装,有些小特点:大家可以可以通过配置来实现读写分离,采用仓储设计。如果不太喜欢这么写,也可以参考杰哥的做法

    public interface IBaseDal<T> where T:class,new()
    {
        DbSqlSugarClient DbContext { get; }

        IBaseDal<T> UserDb(string dbName);
        IInsertable<T> AsInsertable(T t);
        IInsertable<T> AsInsertable(T[] t);
        IInsertable<T> AsInsertable(List<T> t);
        IUpdateable<T> AsUpdateable(T t);
        IUpdateable<T> AsUpdateable(T[] t);
        IUpdateable<T> AsUpdateable(List<T> t);
        IDeleteable<T> AsDeleteable();

        List<T> GetList();
        Task<List<T>> GetListAnsync();

        List<T> GetList(Expression<Func<T,bool>> whereExpression);
        Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression);

        List<T> GetList(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc);
        Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc);

        List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page);
        Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page);

        List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc);
        Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc);

        int Count(Expression<Func<T, bool>> whereExpression);
        Task<int> CountAsync(Expression<Func<T, bool>> whereExpression);
        T GetById(dynamic id);
        T GetSingle(Expression<Func<T, bool>> whereExpression);
        Task<T> GetSingleAsync(Expression<Func<T, bool>> whereExpression);
        T GetFirst(Expression<Func<T, bool>> whereExpression);
        Task<T> GetFirstAsync(Expression<Func<T, bool>> whereExpression);

        bool IsAny(Expression<Func<T, bool>> whereExpression);
        Task<bool> IsAnyAsync(Expression<Func<T, bool>> whereExpression);

        bool Insert(T t);
        Task<bool> InsertAsync(T t);
        bool InsertRange(List<T> t);
        Task<bool> InsertRangeAsync(List<T> t);
        bool InsertRange(T[] t);
        Task<bool> InsertRangeAsync(T[] t);
        int InsertReturnIdentity(T t);
        Task<long> InsertReturnIdentityAsync(T t);


        bool Delete(Expression<Func<T, bool>> whereExpression);
        Task<bool> DeleteAsync(Expression<Func<T, bool>> whereExpression);
        bool Delete(T t);
        Task<bool> DeleteAsync(T t);
        bool DeleteById(dynamic id);
        Task<bool> DeleteByIdAsync(dynamic id);
        bool DeleteByIds(dynamic[] ids);
        Task<bool> DeleteByIdsAsync(dynamic[] ids);


        bool Update(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression);
        Task<bool> UpdateAsync(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression);
        bool Update(T t);
        Task<bool> UpdateAsync(T t);
        bool UpdateRange(T[] t);
        Task<bool> UpdateRangeAsync(T[] t);


        void BeginTran();
        void CommitTran();
        void RollbackTran();
    }

   public class BaseDal<T> : IBaseDal<T> where T : class, new()
    {
        private readonly IEnumerable<DbSqlSugarClient> clients;
        public BaseDal(IEnumerable<DbSqlSugarClient> clients)
        {
            this.clients = clients;
            DbContext = this.clients.FirstOrDefault(x => x.Default);
        }
        public DbSqlSugarClient DbContext { get; set; }

        public IDeleteable<T> AsDeleteable()
        {
            return DbContext.Deleteable<T>();
        }

        public IInsertable<T> AsInsertable(T t)
        {
            return DbContext.Insertable<T>(t);
        }

        public IInsertable<T> AsInsertable(T[] t)
        {
            return DbContext.Insertable<T>(t);
        }

        public IInsertable<T> AsInsertable(List<T> t)
        {
            return DbContext.Insertable<T>(t);
        }

        public IUpdateable<T> AsUpdateable(T t)
        {
            return DbContext.Updateable<T>(t);
        }

        public IUpdateable<T> AsUpdateable(T[] t)
        {
            return DbContext.Updateable<T>(t);
        }

        public IUpdateable<T> AsUpdateable(List<T> t)
        {
            return DbContext.Updateable(t);
        }

        public void BeginTran()
        {
            DbContext.Ado.BeginTran();
        }

        public void CommitTran()
        {
            DbContext.Ado.CommitTran();
        }

        public int Count(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Queryable<T>().Count(whereExpression);
        }

        public Task<int> CountAsync(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Queryable<T>().CountAsync(whereExpression);
        }

        public bool Delete(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Deleteable<T>().Where(whereExpression).ExecuteCommand() > 0;
        }

        public bool Delete(T t)
        {
            return DbContext.Deleteable<T>().ExecuteCommand() > 0;
        }

        public async Task<bool> DeleteAsync(Expression<Func<T, bool>> whereExpression)
        {
            return await DbContext.Deleteable<T>().Where(whereExpression).ExecuteCommandAsync() > 0;
        }

        public async Task<bool> DeleteAsync(T t)
        {
            return await DbContext.Deleteable(t).ExecuteCommandAsync() > 0;
        }

        public bool DeleteById(dynamic id)
        {
            return DbContext.Deleteable<T>().In(id).ExecuteCommand() > 0;
        }

        public async Task<bool> DeleteByIdAsync(dynamic id)
        {
            return await DbContext.Deleteable<T>().In(id).ExecuteCommandAsync() > 0;
        }

        public bool DeleteByIds(dynamic[] ids)
        {
            return DbContext.Deleteable<T>().In(ids).ExecuteCommand() > 0;
        }

        public async Task<bool> DeleteByIdsAsync(dynamic[] ids)
        {
            return await DbContext.Deleteable<T>().In(ids).ExecuteCommandAsync() > 0;
        }

        public T GetById(dynamic id)
        {
            return DbContext.Queryable<T>().InSingle(id);
        }

        public T GetFirst(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Queryable<T>().First(whereExpression);
        }

        public async Task<T> GetFirstAsync(Expression<Func<T, bool>> whereExpression)
        {
            return await DbContext.Queryable<T>().FirstAsync(whereExpression);
        }

        public List<T> GetList()
        {
            return DbContext.Queryable<T>().ToList();
        }

        public List<T> GetList(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Queryable<T>().Where(whereExpression).ToList();
        }

        public List<T> GetList(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc)
        {
            return DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderExpression != null, orderExpression, orderByType).Where(whereExpression).ToList();
        }

        public async Task<List<T>> GetListAnsync()
        {
            return await DbContext.Queryable<T>().ToListAsync();
        }

        public async Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression)
        {
            return await DbContext.Queryable<T>().Where(whereExpression).ToListAsync();
        }

        public async Task<List<T>> GetListAnsync(Expression<Func<T, bool>> whereExpression, Expression<Func<T, object>> orderExpression, OrderByType orderByType = OrderByType.Desc)
        {
            return await DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderExpression != null, orderExpression, orderByType).Where(whereExpression).ToListAsync();
        }

        public List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page)
        {
            return DbContext.Queryable<T>().Where(whereExpression).ToPageList(page.PageIndex,page.PageSize);
        }

        public List<T> GetPageList(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc)
        {
           return DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderByExpression != null, orderByExpression, orderByType).Where(whereExpression).ToPageList(page.PageIndex, page.PageSize);
        }

        public async Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page)
        {
            return await DbContext.Queryable<T>().Where(whereExpression).ToPageListAsync(page.PageIndex, page.PageSize);
        }

        public async Task<List<T>> GetPageListAsync(Expression<Func<T, bool>> whereExpression, PageModel page, Expression<Func<T, object>> orderByExpression = null, OrderByType orderByType = OrderByType.Asc)
        {
            return await DbContext.Queryable<T>().Where(whereExpression).OrderByIF(orderByExpression != null, orderByExpression, orderByType).Where(whereExpression).ToPageListAsync(page.PageIndex, page.PageSize);
        }

        public T GetSingle(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Queryable<T>().Single(whereExpression);
        }

        public async Task<T> GetSingleAsync(Expression<Func<T, bool>> whereExpression)
        {
            return await DbContext.Queryable<T>().SingleAsync(whereExpression);
        }

        public bool Insert(T t)
        {
            return DbContext.Insertable(t).ExecuteCommand() > 0;
        }

        public async Task<bool> InsertAsync(T t)
        {
            return await DbContext.Insertable(t).ExecuteCommandAsync() > 0;
        }

        public bool InsertRange(List<T> t)
        {
            return DbContext.Insertable(t).ExecuteCommand() > 0;
        }

        public bool InsertRange(T[] t)
        {
            return DbContext.Insertable(t).ExecuteCommand() > 0;
        }

        public async Task<bool> InsertRangeAsync(List<T> t)
        {
            return await DbContext.Insertable(t).ExecuteCommandAsync() > 0;
        }

        public async Task<bool> InsertRangeAsync(T[] t)
        {
            return await DbContext.Insertable(t).ExecuteCommandAsync() > 0;
        }

        public int InsertReturnIdentity(T t)
        {
            return DbContext.Insertable(t).ExecuteReturnIdentity();
        }

        public async Task<long> InsertReturnIdentityAsync(T t)
        {
            return await DbContext.Insertable(t).ExecuteReturnBigIdentityAsync();
        }

        public bool IsAny(Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Queryable<T>().Any(whereExpression);
        }

        public async Task<bool> IsAnyAsync(Expression<Func<T, bool>> whereExpression)
        {
            return await DbContext.Queryable<T>().AnyAsync(whereExpression);
        }

        public void RollbackTran()
        {
            DbContext.Ado.RollbackTran();
        }

        public bool Update(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression)
        {
            return DbContext.Updateable<T>().UpdateColumns(columns).Where(whereExpression).ExecuteCommand() > 0;
        }

        public bool Update(T t)
        {
            return DbContext.Updateable(t).ExecuteCommand() > 0;
        }

        public async Task<bool> UpdateAsync(Expression<Func<T, T>> columns, Expression<Func<T, bool>> whereExpression)
        {
            return await DbContext.Updateable<T>().UpdateColumns(columns).Where(whereExpression).ExecuteCommandAsync() > 0;
        }

        public async Task<bool> UpdateAsync(T t)
        {
            return await DbContext.Updateable(t).ExecuteCommandAsync() > 0;
        }

        public bool UpdateRange(T[] t)
        {
            return DbContext.Updateable(t).ExecuteCommand() > 0;
        }

        public async Task<bool> UpdateRangeAsync(T[] t)
        {
            return await DbContext.Updateable(t).ExecuteCommandAsync() > 0;
        }

        public IBaseDal<T> UserDb(string dbName)
        {
            DbContext = this.clients.FirstOrDefault(it => it.DbName == dbName);
            return this;
        }
    }

View Code

  OrderCommon

  定义全局异常的中间件,还有包含一些用到的实体等等,这部分代码还可优化拆分一下;

  OrderService

  生产者和消费者的具体实现,这块我还想在改造一番,将消费和业务分割开,现在写的很凌乱,不建议这么写,先把代码放出来,看看大家赞同不赞同我的这些用法,可以讨论,也欢迎争论,虽然这块代码写的不好,但是其实里面涉及一些RabbitMQ回调函数的用法,也是比较重要的,没有这些函数也就实现不了我上面说那两个特点;

//RabbitMQ宕机以后回调
//客户端这块大家不要采用递归调用恢复链接
//具体为什么大家可以测试下,这里留点小疑问哈哈
connection.ConnectionShutdown += OnConnectionShutdown;

//消费端异常以后回调
consumerchannel.CallbackException += OnOnConsumerMessageAndWriteMessageLogException;

  Order

  具体的调用者,大家应该根据方法名字就能区分出我上面提到的两种方案的设计;

  HangfireExtensions

  Hangfire定时框架,采用Mysql作为持久层的存储,写的也比较清晰,后期就是针对这些进行扩展,实现在界面就能添加定时任务;

四、结束

  生产端和消费端这段代码写的凌乱,希望大家不要介意这一点,是有原因的,这里我就不说了。希望大家看到闪光点,不要在一点上纠结;下次会加入Elasticsearch和监控部分的时候我会把这块代码改掉,还大家一片整洁的世界;

  Github地址:https://github.com/wangtongzhou520/rabbitmq.git  有什么问题大家可以问我;

  欢迎大家加群438836709!欢迎大家关注我!

  

 

版权声明:本文为wtzbk原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/wtzbk/p/10908601.html