using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;

namespace Client.SSE
{
    /// <summary>
    /// Drives a chain of <see cref="IConnectionState"/> objects for a Server-Sent Events
    /// endpoint: each state is run, and the state it yields becomes the next one to run.
    /// Subscribers are notified of state transitions and of received events.
    /// </summary>
    public class EventSource
    {
        /// <summary>
        /// Raised after the connection state has changed. The event args instance
        /// passed to handlers is a <see cref="StateChangedEventArgs"/>.
        /// </summary>
        public event EventHandler StateChanged;

        /// <summary>
        /// Raised for every event delivered by the current connection state. The event
        /// args instance passed to handlers is a <see cref="ServerSentEventReceivedEventArgs"/>.
        /// </summary>
        public event EventHandler EventReceived;

        /// <summary>
        /// Caller-assignable token source. NOTE(review): nothing in this class reads or
        /// writes it; stopping is controlled by the token passed to <see cref="Start"/>.
        /// </summary>
        public CancellationTokenSource CancellationToken { get; set; }

        private IWebRequesterFactory _webRequesterFactory = new WebRequesterFactory();

        // Stored by Initialize; not read anywhere else in this class.
        private int _timeout = 0;

        /// <summary>The endpoint this instance was created for.</summary>
        public Uri Url { get; private set; }

        /// <summary>The state reported by the current connection state object.</summary>
        public EventSourceState State
        {
            get { return CurrentState.State; }
        }

        /// <summary>
        /// Identifier of the last event. NOTE(review): never assigned within this class.
        /// </summary>
        public string LastEventId { get; private set; }

        private IConnectionState mCurrentState = null;

        // Token handed to Start; kept for reference, the linked source below is what Run uses.
        private CancellationToken mStopToken;
        private CancellationTokenSource mTokenSource = new CancellationTokenSource();

        // NOTE(review): the generic arguments were missing in the reviewed text (a bare
        // `Dictionary` does not exist in System.Collections.Generic); string/string is
        // assumed for header name/value pairs - confirm against DisconnectedState.
        private Dictionary<string, string> _headers;

        /// <summary>
        /// Gets or sets the active connection state. Assigning a state that is not equal
        /// to the current one traces the transition and raises <see cref="StateChanged"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException">The assigned value is null.</exception>
        private IConnectionState CurrentState
        {
            get { return mCurrentState; }
            set
            {
                // Fail with a descriptive exception instead of a NullReferenceException
                // from value.Equals(...) below.
                if (value == null)
                {
                    throw new ArgumentNullException("value");
                }

                if (value.Equals(mCurrentState))
                {
                    return;
                }

                // The previous state is null only for the very first assignment.
                string previous = mCurrentState == null ? "Unknown" : mCurrentState.State.ToString();
                Trace.WriteLine("State changed from " + previous + " to " + value.State.ToString());

                mCurrentState = value;
                OnStateChanged(mCurrentState.State);
            }
        }

        /// <summary>
        /// Creates an EventSource for <paramref name="url"/> without custom headers.
        /// </summary>
        /// <param name="url">The endpoint to connect to.</param>
        /// <param name="timeout">Timeout value stored on the instance.</param>
        /// <exception cref="ArgumentNullException"><paramref name="url"/> is null.</exception>
        public EventSource(Uri url, int timeout)
        {
            Initialize(url, timeout);
        }

        /// <summary>
        /// Creates an EventSource for <paramref name="url"/> that passes
        /// <paramref name="headers"/> on to its connection states.
        /// </summary>
        /// <param name="url">The endpoint to connect to.</param>
        /// <param name="headers">Headers handed to the connection states; may be null.</param>
        /// <param name="timeout">Timeout value stored on the instance.</param>
        /// <exception cref="ArgumentNullException"><paramref name="url"/> is null.</exception>
        public EventSource(Uri url, Dictionary<string, string> headers, int timeout)
        {
            // Must be assigned before Initialize, which passes _headers to the initial state.
            _headers = headers;
            Initialize(url, timeout);
        }

        /// <summary>
        /// Constructor for testing purposes.
        /// </summary>
        /// <param name="url">The endpoint to connect to.</param>
        /// <param name="factory">The factory that generates the WebRequester to use.</param>
        /// <exception cref="ArgumentNullException"><paramref name="url"/> is null.</exception>
        public EventSource(Uri url, IWebRequesterFactory factory)
        {
            // Must be assigned before Initialize, which passes the factory to the initial state.
            _webRequesterFactory = factory;
            Initialize(url, 0);
        }

        /// <summary>
        /// Shared constructor logic: stores the settings and installs the initial
        /// <see cref="DisconnectedState"/>.
        /// </summary>
        private void Initialize(Uri url, int timeout)
        {
            // Validate up front; previously a null url only failed at url.ToString(),
            // after the state object had already been constructed with it.
            if (url == null)
            {
                throw new ArgumentNullException("url");
            }

            _timeout = timeout;
            Url = url;
            CurrentState = new DisconnectedState(Url, _webRequesterFactory, _headers);
            Trace.WriteLine("SSE EventSource created for " + url.ToString());
        }

        /// <summary>
        /// Start the EventSource. The call is ignored unless <see cref="State"/> is
        /// <see cref="EventSourceState.CLOSED"/>.
        /// </summary>
        /// <param name="stopToken">Cancel this token to stop the EventSource.</param>
        public void Start(CancellationToken stopToken)
        {
            if (State == EventSourceState.CLOSED)
            {
                mStopToken = stopToken;
                mTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken);
                Run();
            }
        }

        /// <summary>
        /// Runs the current state and, once it completes, installs the state it produced
        /// and runs again. The loop ends when cancellation has been requested and the
        /// state is CLOSED, or when a state task faults or is canceled.
        /// </summary>
        protected void Run()
        {
            if (mTokenSource.IsCancellationRequested && CurrentState.State == EventSourceState.CLOSED)
            {
                return;
            }

            mCurrentState.Run(this.OnEventReceived, mTokenSource.Token, _headers).ContinueWith(cs =>
            {
                if (cs.IsFaulted || cs.IsCanceled)
                {
                    // Reading cs.Result on a faulted or canceled task throws inside this
                    // continuation, where nobody observes it: the loop would stop silently
                    // and State would stay stale, so Start could never succeed again.
                    if (cs.IsFaulted)
                    {
                        // Accessing Exception also marks the failure as observed.
                        Trace.WriteLine("SSE EventSource state task failed: " + cs.Exception.GetBaseException().Message);
                    }

                    // Fall back to a fresh disconnected state (the same one Initialize
                    // installs) so that Start can be called again. Do not loop here:
                    // retrying immediately after a failure could spin.
                    if (State != EventSourceState.CLOSED)
                    {
                        CurrentState = new DisconnectedState(Url, _webRequesterFactory, _headers);
                    }

                    return;
                }

                CurrentState = cs.Result;
                Run();
            });
        }

        /// <summary>
        /// Raises <see cref="EventReceived"/> for <paramref name="sse"/>.
        /// </summary>
        /// <param name="sse">The event delivered by the connection state.</param>
        protected void OnEventReceived(ServerSentEvent sse)
        {
            // Copy to a local so an unsubscribe on another thread between the null
            // check and the invocation cannot cause a NullReferenceException.
            EventHandler handler = EventReceived;
            if (handler != null)
            {
                handler(this, new ServerSentEventReceivedEventArgs(sse));
            }
        }

        /// <summary>
        /// Raises <see cref="StateChanged"/> for <paramref name="newState"/>.
        /// </summary>
        /// <param name="newState">The state that has just become current.</param>
        protected void OnStateChanged(EventSourceState newState)
        {
            // Local copy for the same reason as in OnEventReceived.
            EventHandler handler = StateChanged;
            if (handler != null)
            {
                handler(this, new StateChangedEventArgs(newState));
            }
        }
    }
}