ConnectedState.cs 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Net;
  6. using System.IO;
  7. using System.Threading.Tasks;
  8. using System.Threading;
  9. using System.Diagnostics;
  10. namespace Client.SSE
  11. {
  /// <summary>
  /// Connection state used while the server response stream is open
  /// (<see cref="State"/> is always <c>EventSourceState.OPEN</c>).
  /// Each call to <see cref="Run"/> reads one chunk from the response stream, parses the
  /// complete lines it contains as Server-Sent Events fields and dispatches every event
  /// that was terminated by a blank line. Incomplete trailing text, partially decoded
  /// UTF-8 sequences and a partially built event are kept for the next call.
  /// </summary>
  public class ConnectedState : IConnectionState
  {
      // Size of the buffer used for a single read from the response stream.
      private const int ReadBufferSize = 1024 * 8;

      private readonly IWebRequesterFactory mWebRequesterFactory;
      private readonly IServerResponse mResponse;

      // Stored by the constructor. Note that Run() receives its own "headers" argument,
      // and that argument (not this field) is what is handed to DisconnectedState.
      private readonly Dictionary<string, string> headers;

      // Stateful decoder: keeps the leading bytes of a multi-byte UTF-8 character that was
      // split across two reads, so the character is decoded correctly on the next call.
      private readonly Decoder mDecoder = Encoding.UTF8.GetDecoder();

      // Event currently being assembled; null when no field has been seen since the last dispatch.
      private ServerSentEvent mSse = null;

      // Text that was not terminated by a line ending; it is prepended to the next chunk.
      private string mRemainingText = string.Empty;

      /// <summary>The state reported while connected; always OPEN.</summary>
      public EventSourceState State { get { return EventSourceState.OPEN; } }

      /// <summary>Creates the state for an already established response.</summary>
      /// <param name="response">Response whose stream is read by <see cref="Run"/>.</param>
      /// <param name="webRequesterFactory">Factory passed on to the disconnected state.</param>
      /// <param name="headers">Request headers; stored, see the note on the field.</param>
      public ConnectedState(IServerResponse response, IWebRequesterFactory webRequesterFactory, Dictionary<string, string> headers)
      {
          mResponse = response;
          mWebRequesterFactory = webRequesterFactory;
          this.headers = headers;
      }

      /// <summary>
      /// Starts a task that reads one chunk from the response stream and processes it.
      /// </summary>
      /// <param name="msgReceived">Callback invoked once for every completed event.</param>
      /// <param name="cancelToken">Token that aborts the read and the line processing.</param>
      /// <param name="headers">Headers passed to the <c>DisconnectedState</c> created on disconnect.</param>
      /// <returns>
      /// A started task whose result is this instance when data was read and processed
      /// without cancellation, or a new <c>DisconnectedState</c> when the read failed, was
      /// cancelled, or the end of the stream was reached.
      /// </returns>
      public Task<IConnectionState> Run(Action<ServerSentEvent> msgReceived, CancellationToken cancelToken, Dictionary<string, string> headers)
      {
          Task<IConnectionState> t = new Task<IConnectionState>(() => ReadOnce(msgReceived, cancelToken, headers));
          t.Start();
          return t;
      }

      // Performs one blocking read of the response stream and processes the lines it yields.
      private IConnectionState ReadOnce(Action<ServerSentEvent> msgReceived, CancellationToken cancelToken, Dictionary<string, string> reconnectHeaders)
      {
          // NOTE(review): neither the stream nor the response is closed when leaving this
          // state; whether IServerResponse can be closed/disposed is not visible here.
          var stream = mResponse.GetResponseStream();
          byte[] buffer = new byte[ReadBufferSize];
          int bytesRead;
          try
          {
              var readTask = stream.ReadAsync(buffer, 0, buffer.Length, cancelToken);
              readTask.Wait(cancelToken);
              bytesRead = readTask.Result;
          }
          catch (Exception ex)
          {
              // A cancelled or failed read ends the connected state. Previously a failed
              // read was logged here and then rethrown by a later access to Task.Result,
              // which faulted the returned task instead of yielding a disconnected state.
              Trace.WriteLine(ex, "ConnectedState.Run");
              return Disconnect(reconnectHeaders);
          }

          if (cancelToken.IsCancellationRequested)
          {
              return Disconnect(reconnectHeaders);
          }

          if (bytesRead <= 0)
          {
              Trace.WriteLine("SSE No bytes read. End of stream.");
              return Disconnect(reconnectHeaders);
          }

          //Console.WriteLine("ReadCallback {0} bytesRead", bytesRead);
          string text = mRemainingText + Decode(buffer, bytesRead);
          string[] lines = StringSplitter.SplitIntoLines(text, out mRemainingText);
          foreach (string line in lines)
          {
              if (cancelToken.IsCancellationRequested)
              {
                  break;
              }
              ProcessLine(line, msgReceived);
          }

          if (cancelToken.IsCancellationRequested)
          {
              return Disconnect(reconnectHeaders);
          }
          return this;
      }

      // Decodes the first "count" bytes of "buffer" as UTF-8, carrying incomplete byte
      // sequences over to the next call through the decoder's internal state.
      private string Decode(byte[] buffer, int count)
      {
          char[] chars = new char[mDecoder.GetCharCount(buffer, 0, count)];
          int charsWritten = mDecoder.GetChars(buffer, 0, count, chars, 0);
          return new string(chars, 0, charsWritten);
      }

      // Handles one line of the event stream: dispatches on a blank line, logs comments,
      // and otherwise stores the field on the event being assembled.
      private void ProcessLine(string line, Action<ServerSentEvent> msgReceived)
      {
          // A blank line terminates the current event. Without a pending event there is
          // nothing to dispatch and the line is skipped (it is not an "unknown" field).
          if (string.IsNullOrEmpty(line.Trim()))
          {
              if (mSse != null)
              {
                  Trace.WriteLine("SSE Message received");
                  msgReceived(mSse);
                  mSse = null;
              }
              return;
          }

          if (line.StartsWith(":", StringComparison.Ordinal))
          {
              // This is a comment, just log it.
              Trace.WriteLine("SSE A comment was received: " + line);
              return;
          }

          // "name: value" - a line without a colon is a field name with an empty value.
          string fieldName = line;
          string fieldValue = String.Empty;
          int index = line.IndexOf(':');
          if (index >= 0)
          {
              fieldName = line.Substring(0, index);
              fieldValue = line.Substring(index + 1);
              // Only a single leading space belongs to the separator; any further
              // whitespace is part of the value.
              if (fieldValue.Length > 0 && fieldValue[0] == ' ')
              {
                  fieldValue = fieldValue.Substring(1);
              }
          }

          // Field names are matched case-insensitively (as before), but ordinally so the
          // result does not depend on the current culture.
          if (string.Equals(fieldName, "event", StringComparison.OrdinalIgnoreCase))
          {
              mSse = mSse ?? new ServerSentEvent();
              mSse.EventType = fieldValue;
          }
          else if (string.Equals(fieldName, "data", StringComparison.OrdinalIgnoreCase))
          {
              mSse = mSse ?? new ServerSentEvent();
              // Append: an event may carry several data lines, each followed by a line feed.
              mSse.Data = (mSse.Data ?? string.Empty) + fieldValue + '\n';
          }
          else if (string.Equals(fieldName, "id", StringComparison.OrdinalIgnoreCase))
          {
              mSse = mSse ?? new ServerSentEvent();
              mSse.LastEventId = fieldValue;
          }
          else if (string.Equals(fieldName, "retry", StringComparison.OrdinalIgnoreCase))
          {
              int parsedRetry;
              if (int.TryParse(fieldValue, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out parsedRetry))
              {
                  mSse = mSse ?? new ServerSentEvent();
                  mSse.Retry = parsedRetry;
              }
          }
          else
          {
              // Ignore this, just log it
              Trace.WriteLine("SSE A unknown line was received: " + line);
          }
      }

      // Builds the state returned when the connection is left.
      private IConnectionState Disconnect(Dictionary<string, string> reconnectHeaders)
      {
          return new DisconnectedState(mResponse.ResponseUri, mWebRequesterFactory, reconnectHeaders);
      }
  }
  132. }