文章目錄
C# 搭配 gRPC 中使用 stream RPC
gRPC 允許使用四種則型的 service 方法:
- 簡單 RPC (simple RPC)
- 主機端串流 RPC (server-side streaming RPC)
- 用戶端串流 RPC (client-side streaming RPC)
- 雙向串流 RPC (bidirectional streaming RPC)
過去的筆記都是使用 簡單 RPC (simple RPC)
,最近剛好工作上需要用到 streaming RPC,紀錄一下使用方式備忘
基本環境說明
- macOS Mojave 10.14.5
- .NET Core SDK 2.2.107 (.NET Core Runtime 2.2.5)
NuGet package
- Grpc 1.21.0
- Grpc.Tools 1.21.0
- Google.Protobuf 3.8.0
- Bogus 27.0.1
protobuf 定義
message
syntax = "proto3"; package Message; //will be placed in a namespace matching the package name if csharp_namespace is not specified option csharp_namespace = "GRpc.Messages"; message Candidates { repeated Candidate Candidates = 2; } message Candidate { string Name = 1; repeated Job Jobs = 2; } message Job { string Title = 1; int32 Salary = 2; string JobDescription = 3; } message DownloadByName { string Name = 1; } message CreateCvResponse { bool IsSuccess = 1; }
service
syntax = "proto3"; package Message; //will be placed in a namespace matching the package name if csharp_namespace is not specified option csharp_namespace = "GRpc.Messages"; import "message.proto"; service CandidateService { rpc CreateCv (Candidate) returns (CreateCvResponse); rpc DownloadCv (DownloadByName) returns (Candidate); rpc CreateDownloadCv (Candidate) returns (Candidates); }
實際使用 stream
今天就三種 streaming RPC 來紀錄 (simple RPC 之前筆記就一直使用中,不特別紀錄了)
主機端串流 RPC (server-side streaming RPC)
適合從 server 端取得較大資料量時使用
在 service 的 rpc 定義中將 return 的 message 加上
stream
修飾子rpc DownloadCv (DownloadByName) returns (stream Candidate);
Server
實作 stream 傳送public override async Task DownloadCv(DownloadByName request, IServerStreamWriter<Candidate> responseStream, ServerCallContext context) { var fakeJobs = new Faker<Job>() .RuleFor(a => a.Title, (f, u) => f.Company.Bs()) .RuleFor(a => a.Salary, (f, u) => f.Commerce.Random.Int(1000, 2000)) .RuleFor(a => a.JobDescription, (f, u) => f.Lorem.Text()); var createRequests = new Faker<Candidate>() .RuleFor(a => a.Name, (f, u) => f.Name.FullName()) .RuleFor(a => a.Jobs, (f, u) => { u.Jobs.AddRange(fakeJobs.GenerateBetween(3, 5)); return u.Jobs; }).Generate(); // 將每筆資料逐一透過 WriteAsync 輸出 await responseStream.WriteAsync(createRequests); }
Client
實作 stream 接收using (var client = candidateServiceClient.DownloadCv(new DownloadByName() { Name = "test" })) { // 逐一取出 stream 內容 while (await client.ResponseStream.MoveNext()) { result = client.ResponseStream.Current; } }
用戶端串流 RPC (client-side streaming RPC)
適合傳送較大資料量至 server 端時使用
在 service 的 rpc 定義中將 rpc 接受的 message 加上
stream
修飾子rpc CreateCv (stream Candidate) returns (CreateCvResponse);
Server
實作 stream 讀取public override async Task<CreateCvResponse> CreateCv(IAsyncStreamReader<Candidate> requestStream, ServerCallContext context) { var result = new CreateCvResponse { IsSuccess = false }; // stream 讀取 while (await requestStream.MoveNext()) { var candidate = requestStream.Current; // 實際處理 Console.WriteLine(candidate.Name); } return result; }
Client
實作 stream 傳送using (var creator = candidateServiceClient.CreateCv()) { // 將資料逐一輸出至 server foreach (var createRequest in createRequests) { await creator.RequestStream.WriteAsync(createRequest); } await creator.RequestStream.CompleteAsync(); var summary = await creator.ResponseAsync; }
雙向串流 RPC (bidirectional streaming RPC)
適合在 client 與 server 端間雙向傳送大資料量或是即時資料時使用
在 service 的 rpc 定義中將 rpc 傳送與接受的 message 都加上
stream
修飾子rpc CreateDownloadCv (stream Candidate) returns (stream Candidates);
Server
實作 streampublic override async Task CreateDownloadCv(IAsyncStreamReader<Candidate> requestStream, IServerStreamWriter<Candidates> responseStream, ServerCallContext context) { var candidates = new Candidates(); // 將收到的資料逐一取出 while (await requestStream.MoveNext()) { var candidate = requestStream.Current; candidates.Candidates_.Add(candidate); // 將處理後的資料回傳 await responseStream.WriteAsync(candidates); } }
Client
實作 streamusing (var call = candidateServiceClient.CreateDownloadCv()) { var responseReaderTask = Task.Run(async () => { // 逐一取出 response 內容 while (await call.ResponseStream.MoveNext()) { var candidate = call.ResponseStream.Current; result.AddRange(candidate.Candidates_); } }); // 將資料逐一傳送至 server foreach (var request in createRequests) { await call.RequestStream.WriteAsync(request); } await call.RequestStream.CompleteAsync(); await responseReaderTask; }
心得
資料量大時透過 stream 可以使用即時做出回應而不需等待所有資料傳送完成,程式碼並不多也滿好理解的,先紀錄一下使用方式,後續再紀錄實際使用情境及其他相關用法
完整程式碼請參考 yowko/dotnetgrpcstream
參考資訊
文章作者 Yowko Tsai
上次更新 2021-11-03
授權合約
本部落格 (Yowko's Notes) 所有的文章內容(包含圖片),任何轉載行為,必須通知並獲本部落格作者 (Yowko Tsai) 的同意始得轉載,且轉載皆須註明出處與作者。
Yowko's Notes 由 Yowko Tsai 製作,以創用CC 姓名標示-非商業性-相同方式分享 3.0 台灣 授權條款 釋出。