// Demo: three interval-driven MarketData streams. Prints the sum of the latest Qty
// values, first across depths 0-2 and then across depths 1-2 only.
var q0 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(s => new MarketData(0, s));
var q1 = Observable.Interval(TimeSpan.FromSeconds(2)).Select(s => new MarketData(1, s));
var q2 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(s => new MarketData(2, s));

// NOTE(review): these are lists (IEnumerable) of observables, so the CombineLatest()
// calls below cannot bind to the extension defined in this file, which takes
// IObservable<IObservable<T>>. Presumably they resolve to Rx's built-in overload for
// IEnumerable<IObservable<T>> - confirm.
var list012 = new List<IObservable<decimal?>>();
list012.AddRange(new[] { q0.Select(t => t.Qty), q1.Select(t => t.Qty), q2.Select(t => t.Qty) });
list012.CombineLatest().Select(t => t.Sum()).Subscribe(d => Console.WriteLine("sum012="+d+" "+DateTime.Now ));

var list12 = new List<IObservable<decimal?>>();
list12.AddRange(new[] { q1.Select(t => t.Qty), q2.Select(t => t.Qty) });
list12.CombineLatest().Select(t => t.Sum()).Subscribe(d => Console.WriteLine("sum12=" + d + " " + DateTime.Now));

// Keep the process alive so the timer-driven subscriptions can keep printing.
Console.ReadLine();

/// <summary>
/// A quantity at a given depth index. The constructor logs every instance it creates.
/// </summary>
public class MarketData
{
    public int? DepthIndex;
    public decimal? Qty;

    /// <summary>
    /// Stores the depth index and quantity, then writes a "ctor" trace line with both
    /// values and the current local time to the console.
    /// </summary>
    /// <param name="d">Depth index to store.</param>
    /// <param name="q">Quantity to store.</param>
    public MarketData(int? d, decimal? q)
    {
        DepthIndex = d;
        Qty = q;
        Console.WriteLine("ctor " +d+" "+ q+" "+DateTime.Now );
    }
}

/// <summary>
/// Hosts the <c>CombineLatest</c> overload for a dynamic stream of sources.
/// Extension methods must be declared in a non-generic static class.
/// </summary>
public static class DynamicCombineLatestExtensions
{
    // Nothing is emitted until at least this many leading sources have produced a value.
    private const int MinimumActiveSources = 2;

    /// <summary>
    /// Combines the latest values of inner observables that arrive over time.
    /// </summary>
    /// <remarks>
    /// Each inner source is assigned a slot in arrival order. The operator tracks the
    /// length of the leading run of slots that have all produced at least one value.
    /// Whenever any inner source emits and that run contains at least two sources, a
    /// read-only snapshot of the run's latest values is pushed downstream. Slots beyond
    /// the run are not included in the snapshot.
    /// The result completes once the outer sequence and every inner sequence have
    /// completed. An error from the outer or any inner sequence is forwarded.
    /// </remarks>
    /// <typeparam name="TSource">Element type of the inner sequences.</typeparam>
    /// <param name="sources">Outer sequence that yields the inner sources.</param>
    /// <returns>A sequence of snapshots of the latest values, in source arrival order.</returns>
    public static IObservable<IList<TSource>> CombineLatest<TSource>(this IObservable<IObservable<TSource>> sources)
    {
        return Observable.Create<IList<TSource>>(
            observer =>
            {
                var gate = new object();
                var latest = new List<TSource>();
                var hasValueFlags = new List<bool>();
                var sourceCount = 0;
                var consecutiveActiveSourcesCount = 0;
                var outerCompleted = false;

                var outerSubscription = new SingleAssignmentDisposable();
                var disposables = new CompositeDisposable(outerSubscription);

                // Errors take the same gate as OnNext so a failing source cannot reach
                // the observer while another source is in the middle of an OnNext call.
                Action<Exception> forwardError = error =>
                {
                    lock (gate)
                    {
                        observer.OnError(error);
                    }
                };

                outerSubscription.Disposable = sources.Subscribe(
                    source =>
                    {
                        int index;

                        // Reserve a slot for this source before subscribing to it.
                        lock (gate)
                        {
                            sourceCount++;
                            index = latest.Count;
                            latest.Add(default(TSource));
                            hasValueFlags.Add(false);
                        }

                        var subscription = new SingleAssignmentDisposable();
                        disposables.Add(subscription);

                        subscription.Disposable = source.Subscribe(
                            value =>
                            {
                                lock (gate)
                                {
                                    latest[index] = value;

                                    // Only when some slot is still outside the run can this
                                    // value extend it; otherwise this slot is already flagged.
                                    if (consecutiveActiveSourcesCount < hasValueFlags.Count)
                                    {
                                        hasValueFlags[index] = true;
                                        while (consecutiveActiveSourcesCount < hasValueFlags.Count
                                               && hasValueFlags[consecutiveActiveSourcesCount])
                                        {
                                            consecutiveActiveSourcesCount++;
                                        }
                                    }

                                    if (consecutiveActiveSourcesCount >= MinimumActiveSources)
                                    {
                                        // Copy the run so later updates cannot mutate the snapshot.
                                        observer.OnNext(latest.Take(consecutiveActiveSourcesCount).ToList().AsReadOnly());
                                    }
                                }
                            },
                            forwardError,
                            () =>
                            {
                                bool completeNow;
                                lock (gate)
                                {
                                    disposables.Remove(subscription);
                                    sourceCount--;
                                    completeNow = outerCompleted && sourceCount == 0;
                                }

                                if (completeNow)
                                {
                                    observer.OnCompleted();
                                }
                            });
                    },
                    forwardError,
                    () =>
                    {
                        bool completeNow;
                        lock (gate)
                        {
                            outerCompleted = true;
                            completeNow = sourceCount == 0;
                        }

                        if (completeNow)
                        {
                            observer.OnCompleted();
                        }
                    });

                return disposables;
            });
    }
}
Sunday, April 19, 2015
Simpler test of IList.CombineLatest without Group by
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment