gRPC Client Middleware.
中间件想必大家不陌生,今天给大家介绍如何实现中间件以及实现gRPC的客户端中间件。
什么是中间件?
https://docs.microsoft.com/zh-cn/aspnet/core/fundamentals/middleware/index?view=aspnetcore-2.1&tabs=aspnetcore2x
中间件管道
先定义管道Pipeline
/// <summary> /// Built pipeline for gRPC /// </summary> public class Pipeline { private PipelineDelagate processChain; internal Pipeline(PipelineDelagate middlewareChain) { processChain = middlewareChain; } internal Task RunPipeline(MiddlewareContext context) { return processChain(context); } }
然后实现PipelineBuilder
/// <summary> /// PipelineBuilder /// </summary> public class PipelineBuilder { private List<Func<PipelineDelagate, PipelineDelagate>> middlewares = new List<Func<PipelineDelagate, PipelineDelagate>>(); /// <summary> /// Add a middleware /// </summary> public PipelineBuilder Use(Func<PipelineDelagate, PipelineDelagate> middleware) { middlewares.Add(middleware); return this; } /// <summary> /// Use /// </summary> public PipelineBuilder Use<T>(params object[] args) { middlewares.Add(d => WrapClass<T>(d, args)); return this; } /// <summary> /// UseWhen /// </summary> public PipelineBuilder UseWhen<T>(Func<MiddlewareContext, bool> condition, params object[] args) { middlewares.Add(d => { return async ctx => { if (condition(ctx)) { await WrapClass<T>(d, args)(ctx); } }; }); return this; } /// <summary> /// Use /// </summary> public PipelineBuilder Use(Func<MiddlewareContext, PipelineDelagate, Task> middleware) { middlewares.Add(d => { return ctx => { return middleware(ctx, d); }; }); return this; } private PipelineDelagate WrapClass<T>(PipelineDelagate next, params object[] args) { var ctorArgs = new object[args.Length + 1]; ctorArgs[0] = next; Array.Copy(args, 0, ctorArgs, 1, args.Length); var type = typeof(T); var instance = Activator.CreateInstance(type, ctorArgs); MethodInfo method = type.GetMethod("Invoke"); return (PipelineDelagate)method.CreateDelegate(typeof(PipelineDelagate), instance); } /// <summary> /// Build /// </summary> public Pipeline Build() { PipelineDelagate pipeline = ExecuteMainHandler; middlewares.Reverse(); foreach (var middleware in middlewares) { pipeline = middleware(pipeline); } return new Pipeline(pipeline); } internal static Task ExecuteMainHandler(MiddlewareContext context) { return context.HandlerExecutor(); } }
MiddlewareContext 和 委托定义
/// <summary> /// MiddlewareContext /// </summary> public class MiddlewareContext { public IMethod Method { get; set; } public object Request { get; set; } public object Response { get; set; } public string Host { get; set; } public CallOptions Options { get; set; } internal Func<Task> HandlerExecutor { get; set; } } public delegate Task PipelineDelagate(MiddlewareContext context);
到这里管道建设完成,那个如何在gRPC中使用呢?
首先实现自己的客户端拦截器MiddlewareCallInvoker
public sealed class MiddlewareCallInvoker : DefaultCallInvoker { private readonly Channel grpcChannel; private Pipeline MiddlewarePipeline { get; set; } public MiddlewareCallInvoker(Channel channel) : base(channel) { this.grpcChannel = channel; } public MiddlewareCallInvoker(Channel channel, Pipeline pipeline) : this(channel) { this.MiddlewarePipeline = pipeline; } private TResponse Call<TResponse>(Func<MiddlewareContext, TResponse> call, MiddlewareContext context) { TResponse response = default(TResponse); if (MiddlewarePipeline != null) { context.HandlerExecutor = async () => { response = await Task.FromResult(call(context)); context.Response = response; }; MiddlewarePipeline.RunPipeline(context).ConfigureAwait(false); } else { response = call(context); } return response; } public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) { return Call((context) => base.BlockingUnaryCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options, (TRequest)context.Request), new MiddlewareContext { Host = host, Method = method, Options = options, Request = request, Response = null }); } public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>( Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) { return Call((context) => base.AsyncUnaryCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options, (TRequest)context.Request), new MiddlewareContext { Host = host, Method = method, Options = options, Request = request, Response = null }); } public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) { return Call((context) => base.AsyncServerStreamingCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options, (TRequest)context.Request), new MiddlewareContext { Host = host, Method = method, Options = options, Request = request, Response = null }); } public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) { return Call((context) => base.AsyncClientStreamingCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options), new MiddlewareContext { Host = host, Method = method, Options = options, Request = null, Response = null }); } public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) { return Call((context) => base.AsyncDuplexStreamingCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options), new MiddlewareContext { Host = host, Method = method, Options = options, Request = null, Response = null }); } }
到这里基于管道的中间件基本完成。接下来就是在客户端使用了
var pipeline = new PipelineBuilder() //.Use<ExceptionMiddleware>() //.Use<TimerMiddleware>() //.Use<LoggingMiddleware>() //.Use<TimeoutMiddleware>() .Use<PolicyMiddleware>(new PolicyMiddlewareOptions { RetryTimes = 2, TimoutMilliseconds = 100 }) ; //pipeline.Use<LoggingMiddleware>();// pipeline.UseWhen<LoggingMiddleware>(ctx => ctx.Context.Method.EndsWith("SayHello")); //pipeline.Use<TimeoutMiddleware>(new TimeoutMiddlewareOptions { TimoutMilliseconds = 1000 }); //console logger pipeline.Use(async (ctx, next) => { Console.WriteLine(ctx.Request.ToString()); await next(ctx); Console.WriteLine(ctx.Response.ToString()); }); Channel channel = new Channel("127.0.0.1:50051", ChannelCredentials.Insecure); MiddlewareCallInvoker callInvoker = new MiddlewareCallInvoker(channel, pipeline.Build()); var clientApi = new Get.GetClient(callInvoker); var replyApi = clientApi.SayHello(new ContractsSample1.HelloApiDemo.HelloRequest { Name = " APII" }); Console.WriteLine("Greeting: " + replyApi.Message);
Middleware
public class LoggingMiddleware { private PipelineDelagate _next; public LoggingMiddleware(PipelineDelagate next) { _next = next; } public async Task Invoke(MiddlewareContext context) { //Before await _next(context); //End } }
至此已完成所有工作。希望能够帮助到大家。
如果有机会将给大家介绍gRPC跨平台网关,该网关能够实现自动将下游的gRPC服务转化成http api,只需要你的proto文件即可。如果你使用consul,甚至无需你做任务配置。