// EventSource.cs (3.8 KB)

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Text;
  5. using System.Threading;
  namespace Client.SSE
  {
      /// <summary>
      /// Drives a chain of <see cref="IConnectionState"/> objects for a single URL.
      /// Each state's Run call yields the next state; this class stores it, raises
      /// <see cref="StateChanged"/>, and runs it in turn until stopped.
      /// </summary>
      public class EventSource
      {
          /// <summary>Raised after the current connection state has been replaced.</summary>
          public event EventHandler<StateChangedEventArgs> StateChanged;

          /// <summary>
          /// Raised from <see cref="OnEventReceived"/>, which is handed to each state's
          /// Run call as its event callback.
          /// </summary>
          public event EventHandler<ServerSentEventReceivedEventArgs> EventReceived;

          /// <summary>
          /// Caller-assignable token source. This class neither reads nor writes it;
          /// stopping is controlled by the token passed to <see cref="Start"/>.
          /// </summary>
          public CancellationTokenSource CancellationToken { get; set; }

          /// <summary>The URL this instance was created for.</summary>
          public Uri Url { get; private set; }

          /// <summary>The state reported by the current connection state object.</summary>
          public EventSourceState State { get { return CurrentState.State; } }

          /// <summary>Never assigned anywhere in this class.</summary>
          public string LastEventId { get; private set; }

          private IWebRequesterFactory _webRequesterFactory = new WebRequesterFactory();

          // Stored by Initialize but not read anywhere in this class.
          private int _timeout = 0;

          private IConnectionState mCurrentState = null;

          // Stored by Start but not read anywhere in this class.
          private CancellationToken mStopToken;

          private CancellationTokenSource mTokenSource = new CancellationTokenSource();

          // May be null when the instance was created without headers.
          private Dictionary<string, string> _headers;

          /// <summary>
          /// The active connection state. Assigning a state that is not equal to the
          /// current one traces the transition and raises <see cref="StateChanged"/>.
          /// </summary>
          /// <exception cref="ArgumentNullException">The assigned value is null.</exception>
          private IConnectionState CurrentState
          {
              get { return mCurrentState; }
              set
              {
                  // The original dereferenced value before its own null check, so a null
                  // value could only ever surface as a NullReferenceException.
                  if (value == null)
                  {
                      throw new ArgumentNullException("value");
                  }

                  if (value.Equals(mCurrentState))
                  {
                      return;
                  }

                  string previous = mCurrentState == null ? "Unknown" : mCurrentState.State.ToString();
                  Trace.WriteLine("State changed from " + previous + " to " + value.State.ToString());

                  mCurrentState = value;
                  OnStateChanged(mCurrentState.State);
              }
          }

          /// <summary>Creates an event source for <paramref name="url"/>.</summary>
          /// <param name="url">The endpoint URL. Must not be null.</param>
          /// <param name="timeout">Timeout value; it is stored but not used by this class.</param>
          /// <exception cref="ArgumentNullException"><paramref name="url"/> is null.</exception>
          public EventSource(Uri url, int timeout)
          {
              Initialize(url, timeout);
          }

          /// <summary>Creates an event source for <paramref name="url"/> with extra headers.</summary>
          /// <param name="url">The endpoint URL. Must not be null.</param>
          /// <param name="headers">Headers forwarded to the connection states.</param>
          /// <param name="timeout">Timeout value; it is stored but not used by this class.</param>
          /// <exception cref="ArgumentNullException"><paramref name="url"/> is null.</exception>
          public EventSource(Uri url, Dictionary<string, string> headers, int timeout)
          {
              _headers = headers;
              Initialize(url, timeout);
          }

          /// <summary>
          /// Constructor for testing purposes
          /// </summary>
          /// <param name="url">The endpoint URL. Must not be null.</param>
          /// <param name="factory">The factory that generates the WebRequester to use.</param>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="url"/> or <paramref name="factory"/> is null.
          /// </exception>
          public EventSource(Uri url, IWebRequesterFactory factory)
          {
              if (factory == null)
              {
                  throw new ArgumentNullException("factory");
              }

              _webRequesterFactory = factory;
              Initialize(url, 0);
          }

          /// <summary>
          /// Shared constructor logic: records the arguments and installs the initial
          /// disconnected state.
          /// </summary>
          private void Initialize(Uri url, int timeout)
          {
              // Fail before any state is built instead of at url.ToString() below.
              if (url == null)
              {
                  throw new ArgumentNullException("url");
              }

              _timeout = timeout;
              Url = url;
              CurrentState = CreateDisconnectedState();
              Trace.WriteLine("SSE EventSource created for " + url.ToString());
          }

          /// <summary>Builds a fresh disconnected state for this instance's URL, factory and headers.</summary>
          private IConnectionState CreateDisconnectedState()
          {
              return new DisconnectedState(Url, _webRequesterFactory, _headers);
          }

          /// <summary>
          /// Start the EventSource. Does nothing unless the current state is
          /// <see cref="EventSourceState.CLOSED"/>.
          /// </summary>
          /// <param name="stopToken">Cancel this token to stop the EventSource.</param>
          public void Start(CancellationToken stopToken)
          {
              if (State != EventSourceState.CLOSED)
              {
                  return;
              }

              mStopToken = stopToken;
              mTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken);
              Run();
          }

          /// <summary>
          /// Runs the current state and, when its task completes, installs the state it
          /// produced and runs again. The loop ends once cancellation has been requested
          /// and the current state is CLOSED, or when a state task faults or is cancelled.
          /// </summary>
          protected void Run()
          {
              if (mTokenSource.IsCancellationRequested && CurrentState.State == EventSourceState.CLOSED)
              {
                  return;
              }

              mCurrentState.Run(this.OnEventReceived, mTokenSource.Token, _headers).ContinueWith(cs =>
              {
                  if (cs.IsFaulted || cs.IsCanceled)
                  {
                      // Reading cs.Result here would throw inside the continuation, where
                      // nothing observes it, and leave State stuck. Trace the failure
                      // (reading cs.Exception also marks it observed), fall back to a
                      // disconnected state so Start can be called again, and stop the loop.
                      string reason = cs.IsFaulted ? cs.Exception.ToString() : "state task was canceled";
                      Trace.WriteLine("SSE EventSource stopped: " + reason);
                      CurrentState = CreateDisconnectedState();
                      return;
                  }

                  CurrentState = cs.Result;
                  Run();
              });
          }

          /// <summary>Raises <see cref="EventReceived"/> for <paramref name="sse"/>.</summary>
          /// <param name="sse">The event to hand to subscribers.</param>
          protected void OnEventReceived(ServerSentEvent sse)
          {
              // Copy to a local so a concurrent unsubscribe cannot null the delegate
              // between the check and the call.
              EventHandler<ServerSentEventReceivedEventArgs> handler = EventReceived;
              if (handler != null)
              {
                  handler(this, new ServerSentEventReceivedEventArgs(sse));
              }
          }

          /// <summary>Raises <see cref="StateChanged"/> for <paramref name="newState"/>.</summary>
          /// <param name="newState">The state that has just become current.</param>
          protected void OnStateChanged(EventSourceState newState)
          {
              // Same local-copy pattern as OnEventReceived.
              EventHandler<StateChangedEventArgs> handler = StateChanged;
              if (handler != null)
              {
                  handler(this, new StateChangedEventArgs(newState));
              }
          }
      }
  }