DemoerService.cs 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. using Google.Protobuf;
  2. using Grpc.Core;
  3. using GrpcServer;
  4. using Microsoft.Extensions.Logging;
  5. using System;
  6. using System.Collections.Generic;
  7. using System.IO;
  8. using System.Linq;
  9. using System.Threading.Tasks;
  10. namespace GrpcServiceServer
  11. {
  12. public class DemoerService :Demoer.DemoerBase
  13. {
  14. private readonly ILogger<DemoerService> _logger;
  15. public DemoerService(ILogger<DemoerService> logger)
  16. {
  17. _logger = logger;
  18. }
  19. /// <summary>
  20. /// 一元调用
  21. /// </summary>
  22. /// <param name="request"></param>
  23. /// <param name="context"></param>
  24. /// <returns></returns>
  25. public override Task<DataMessage> SayHello(DataMessage request, ServerCallContext context)
  26. {
  27. Console.WriteLine(request.Message);
  28. return Task.FromResult(new DataMessage
  29. {
  30. Message = "Hello " + request.Message
  31. });
  32. }
  33. private static HashSet<IServerStreamWriter<ChatMessage>> _subscribers = new HashSet<IServerStreamWriter<ChatMessage>>();
  34. /// <summary>
  35. /// 双向流式处理调用
  36. /// </summary>
  37. /// <param name="requestStream"></param>
  38. /// <param name="responseStream"></param>
  39. /// <param name="context"></param>
  40. /// <returns></returns>
  41. public override Task Chat(IAsyncStreamReader<ChatMessage> requestStream, IServerStreamWriter<ChatMessage> responseStream, ServerCallContext context)
  42. {
  43. return ChatCore(requestStream, responseStream);
  44. }
  45. public static async Task ChatCore(IAsyncStreamReader<ChatMessage> requestStream, IServerStreamWriter<ChatMessage> responseStream)
  46. {
  47. if (!await requestStream.MoveNext())
  48. {
  49. // No messages so don't register and just exit.
  50. return;
  51. }
  52. // Warning, the following is very racy
  53. _subscribers.Add(responseStream);
  54. do
  55. {
  56. await BroadcastMessageAsync(requestStream.Current);
  57. } while (await requestStream.MoveNext());
  58. _subscribers.Remove(responseStream);
  59. }
  60. private static async Task BroadcastMessageAsync(ChatMessage message)
  61. {
  62. message.Name= message.Name + " Say:";
  63. foreach (var subscriber in _subscribers)
  64. {
  65. await subscriber.WriteAsync(message);
  66. }
  67. }
  68. /// <summary>
  69. /// 客户端流式处理调用
  70. /// </summary>
  71. /// <param name="requestStream"></param>
  72. /// <param name="context"></param>
  73. /// <returns></returns>
  74. public override async Task<DataComplete> ClientStreamedData(
  75. IAsyncStreamReader<DataMessage> requestStream,
  76. ServerCallContext context)
  77. {
  78. var total = 0L;
  79. await foreach (var message in requestStream.ReadAllAsync())
  80. {
  81. total += message.Data.Length;
  82. Console.WriteLine(message.Message+" "+ message.Data.ToStringUtf8()+" "+message.ServerDelayMilliseconds);
  83. if (message.ServerDelayMilliseconds > 0)
  84. {
  85. await Task.Delay(message.ServerDelayMilliseconds);
  86. }
  87. }
  88. return new DataComplete
  89. {
  90. Size = total
  91. };
  92. }
  93. public override async Task BufferAllData(
  94. IAsyncStreamReader<DataMessage> requestStream,
  95. IServerStreamWriter<DataMessage> responseStream,
  96. ServerCallContext context)
  97. {
  98. // Read data into MemoryStream
  99. var ms = new MemoryStream();
  100. await foreach (var message in requestStream.ReadAllAsync())
  101. {
  102. ms.Write(message.Data.Span);
  103. _logger.LogInformation($"Received {ms.Length} bytes");
  104. }
  105. // Write back to client in batches
  106. var data = ms.ToArray();
  107. var sent = 0;
  108. while (sent < data.Length)
  109. {
  110. const int BatchSize = 1024 * 64; // 64 KB
  111. var writeCount = Math.Min(data.Length - sent, BatchSize);
  112. await responseStream.WriteAsync(new DataMessage
  113. {
  114. Data = ByteString.CopyFrom(data, sent, writeCount)
  115. });
  116. sent += writeCount;
  117. _logger.LogInformation($"Sent {sent} bytes");
  118. }
  119. }
  120. /// <summary>
  121. /// 服务器流式处理调用
  122. /// </summary>
  123. /// <param name="request"></param>
  124. /// <param name="responseStream"></param>
  125. /// <param name="context"></param>
  126. /// <returns></returns>
  127. public override async Task SayHellos(DataMessage request, IServerStreamWriter<DataMessage> responseStream, ServerCallContext context)
  128. {
  129. // Explicitly send the response headers before any streamed content
  130. Metadata responseHeaders = new Metadata
  131. {
  132. { "test-response-header", "value" }
  133. };
  134. await context.WriteResponseHeadersAsync(responseHeaders);
  135. await SayHellosCore(request, responseStream);
  136. }
  137. public static async Task SayHellosCore(DataMessage request, IServerStreamWriter<DataMessage> responseStream)
  138. {
  139. for (var i = 0; i < 3; i++)
  140. {
  141. // Gotta look busy
  142. // await Task.Delay(100);
  143. var message = $"How are you {request.Message}? {i}";
  144. await responseStream.WriteAsync(new DataMessage { Message = message });
  145. }
  146. // Gotta look busy
  147. // await Task.Delay(100);
  148. await responseStream.WriteAsync(new DataMessage { Message = $"Goodbye {request.Message}!" });
  149. }
  150. public override async Task EchoAllData(
  151. IAsyncStreamReader<DataMessage> requestStream,
  152. IServerStreamWriter<DataMessage> responseStream,
  153. ServerCallContext context)
  154. {
  155. var flushHeaders = context.RequestHeaders.Any(x => x.Key == "flush-headers");
  156. if (flushHeaders)
  157. {
  158. await context.WriteResponseHeadersAsync(new Metadata());
  159. }
  160. await foreach (var message in requestStream.ReadAllAsync())
  161. {
  162. Console.WriteLine(message.Message);
  163. await responseStream.WriteAsync(new DataMessage
  164. {
  165. Message = message.Message + "aaaa"
  166. }); ;
  167. }
  168. }
  169. }
  170. }