文章目錄
gRPC stream 如何傳送單一大物件
之前筆記 C# 搭配 gRPC 中使用 stream RPC 提到為了對於較大資料量以及即時性資料內容,可以透過 gRPC 的 stream RPC 來處理,不過官方範例是用在傳送 repeated
內容 (如同 List、Array 這類的物件),但現實上難免會遇到需要傳送不只一個 list 的狀況:像是一個大型物件中,包含多個不同長度的 list,剛好專案就有類似需求,我就小小筆記一下,當作備忘
今天內容會以之前筆記 C# 中使用 Protocol Buffers 協定來序列化與反序列化物件 為基礎作延伸
基本環境說明
- macOS Mojave 10.14.5
- .NET Core SDK 2.2.107 (.NET Core Runtime 2.2.5)
NuGet Package
- Google.Protobuf 3.8.0
Bogus 28.0.1
產生假資料用 (非必要)
Grpc 1.22.0
Grpc.Tools 1.22.0
proto
message
syntax = "proto3"; package Messages; //will be placed in a namespace matching the package name if csharp_namespace is not specified option csharp_namespace = "Messages"; import "timestamp.proto"; message Person{ string Id=1; google.protobuf.Timestamp Birthday=2; string Name=3; repeated Job jobs=4; repeated Certificate Certificates=5; repeated School Schools=6; } message Job{ string CompanyName=1; string JobTitle=2; google.protobuf.Timestamp DateFrom=3; google.protobuf.Timestamp DateTo=4; } message Certificate{ string Name=1; string IssueOrg=2; google.protobuf.Timestamp IssueDate=3; } message School{ string SchoolName=1; bool IsGraduated=2; google.protobuf.Timestamp DateFrom=3; google.protobuf.Timestamp DateTo=4; }
service
syntax = "proto3"; package Messages; //will be placed in a namespace matching the package name if csharp_namespace is not specified option csharp_namespace = "Messages"; import "message.proto"; import "empty.proto"; service TestService { rpc GetFakePerson (google.protobuf.Empty) returns (Person); }
使用 Bogus 產生假資料
static Person GeneratePerson() { var certificates = new Faker<Certificate>() .RuleFor(a => a.Name, (f, u) => f.Random.Word()) .RuleFor(a => a.IssueDate, f=>f.Date.Past().ToUniversalTime().ToTimestamp()) .RuleFor(a => a.IssueOrg, (f, u) => f.Company.CompanyName()).Generate(3); var jobs= new Faker<Job>() .RuleFor(a=>a.CompanyName,(f, u) =>f.Company.CompanyName()) .RuleFor(a=>a.JobTitle,(f, u) =>f.Person.Company.Bs) .RuleFor(a=>a.DateFrom,(f, u) => f.Date.Past(3).ToUniversalTime().ToTimestamp()) .RuleFor(a=>a.DateTo,(f, u) =>f.Date.Past(1).ToUniversalTime().ToTimestamp()) .Generate(4); var schools = new Faker<School>() .RuleFor(a=>a.SchoolName,(f, u) =>f.Company.CompanyName()) .RuleFor(a=>a.DateFrom,(f, u) =>f.Date.Past(3).ToUniversalTime().ToTimestamp()) .RuleFor(a=>a.DateTo,(f, u) => f.Date.Past(1).ToUniversalTime().ToTimestamp()) .RuleFor(a=>a.IsGraduated,(f,u)=>f.Random.Bool()).Generate(5) ; var person = new Faker<Person>() .RuleFor(a=>a.Name,(f,u)=>f.Person.FullName) .RuleFor(a=>a.Id,(f,u)=>Guid.NewGuid().ToString()) .RuleFor(a=>a.Birthday,(f,u)=>f.Person.DateOfBirth.ToUniversalTime().ToTimestamp()) .Generate() ; person.Certificates.AddRange(certificates); person.Jobs.AddRange(jobs); person.Schools.AddRange(schools); return person; }
修改方式
因為無法將一個大物件中做合理分群 (像是 list 切分為多個小 lsit) 進行 stream 傳送,所以作法改為 傳送 byte[]
這樣一樣就可以有效對於 stream rpc 要傳送的內容進行有群切分了
加入 field 為
bytes
的 messagemessage Chunk { bytes chunk = 1; }
原本回傳大物件的 service 改為
stream Chunk
rpc GetFakePerson (google.protobuf.Empty) returns (stream Chunk);
修改傳送端
protobuf 序列化的用法可以參考 C# 中使用 Protocol Buffers 協定來序列化與反序列化物件
- 改用
stream
透過 Google.Protobuf 轉為 byte[] 傳送
var result = GeneratePerson(); using (var ms = new MemoryStream()) { result.WriteTo(ms); //每次以 64k 傳送 const int chunkSize = 64 * 1024; var streamResult = ms.ToArray(); var chunkCount = streamResult.Length % chunkSize == 0 ? streamResult.Length / chunkSize : (streamResult.Length / chunkSize) + 1; for (var i = 0; i < chunkCount; i++) { await responseStream.WriteAsync( new Chunk(){Chunk_ = ByteString.CopyFrom(streamResult.Skip(chunkSize*i).Take(chunkSize).ToArray())} ); } }
- 改用
修改接受端
protobuf 反序列化的用法可以參考 C# 中使用 Protocol Buffers 協定來序列化與反序列化物件
- 改用 stream 接收
透過 Google.Protobuf 讀取 byte[] 轉型
var streamResult = new List<byte>(); using (var client = serviceClient.GetFakePerson(new Empty())) { // 逐一取出 stream 內容 while (await client.ResponseStream.MoveNext()) { streamResult.AddRange(client.ResponseStream.Current.Chunk_.ToByteArray()); } } var person = Person.Parser.ParseFrom(streamResult.ToArray()); return person;
心得
雖然依上述步驟可以解決 使用 stream rpc 傳送單一大型物件
的問題,但做法上我自己覺得還有改進空間
- stream RPC 需要自行決定傳送型態與切割傳送大小
client 端需先暫存 byte[],官方文件提到 Google.Protobuf 可以直接讀取 ByteString
這個可能是我自己沒找到方法
參考資訊
文章作者 Yowko Tsai
上次更新 2020-12-11
授權合約
本部落格 (Yowko's Notes) 所有的文章內容(包含圖片),任何轉載行為,必須通知並獲本部落格作者 (Yowko Tsai) 的同意始得轉載,且轉載皆須註明出處與作者。
Yowko's Notes 由 Yowko Tsai 製作,以創用CC 姓名標示-非商業性-相同方式分享 3.0 台灣 授權條款 釋出。