01
—
迁移数据契约和服务契约
在本节中,我们将使用一个简单的请求响应的组合服务,它可以让你下载给定交易者的单个投资组合或所有投资组合。服务和数据契约的定义如下:
[ServiceContract]public interface IPortfolioService{[OperationContract]Task<Portfolio> Get(Guid traderId, int portfolioId);[OperationContract]Task<List<Portfolio>> GetAll(Guid traderId);}[DataContract]public class Portfolio{[DataMember]public int Id { get; set; }[DataMember]public Guid TraderId { get; set; }[DataMember]public List<PortfolioItem> Items { get; set; }}[DataContract]public class PortfolioItem{[DataMember]public int Id { get; set; }[DataMember]public int ShareId { get; set; }[DataMember]public int Holding { get; set; }[DataMember]public decimal Cost { get; set; }}
在将数据契约和服务契约迁移到gRPC之前,我建议为契约创建一个新的类库。这些契约可以通过项目引用或包引用在服务器和客户端之间很容易地共享,这取决于你的WCF解决方案的结构。一旦我们创建了类库,我们将源文件进行复制并开始迁移到gRPC。
不像使用Google.Protobuf迁移到gRPC那样,数据契约只需要最小的更改。我们需要做的唯一一件事是在DataMember属性中定义Order属性。这相当于在创建.proto格式的消息时定义字段号。这些字段号用于标识消息二进制格式中的字段,并且在使用消息后不应更改。
[DataContract]public class Portfolio{[DataMember(Order = 1)]public int Id { get; set; }[DataMember(Order = 2)]public Guid TraderId { get; set; }[DataMember(Order = 3)]public List<PortfolioItem> Items { get; set; }}[DataContract]public class PortfolioItem{[DataMember(Order = 1)]public int Id { get; set; }[DataMember(Order = 2)]public int ShareId { get; set; }[DataMember(Order = 3)]public int Holding { get; set; }[DataMember(Order = 4)]public decimal Cost { get; set; }}
[ServiceContract]public interface IPortfolioService{[OperationContract]Task<Portfolio> Get(GetPortfolioRequest request);[OperationContract]Task<PortfolioCollection> GetAll(GetAllPortfoliosRequest request);}
[DataContract]public class GetPortfolioRequest{[DataMember(Order = 1)]public Guid TraderId { get; set; }[DataMember(Order = 2)]public int PortfolioId { get; set; }}[DataContract]public class GetAllPortfoliosRequest{[DataMember(Order = 1)]public Guid TraderId { get; set; }}[DataContract]public class PortfolioCollection{[DataMember(Order = 1)]public List<Portfolio> Items { get; set; }}
02
—
将PortfolioData库迁移到.net Core
接下来,我们将把PortfolioData库迁移到.net Core,就像微软指南中描述的那样。但是,我们不需要复制模型(Portfolio.cs和PortfolioItem.cs),因为它们已经在我们在上一节中创建的类库中定义了。相反,我们将向该共享库添加一个项目引用。下一步是将WCF服务迁移到ASP.Net Core应用程序。
03
—
将WCF服务迁移到ASP.Net Core应用程序
我们需要做的第一件事是创建一个ASP.Net Core应用程序。因此,要么启动你最喜欢的IDE,创建一个基本的ASP.NET Core application或从命令行运行dotnet new web。接下来,我们需要添加一个对protobuf-net.Grpc的包。使用你最喜欢的包管理器安装它,或者简单地运行dotnet add package protobuf-net.Grpc.AspNetCore。我们还需要向上一节中创建的PortfolioData库添加一个项目引用。
现在我们已经准备好了项目,并且添加了所有的依赖项,我们可以继续并创建portfolio服务。创建一个具有以下内容的新类。
public class PortfolioService : IPortfolioService{private readonly IPortfolioRepository _repository;public PortfolioService(IPortfolioRepository repository){_repository = repository;}public async Task<Portfolio> Get(GetPortfolioRequest request){var portfolio = await _repository.GetAsync(request.TraderId, request.PortfolioId);return portfolio;}public async Task<PortfolioCollection> GetAll(GetAllPortfoliosRequest request){var portfolios = await _repository.GetAllAsync(request.TraderId);var response = new PortfolioCollection{Items = portfolios};return response;}}
上面的服务看起来与WCF服务实现非常相似,除了输入参数类型和返回参数类型之外。
最后但并非最不重要的,我们需要将protobuf-net.Grpc接入ASP.Net Core管道,并在DI容器中注册。在启Startup.cs,我们将做以下补充:
public class Startup{public void ConfigureServices(IServiceCollection services){services.AddScoped<IPortfolioRepository, PortfolioRepository>();services.AddCodeFirstGrpc();}public void Configure(IApplicationBuilder app, IWebHostEnvironment env){if (env.IsDevelopment()){app.UseDeveloperExceptionPage();}app.UseRouting();app.UseEndpoints(endpoints =>{endpoints.MapGrpcService<PortfolioService>();});}}
现在我们已经有了gRPC服务。我们列表上的下一件事是创建客户端应用程序。
04
—
创建gRPC客户端应用程序
对于我们的客户端应用程序,我们将继续创建一个控制台应用程序。要么使用你最喜欢的IDE创建一个控制台,要么直接从命令行运行dotnet new console。接下来,我们需要添加对protobuf-net.Grpc和Grpc.Net.Client的NuGet包。使用你最喜欢的包管理器安装它们,或者简单地运行dotnet add package protobuf-net.Grpc和dotnet add package Grpc.Net.Client。我们还需要向我们在第一节中创建的共享库添加一个项目引用。
在我们的Program.cs中,我们将添加以下代码来创建gRPC客户端并与gRPC服务通信。
class Program{private const string ServerAddress = "https://localhost:5001";static async Task Main(){var channel = GrpcChannel.ForAddress(ServerAddress);var portfolios = channel.CreateGrpcService<IPortfolioService>();try{var request = new GetPortfolioRequest{TraderId = Guid.Parse("68CB16F7-42BD-4330-A191-FA5904D2E5A0"),PortfolioId = 42};var response = await portfolios.Get(request);Console.WriteLine($"Portfolio contains {response.Items.Count} items.");}catch (RpcException e){Console.WriteLine(e.ToString());}}}
现在我们可以测试我们的实现,首先启动ASP.NET Core应用程序,然后启动控制台应用程序。
05
—
将WCF双工服务迁移到gRPC
现在我们已经介绍了使用protobuf-net.Grpc 将WCF服务迁移到gRPC的基本知识,我们可以看看一些更复杂的例子。
在本节中,我们将查看SimpleStockPriceTicker,这是一个双工服务,客户端启动连接,服务器使用回调接口在更新可用时发送更新。WCF服务有一个没有返回类型的方法,因为它使用回调接口ISimpleStockTickerCallback实时向客户端发送数据。
[ServiceContract(SessionMode = SessionMode.Required, CallbackContract = typeof(ISimpleStockTickerCallback))]public interface ISimpleStockTickerService{[OperationContract(IsOneWay = true)]void Subscribe(string[] symbols);}[ServiceContract]public interface ISimpleStockTickerCallback{[OperationContract(IsOneWay = true)]void Update(string symbol, decimal price);}
[ServiceContract]public interface IStockTickerService{[OperationContract]IAsyncEnumerable<StockTickerUpdate> Subscribe(SubscribeRequest request, CallContext context = default);}
请注意CallContext参数,它是客户端和服务器端的gRPC调用上下文。这允许我们在客户端和服务器端访问调用上下文,而不需要单独的接口。Gogogle.Protobuf生成的代码将在客户端使用调用,而在服务器端使用ServerCallContext。
因为WCF服务只使用基本类型作为参数,所以我们需要创建一组可以用作参数的数据契约。上面的服务附带的数据契约看起来像这样。注意,我们已经向响应消息添加了一个时间戳字段,这个字段在原始WCF服务中不存在。
[DataContract]public class SubscribeRequest{[DataMember(Order = 1)]public List<string> Symbols { get; set; } = new List<string>();}[DataContract]public class StockTickerUpdate{[DataMember(Order = 1)]public string Symbol { get; set; }[DataMember(Order = 2)]public decimal Price { get; set; }[DataMember(Order = 3)]public DateTime Time { get; set; }}
public class StockTickerService : IStockTickerService, IDisposable{private readonly IStockPriceSubscriberFactory _subscriberFactory;private readonly ILogger<StockTickerService> _logger;private IStockPriceSubscriber _subscriber;public StockTickerService(IStockPriceSubscriberFactory subscriberFactory, ILogger<StockTickerService> logger){_subscriberFactory = subscriberFactory;_logger = logger;}public IAsyncEnumerable<StockTickerUpdate> Subscribe(SubscribeRequest request, CallContext context = default){var buffer = Channel.CreateUnbounded<StockTickerUpdate>();_subscriber = _subscriberFactory.GetSubscriber(request.Symbols.ToArray());_subscriber.Update += async (sender, args) =>{try{await buffer.Writer.WriteAsync(new StockTickerUpdate{Symbol = args.Symbol,Price = args.Price,Time = DateTime.UtcNow});}catch (Exception e){_logger.LogError($"Failed to write message: {e.Message}");}};return buffer.AsAsyncEnumerable(context.CancellationToken);}public void Dispose(){_subscriber?.Dispose();}}
[ServiceContract(SessionMode = SessionMode.Required, CallbackContract = typeof(IFullStockTickerCallback))]public interface IFullStockTickerService{[OperationContract(IsOneWay = true)]void Subscribe();[OperationContract(IsOneWay = true)]void AddSymbol(string symbol);[OperationContract(IsOneWay = true)]void RemoveSymbol(string symbol);}[ServiceContract]public interface IFullStockTickerCallback{[OperationContract(IsOneWay = true)]void Update(string symbol, decimal price);}
[ServiceContract]public interface IFullStockTicker{[OperationContract]IAsyncEnumerable<StockTickerUpdate> Subscribe(IAsyncEnumerable<SymbolRequest> request, CallContext context = default);}
public enum SymbolRequestAction{Add = 0,Remove = 1}[DataContract]public class SymbolRequest{[DataMember(Order = 1)]public SymbolRequestAction Action { get; set; }[DataMember(Order = 2)]public string Symbol { get; set; }}[DataContract]public class StockTickerUpdate{[DataMember(Order = 1)]public string Symbol { get; set; }[DataMember(Order = 2)]public decimal Price { get; set; }[DataMember(Order = 3)]public DateTime Time { get; set; }}
public class FullStockTickerService : IFullStockTicker, IDisposable{private readonly IFullStockPriceSubscriberFactory _subscriberFactory;private readonly ILogger<FullStockTickerService> _logger;private IFullStockPriceSubscriber _subscriber;private Task _processRequestTask;private CancellationTokenSource _cts;public FullStockTickerService(IFullStockPriceSubscriberFactory subscriberFactory, ILogger<FullStockTickerService> logger){_subscriberFactory = subscriberFactory;_logger = logger;_cts = new CancellationTokenSource();}public IAsyncEnumerable<StockTickerUpdate> Subscribe(IAsyncEnumerable<SymbolRequest> request, CallContext context){var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, context.CancellationToken).Token;var buffer = Channel.CreateUnbounded<StockTickerUpdate>();_subscriber = _subscriberFactory.GetSubscriber();_subscriber.Update += async (sender, args) =>{try{await buffer.Writer.WriteAsync(new StockTickerUpdate{Symbol = args.Symbol,Price = args.Price,Time = DateTime.UtcNow});}catch (Exception e){_logger.LogError($"Failed to write message: {e.Message}");}};_processRequestTask = ProcessRequests(request, buffer.Writer, cancellationToken);return buffer.AsAsyncEnumerable(cancellationToken);}private async Task ProcessRequests(IAsyncEnumerable<SymbolRequest> requests, ChannelWriter<StockTickerUpdate> writer, CancellationToken cancellationToken){await foreach (var request in requests.WithCancellation(cancellationToken)){switch (request.Action){case SymbolRequestAction.Add:_subscriber.Add(request.Symbol);break;case SymbolRequestAction.Remove:_subscriber.Remove(request.Symbol);break;default:_logger.LogWarning($"Unknown Action '{request.Action}'.");break;}}writer.Complete();}public void Dispose(){_cts.Cancel();_subscriber?.Dispose();}}
06
—
总结
恭喜你!你已经走到这一步了。现在你知道了将WCF服务迁移到gRPC的另一种方法。希望这种技术比用.proto格式重写现有的数据契约要快得多。
欢迎关注我的公众号,如果你有喜欢的外文技术文章,可以通过公众号留言推荐给我。




