123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Net;
- using System.IO;
- using System.Threading.Tasks;
- using System.Threading;
- using System.Diagnostics;
- namespace Client.SSE
- {
- public class ConnectedState : IConnectionState
- {
- private IWebRequesterFactory mWebRequesterFactory;
- private ServerSentEvent mSse = null;
- private string mRemainingText = string.Empty; // the text that is not ended with a lineending char is saved for next call.
- private IServerResponse mResponse;
- private Dictionary<string, string> headers;
- public EventSourceState State { get { return EventSourceState.OPEN; } }
- public ConnectedState(IServerResponse response, IWebRequesterFactory webRequesterFactory, Dictionary<string, string> headers)
- {
- mResponse = response;
- mWebRequesterFactory = webRequesterFactory;
- this.headers = headers;
- }
- public Task<IConnectionState> Run(Action<ServerSentEvent> msgReceived, CancellationToken cancelToken, Dictionary<string, string> headers)
- {
- Task<IConnectionState> t = new Task<IConnectionState>(() =>
- {
- //using (mResponse)
- {
- //using (var stream = mResponse.GetResponseStream())
- var stream = mResponse.GetResponseStream();
- {
- byte[] buffer = new byte[1024 * 8];
- var taskRead = stream.ReadAsync(buffer, 0, buffer.Length, cancelToken);
- try
- {
- taskRead.Wait(cancelToken);
- }
- catch (Exception ex)
- {
- Trace.WriteLine(ex, "ConnectedState.Run");
- }
- if (!cancelToken.IsCancellationRequested)
- {
- int bytesRead = taskRead.Result;
- if (bytesRead > 0) // stream has not reached the end yet
- {
- //Console.WriteLine("ReadCallback {0} bytesRead", bytesRead);
- string text = Encoding.UTF8.GetString(buffer, 0, bytesRead);
- text = mRemainingText + text;
- string[] lines = StringSplitter.SplitIntoLines(text, out mRemainingText);
- foreach (string line in lines)
- {
- if (cancelToken.IsCancellationRequested) break;
- // Dispatch message if empty lne
- if (string.IsNullOrEmpty(line.Trim()) && mSse != null)
- {
- Trace.WriteLine("SSE Message received");
- msgReceived(mSse);
- mSse = null;
- }
- else if (line.StartsWith(":"))
- {
- // This a comment, just log it.
- Trace.WriteLine("SSE A comment was received: " + line);
- }
- else
- {
- string fieldName = String.Empty;
- string fieldValue = String.Empty;
- if (line.Contains(':'))
- {
- int index = line.IndexOf(':');
- fieldName = line.Substring(0, index);
- fieldValue = line.Substring(index + 1).TrimStart();
- }
- else
- fieldName = line;
- if (String.Compare(fieldName, "event", true) == 0)
- {
- mSse = mSse ?? new ServerSentEvent();
- mSse.EventType = fieldValue;
- }
- else if (String.Compare(fieldName, "data", true) == 0)
- {
- mSse = mSse ?? new ServerSentEvent();
- mSse.Data = fieldValue + '\n';
- }
- else if (String.Compare(fieldName, "id", true) == 0)
- {
- mSse = mSse ?? new ServerSentEvent();
- mSse.LastEventId = fieldValue;
- }
- else if (String.Compare(fieldName, "retry", true) == 0)
- {
- int parsedRetry;
- if (int.TryParse(fieldValue, out parsedRetry))
- {
- mSse = mSse ?? new ServerSentEvent();
- mSse.Retry = parsedRetry;
- }
- }
- else
- {
- // Ignore this, just log it
- Trace.WriteLine("SSE A unknown line was received: " + line);
- }
- }
- }
- if (!cancelToken.IsCancellationRequested)
- return this;
- }
- else // end of the stream reached
- {
- Trace.WriteLine("SSE No bytes read. End of stream.");
- }
- }
- //stream.Dispose()
- //stream.Close();
- //mResponse.Close();
- //mResponse.Dispose();
- return new DisconnectedState(mResponse.ResponseUri, mWebRequesterFactory, headers);
- }
- }
- });
- t.Start();
- return t;
- }
- }
- }
|