文章目錄
gRPC stream 如何傳送單一大物件 (Client 版)
繼之前筆記 gRPC stream 如何傳送單一大物件 提到該如何使用 gRPC stream 來傳送不是整齊 collection 物件後,公司專案已逐步將可能傳送超出預設 4mb 限制的 api 改用 stream 以避免可能出現的 RESOURCE_EXHAUSTED
錯誤,截至目前使用上沒有遇到異常狀況。
上班前同事問到,該如何透過 client 端來上傳單一大物件,我翻了翻筆記,發現沒得抄,想要直接開工卻想不起來該怎麼做 (看來真的不能不服老呀),所以立馬來筆記一下,避免以後沒地方抄又想很久 QQ
今天內容會以之前筆記 gRPC stream 如何傳送單一大物件 為基礎作延伸
基本環境說明
- macOS Mojave 10.14.6
- .NET Core SDK 2.2.301 (.NET Core Runtime 2.2.6)
NuGet Package
- Google.Protobuf 3.8.0
Bogus 28.0.1
產生假資料用 (非必要)
Grpc 1.22.0
Grpc.Tools 1.22.0
proto
message
syntax = "proto3"; package Messages; //will be placed in a namespace matching the package name if csharp_namespace is not specified option csharp_namespace = "Messages"; import "timestamp.proto"; message Person{ string Id=1; google.protobuf.Timestamp Birthday=2; string Name=3; repeated Job jobs=4; repeated Certificate Certificates=5; repeated School Schools=6; } message Job{ string CompanyName=1; string JobTitle=2; google.protobuf.Timestamp DateFrom=3; google.protobuf.Timestamp DateTo=4; } message Certificate{ string Name=1; string IssueOrg=2; google.protobuf.Timestamp IssueDate=3; } message School{ string SchoolName=1; bool IsGraduated=2; google.protobuf.Timestamp DateFrom=3; google.protobuf.Timestamp DateTo=4; } message Chunk { bytes chunk = 1; }
service
syntax = "proto3"; package Messages; //will be placed in a namespace matching the package name if csharp_namespace is not specified option csharp_namespace = "Messages"; import "message.proto"; import "empty.proto"; service TestService { rpc GetFakePerson (google.protobuf.Empty) returns (stream Chunk); }
使用 Bogus 產生假資料
static Person GeneratePerson() { var certificates = new Faker<Certificate>() .RuleFor(a => a.Name, (f, u) => f.Random.Word()) .RuleFor(a => a.IssueDate, f=>f.Date.Past().ToUniversalTime().ToTimestamp()) .RuleFor(a => a.IssueOrg, (f, u) => f.Company.CompanyName()).Generate(3); var jobs= new Faker<Job>() .RuleFor(a=>a.CompanyName,(f, u) =>f.Company.CompanyName()) .RuleFor(a=>a.JobTitle,(f, u) =>f.Person.Company.Bs) .RuleFor(a=>a.DateFrom,(f, u) => f.Date.Past(3).ToUniversalTime().ToTimestamp()) .RuleFor(a=>a.DateTo,(f, u) =>f.Date.Past(1).ToUniversalTime().ToTimestamp()) .Generate(4); var schools = new Faker<School>() .RuleFor(a=>a.SchoolName,(f, u) =>f.Company.CompanyName()) .RuleFor(a=>a.DateFrom,(f, u) =>f.Date.Past(3).ToUniversalTime().ToTimestamp()) .RuleFor(a=>a.DateTo,(f, u) => f.Date.Past(1).ToUniversalTime().ToTimestamp()) .RuleFor(a=>a.IsGraduated,(f,u)=>f.Random.Bool()).Generate(5) ; var person = new Faker<Person>() .RuleFor(a=>a.Name,(f,u)=>f.Person.FullName) .RuleFor(a=>a.Id,(f,u)=>Guid.NewGuid().ToString()) .RuleFor(a=>a.Birthday,(f,u)=>f.Person.DateOfBirth.ToUniversalTime().ToTimestamp()) .Generate() ; person.Certificates.AddRange(certificates); person.Jobs.AddRange(jobs); person.Schools.AddRange(schools); return person; }
修改方式
service.proto 加入允許使用 stream 上傳的方式
rpc CreatePerson (stream Chunk) returns (Person);
client 上傳大物件
protobuf 序列化的用法可以參考 C# 中使用 Protocol Buffers 協定來序列化與反序列化物件
- 使用
stream
透過 Google.Protobuf 轉為 byte[] 傳送
using (var creator = serviceClient.CreatePerson()) { var createPerson = GeneratePerson(); using (var ms = new MemoryStream()) { createPerson.WriteTo(ms); //每次以 64k 傳送 const int chunkSize = 64 * 1024; var streamResult = ms.ToArray(); var chunkCount = streamResult.Length % chunkSize == 0 ? streamResult.Length / chunkSize : (streamResult.Length / chunkSize) + 1; for (var i = 0; i < chunkCount; i++) { await creator.RequestStream.WriteAsync( new Chunk(){Chunk_ = ByteString.CopyFrom(streamResult.Skip(chunkSize*i).Take(chunkSize).ToArray())} ); } } //等待完成上傳 await creator.RequestStream.CompleteAsync(); //回傳 gRPC call 結果 return await creator; }
- 使用
server 端接收大物件
protobuf 反序列化的用法可以參考 C# 中使用 Protocol Buffers 協定來序列化與反序列化物件
- 使用 stream 接收
透過 Google.Protobuf 讀取 byte[] 轉型
var streamResult = new List<byte>(); //逐步接收內容 while (await requestStream.MoveNext()) { var current = requestStream.Current; streamResult.AddRange(current.Chunk_); } //將收到的 byte array 轉為 Person var person = Person.Parser.ParseFrom(streamResult.ToArray()); //回傳收到的內容 return person;
心得
仔細看上面所使用的例子可以發現有個 bug,我們使用 gRPC stream 就是想要處理超乎 4mb 單次傳送限制的內容, client 上傳改用 srteam 這部份程式沒有問題,但回傳時卻沒有用上 stream,如果上傳的物件真的超過 4mb 時就會出現錯誤,不過我考慮再三還是決定先不加,因為這次筆記的重點是紀錄 client 上傳使用 stream chunk 的做法,如果加上回傳也用上 stream 就變成雙向 stream,跟這次目的不符也會使得程式碼變複雜,如果有需要雙向 stream 之後再另外筆記囉
參考資訊
文章作者 Yowko Tsai
上次更新 2020-12-11
授權合約
本部落格 (Yowko's Notes) 所有的文章內容(包含圖片),任何轉載行為,必須通知並獲本部落格作者 (Yowko Tsai) 的同意始得轉載,且轉載皆須註明出處與作者。
Yowko's Notes 由 Yowko Tsai 製作,以創用CC 姓名標示-非商業性-相同方式分享 3.0 台灣 授權條款 釋出。