123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Text;
- using System.Threading;
- namespace Client.SSE
- {
- public class EventSource
- {
- public event EventHandler<StateChangedEventArgs> StateChanged;
- public event EventHandler<ServerSentEventReceivedEventArgs> EventReceived;
- public CancellationTokenSource CancellationToken { get; set; }
- private IWebRequesterFactory _webRequesterFactory = new WebRequesterFactory();
- private int _timeout = 0;
- public Uri Url { get; private set; }
- public EventSourceState State { get { return CurrentState.State; } }
- public string LastEventId { get; private set; }
- private IConnectionState mCurrentState = null;
- private CancellationToken mStopToken;
- private CancellationTokenSource mTokenSource = new CancellationTokenSource();
- private Dictionary<string, string> _headers;
- private IConnectionState CurrentState
- {
- get { return mCurrentState; }
- set
- {
- if (!value.Equals(mCurrentState))
- {
- StringBuilder sb = new StringBuilder("State changed from ");
- sb.Append(mCurrentState == null ? "Unknown" : mCurrentState.State.ToString());
- sb.Append(" to ");
- sb.Append(value == null ? "Unknown" : value.State.ToString());
- Trace.WriteLine(sb.ToString());
- mCurrentState = value;
- OnStateChanged(mCurrentState.State);
- }
- }
- }
- public EventSource(Uri url, int timeout)
- {
- Initialize(url, timeout);
- }
- public EventSource(Uri url, Dictionary<string, string> headers, int timeout)
- {
- _headers = headers;
- Initialize(url, timeout);
- }
- /// <summary>
- /// Constructor for testing purposes
- /// </summary>
- /// <param name="factory">The factory that generates the WebRequester to use.</param>
- public EventSource(Uri url, IWebRequesterFactory factory)
- {
- _webRequesterFactory = factory;
- Initialize(url, 0);
- }
- private void Initialize(Uri url, int timeout)
- {
- _timeout = timeout;
- Url = url;
- CurrentState = new DisconnectedState(Url, _webRequesterFactory, _headers);
- Trace.WriteLine("SSE EventSource created for " + url.ToString());
- }
- /// <summary>
- /// Start the EventSource.
- /// </summary>
- /// <param name="stopToken">Cancel this token to stop the EventSource.</param>
- public void Start(CancellationToken stopToken)
- {
- if (State == EventSourceState.CLOSED)
- {
- mStopToken = stopToken;
- mTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken);
- Run();
- }
- }
- protected void Run()
- {
- if (mTokenSource.IsCancellationRequested && CurrentState.State == EventSourceState.CLOSED)
- return;
- mCurrentState.Run(this.OnEventReceived, mTokenSource.Token, _headers).ContinueWith(cs =>
- {
- CurrentState = cs.Result;
- Run();
- });
- }
- protected void OnEventReceived(ServerSentEvent sse)
- {
- if (EventReceived != null)
- {
- EventReceived(this, new ServerSentEventReceivedEventArgs(sse));
- }
- }
- protected void OnStateChanged(EventSourceState newState)
- {
- if (StateChanged != null)
- {
- StateChanged(this, new StateChangedEventArgs(newState));
- }
- }
- }
- }
|