// gRPC demo service: unary, client-streaming, server-streaming and bidirectional-streaming call handlers.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using GrpcServer;
using Microsoft.Extensions.Logging;
namespace GrpcServiceServer
{
    /// <summary>
    /// Demo gRPC service with one handler per call shape: unary, client streaming,
    /// server streaming and bidirectional streaming.
    /// </summary>
    public class DemoerService : Demoer.DemoerBase
    {
        /// <summary>Maximum payload size of a single response written by <see cref="BufferAllData"/> (64 KB).</summary>
        private const int ResponseBatchSize = 64 * 1024;

        /// <summary>
        /// Response streams of every client currently taking part in the chat.
        /// Shared by all calls, so it is only touched while <see cref="_chatGate"/> is held.
        /// </summary>
        private static readonly HashSet<IServerStreamWriter<ChatMessage>> _subscribers =
            new HashSet<IServerStreamWriter<ChatMessage>>();

        /// <summary>
        /// Serializes access to <see cref="_subscribers"/> and the broadcast writes.
        /// A plain <c>lock</c> cannot be held across <c>await</c>, and gRPC stream writers
        /// do not support overlapping writes, so one async gate covers both needs.
        /// </summary>
        private static readonly SemaphoreSlim _chatGate = new SemaphoreSlim(1, 1);

        private readonly ILogger<DemoerService> _logger;

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="logger">Logger used for the progress messages of <see cref="BufferAllData"/>.</param>
        public DemoerService(ILogger<DemoerService> logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// Unary call: prints the incoming message to the console and answers with a greeting.
        /// </summary>
        /// <param name="request">Request whose <c>Message</c> is echoed in the greeting.</param>
        /// <param name="context">Call context (not used).</param>
        /// <returns>A reply whose <c>Message</c> is <c>"Hello "</c> followed by the request message.</returns>
        public override Task<DataMessage> SayHello(DataMessage request, ServerCallContext context)
        {
            Console.WriteLine(request.Message);

            return Task.FromResult(new DataMessage
            {
                Message = $"Hello {request.Message}"
            });
        }

        /// <summary>
        /// Bidirectional streaming call: every message received from this client is
        /// broadcast to all clients currently in the chat.
        /// </summary>
        /// <param name="requestStream">Messages sent by the calling client.</param>
        /// <param name="responseStream">Stream on which broadcast messages are delivered to the calling client.</param>
        /// <param name="context">Call context (not used).</param>
        /// <returns>A task that completes when the client finishes sending.</returns>
        public override Task Chat(IAsyncStreamReader<ChatMessage> requestStream, IServerStreamWriter<ChatMessage> responseStream, ServerCallContext context)
        {
            return ChatCore(requestStream, responseStream);
        }

        /// <summary>
        /// Chat implementation: registers the caller as a subscriber once it has sent its
        /// first message, relays each of its messages to all subscribers, and unregisters
        /// it when its request stream ends or fails.
        /// </summary>
        /// <param name="requestStream">Messages sent by the calling client.</param>
        /// <param name="responseStream">Stream on which broadcast messages are delivered to the calling client.</param>
        /// <returns>A task that completes when the request stream is exhausted.</returns>
        public static async Task ChatCore(IAsyncStreamReader<ChatMessage> requestStream, IServerStreamWriter<ChatMessage> responseStream)
        {
            if (!await requestStream.MoveNext())
            {
                // No messages so don't register and just exit.
                return;
            }

            await SubscribeAsync(responseStream);
            try
            {
                do
                {
                    await BroadcastMessageAsync(requestStream.Current);
                } while (await requestStream.MoveNext());
            }
            finally
            {
                // Always unregister, even when reading or broadcasting throws; otherwise the
                // finished call's stream would stay in the set and later broadcasts would write to it.
                await UnsubscribeAsync(responseStream);
            }
        }

        /// <summary>Adds a response stream to the subscriber set under the chat gate.</summary>
        private static async Task SubscribeAsync(IServerStreamWriter<ChatMessage> responseStream)
        {
            await _chatGate.WaitAsync();
            try
            {
                _subscribers.Add(responseStream);
            }
            finally
            {
                _chatGate.Release();
            }
        }

        /// <summary>Removes a response stream from the subscriber set under the chat gate.</summary>
        private static async Task UnsubscribeAsync(IServerStreamWriter<ChatMessage> responseStream)
        {
            await _chatGate.WaitAsync();
            try
            {
                _subscribers.Remove(responseStream);
            }
            finally
            {
                _chatGate.Release();
            }
        }

        /// <summary>
        /// Appends <c>" Say:"</c> to the sender name and writes the message to every subscriber.
        /// </summary>
        /// <param name="message">Message to relay; its <c>Name</c> is modified in place.</param>
        /// <returns>A task that completes when every subscriber has been written to.</returns>
        private static async Task BroadcastMessageAsync(ChatMessage message)
        {
            message.Name += " Say:";

            // Hold the gate for the whole fan-out: the set cannot change while it is being
            // enumerated, and no two broadcasts write to the same stream at the same time.
            await _chatGate.WaitAsync();
            try
            {
                foreach (var subscriber in _subscribers)
                {
                    await subscriber.WriteAsync(message);
                }
            }
            finally
            {
                _chatGate.Release();
            }
        }

        /// <summary>
        /// Client streaming call: consumes every message, prints it to the console, optionally
        /// waits for the delay requested in the message, and reports the total payload size.
        /// </summary>
        /// <param name="requestStream">Messages sent by the client.</param>
        /// <param name="context">Call context (not used).</param>
        /// <returns>A <see cref="DataComplete"/> whose <c>Size</c> is the sum of all <c>Data</c> lengths received.</returns>
        public override async Task<DataComplete> ClientStreamedData(
            IAsyncStreamReader<DataMessage> requestStream,
            ServerCallContext context)
        {
            var total = 0L;

            await foreach (var message in requestStream.ReadAllAsync())
            {
                total += message.Data.Length;
                Console.WriteLine($"{message.Message} {message.Data.ToStringUtf8()} {message.ServerDelayMilliseconds}");

                if (message.ServerDelayMilliseconds > 0)
                {
                    await Task.Delay(message.ServerDelayMilliseconds);
                }
            }

            return new DataComplete
            {
                Size = total
            };
        }

        /// <summary>
        /// Bidirectional streaming call: buffers the complete request payload in memory, then
        /// sends it back to the client in batches of at most <see cref="ResponseBatchSize"/> bytes.
        /// </summary>
        /// <param name="requestStream">Messages whose <c>Data</c> is accumulated.</param>
        /// <param name="responseStream">Stream that receives the buffered data in batches.</param>
        /// <param name="context">Call context (not used).</param>
        /// <returns>A task that completes when all buffered data has been written back.</returns>
        public override async Task BufferAllData(
            IAsyncStreamReader<DataMessage> requestStream,
            IServerStreamWriter<DataMessage> responseStream,
            ServerCallContext context)
        {
            // Read data into MemoryStream
            using var buffered = new MemoryStream();
            await foreach (var message in requestStream.ReadAllAsync())
            {
                buffered.Write(message.Data.Span);
                _logger.LogInformation("Received {ByteCount} bytes", buffered.Length);
            }

            // Read straight from the stream's backing array instead of copying it with ToArray();
            // only the first `length` bytes of that array hold received data.
            var payload = buffered.GetBuffer();
            var length = (int)buffered.Length;

            // Write back to client in batches
            var sent = 0;
            while (sent < length)
            {
                var writeCount = Math.Min(length - sent, ResponseBatchSize);
                await responseStream.WriteAsync(new DataMessage
                {
                    Data = ByteString.CopyFrom(payload, sent, writeCount)
                });

                sent += writeCount;
                _logger.LogInformation("Sent {ByteCount} bytes", sent);
            }
        }

        /// <summary>
        /// Server streaming call: sends a response header, then streams several greetings.
        /// </summary>
        /// <param name="request">Request whose <c>Message</c> is embedded in each greeting.</param>
        /// <param name="responseStream">Stream that receives the greetings.</param>
        /// <param name="context">Call context used to send the response headers.</param>
        /// <returns>A task that completes when all greetings have been written.</returns>
        public override async Task SayHellos(DataMessage request, IServerStreamWriter<DataMessage> responseStream, ServerCallContext context)
        {
            // Explicitly send the response headers before any streamed content
            Metadata responseHeaders = new Metadata
            {
                { "test-response-header", "value" }
            };
            await context.WriteResponseHeadersAsync(responseHeaders);

            await SayHellosCore(request, responseStream);
        }

        /// <summary>
        /// Writes three "How are you" messages followed by a final "Goodbye" message.
        /// </summary>
        /// <param name="request">Request whose <c>Message</c> is embedded in each response.</param>
        /// <param name="responseStream">Stream that receives the four responses.</param>
        /// <returns>A task that completes when the last response has been written.</returns>
        public static async Task SayHellosCore(DataMessage request, IServerStreamWriter<DataMessage> responseStream)
        {
            for (var i = 0; i < 3; i++)
            {
                var message = $"How are you {request.Message}? {i}";
                await responseStream.WriteAsync(new DataMessage { Message = message });
            }

            await responseStream.WriteAsync(new DataMessage { Message = $"Goodbye {request.Message}!" });
        }

        /// <summary>
        /// Bidirectional streaming call: echoes every received message back with
        /// <c>"aaaa"</c> appended, printing each incoming message to the console.
        /// </summary>
        /// <param name="requestStream">Messages sent by the client.</param>
        /// <param name="responseStream">Stream that receives the echoed messages.</param>
        /// <param name="context">
        /// Call context; when the request carries a <c>flush-headers</c> header, empty response
        /// headers are written before any message is echoed.
        /// </param>
        /// <returns>A task that completes when the request stream is exhausted.</returns>
        public override async Task EchoAllData(
            IAsyncStreamReader<DataMessage> requestStream,
            IServerStreamWriter<DataMessage> responseStream,
            ServerCallContext context)
        {
            var flushHeaders = context.RequestHeaders.Any(x => x.Key == "flush-headers");
            if (flushHeaders)
            {
                await context.WriteResponseHeadersAsync(new Metadata());
            }

            await foreach (var message in requestStream.ReadAllAsync())
            {
                Console.WriteLine(message.Message);
                await responseStream.WriteAsync(new DataMessage
                {
                    Message = message.Message + "aaaa"
                });
            }
        }
    }
}
|