// Source code at http://rxx.codeplex.com/ is the critical part of this solution

/// <summary>
/// Demo entry point: generates a stream of random <see cref="MarketData"/> ticks, groups the
/// ticks by depth index, and prints the sum of the latest quantity seen in each group.
/// A second pipeline does the same for ticks whose depth index is at least 1.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    DateTime dtStart = DateTime.Now;

    // The generator state advances one second per tick and stops once the state reaches
    // dtStart + 30 seconds; each tick is delivered one second after the previous one.
    var mktData = GenerateObservable<MarketData>(
        1,
        dt => new MarketData() { DepthIndex = RandomInt(10), Qty = RandomInt(777) },
        1,
        dt => dt < dtStart.AddSeconds(30));

    // One QtyAt per distinct depth index; every group starts with 0 so it contributes
    // a value to the combined list as soon as the group appears.
    var grpByDepthMktData = mktData
        .GroupBy(k => k.DepthIndex)
        .Select(g => new QtyAt(g.Key, g));

    grpByDepthMktData
        .Select(d => d.Qtys.StartWith(0))
        .CombineLatest()
        .Select(t => t.Sum())
        .Subscribe(d => Console.WriteLine("sum all " + d));

    // Same pipeline restricted to depth indexes >= 1.
    var mktData1 = mktData.Where(m => m.DepthIndex >= 1);

    var grpByDepthMktData1 = mktData1
        .GroupBy(k => k.DepthIndex)
        .Select(g => new QtyAt(g.Key, g));

    grpByDepthMktData1
        .Select(d => d.Qtys.StartWith(0))
        .CombineLatest()
        .Select(t => t.Sum())
        .Subscribe(d => Console.WriteLine(d));

    // Keep the process alive while the subscriptions print.
    Console.ReadLine();
}

/// <summary>
/// Builds a timed sequence whose state is a <see cref="DateTime"/> starting at the moment
/// this method is called.
/// </summary>
/// <typeparam name="T">Type of the produced elements.</typeparam>
/// <param name="seconds4Iteration">Seconds added to the state on each iteration.</param>
/// <param name="newT">Projects the current state into the element to emit.</param>
/// <param name="nextInSeconds">Delay, in seconds, passed as the time selector for each element.</param>
/// <param name="continuation">Generation continues while this returns true for the current state.</param>
/// <returns>The observable produced by <c>Observable.Generate</c>.</returns>
static IObservable<T> GenerateObservable<T>(
    int seconds4Iteration,
    Func<DateTime, T> newT,
    int nextInSeconds,
    Func<DateTime, bool> continuation)
{
    DateTime dtStart = DateTime.Now;
    return Observable.Generate(
        dtStart,
        continuation,
        dt => dt.AddSeconds(seconds4Iteration),
        newT,
        dt => TimeSpan.FromSeconds(nextInSeconds));
}

// Single shared generator. Creating a new Random per call can yield identical seeds for
// calls made in quick succession (clock-based default seed on older runtimes), which made
// DepthIndex and Qty of the same tick correlated.
static readonly Random s_random = new Random();

// Random is not thread-safe; the shared instance is guarded because the generator
// callbacks may run on scheduler threads.
static readonly object s_randomGate = new object();

/// <summary>
/// Returns a random non-negative integer that is less than <paramref name="max"/>.
/// </summary>
/// <param name="max">Exclusive upper bound passed to <see cref="Random.Next(int)"/>.</param>
/// <returns>The random value, typed as nullable to match the <see cref="MarketData"/> fields.</returns>
static int? RandomInt(int max)
{
    lock (s_randomGate)
    {
        return s_random.Next(max);
    }
}

/// <summary>A single market-data tick: a depth index and a quantity.</summary>
public class MarketData
{
    public int? DepthIndex;
    public decimal? Qty;
}

/// <summary>The stream of quantities observed at one depth.</summary>
public class QtyAt
{
    public int? Depth;
    public IObservable<decimal?> Qtys;

    /// <summary>
    /// Creates the quantity stream for <paramref name="depth"/> by projecting each tick to its Qty.
    /// </summary>
    /// <param name="depth">Depth this stream belongs to.</param>
    /// <param name="qtys">Ticks to project.</param>
    public QtyAt(int? depth, IObservable<MarketData> qtys)
    {
        // Previously the depth argument was dropped and Depth stayed null.
        Depth = depth;
        Qtys = qtys.Select(m => m.Qty);
    }
}

/// <summary>CombineLatest overloads for pairs, enumerables and observables of observables.</summary>
public static partial class Observable2
{
    /// <summary>Combines the latest values of two sequences into a tuple.</summary>
    public static IObservable<Tuple<T1, T2>> CombineLatest<T1, T2>(this IObservable<T1> first, IObservable<T2> second)
    {
        return first.CombineLatest(second, Tuple.Create);
    }

    /// <summary>
    /// Combines the latest values of an enumerable of sequences into lists. Lists are
    /// suppressed until the enumerable has been fully walked and the list holds one value
    /// for every enumerated source.
    /// </summary>
    public static IObservable<IList<TSource>> CombineLatest<TSource>(this IEnumerable<IObservable<TSource>> sources)
    {
        return Observable.Defer(() =>
        {
            // count tracks how many sources have been enumerated so far; completed flips
            // to true when the element flagged as last has been enumerated.
            var count = 0;
            var completed = false;

            return sources.MarkLast()
                .Select(tuple =>
                {
                    count++;
                    completed = tuple.Item1;
                    return tuple.Item2;
                })
                .ToObservable()
                .CombineLatest()
                .SkipWhile(list => !completed || list.Count < count);
        });
    }

    /// <summary>
    /// Pairs every element with a flag that is true only for the final element of the sequence.
    /// An empty sequence yields nothing.
    /// </summary>
    internal static IEnumerable<Tuple<bool, TSource>> MarkLast<TSource>(this IEnumerable<TSource> source)
    {
        using (var enumerator = source.GetEnumerator())
        {
            if (enumerator.MoveNext())
            {
                bool moveNext;
                do
                {
                    // Look one element ahead so the current one can be flagged as last.
                    var value = enumerator.Current;
                    moveNext = enumerator.MoveNext();
                    yield return Tuple.Create(!moveNext, value);
                }
                while (moveNext);
            }
        }
    }

    /// <summary>
    /// Combines the latest values of a dynamic set of inner sequences. Each inner sequence
    /// gets a slot in arrival order; a read-only list of the leading slots that have all
    /// produced a value is emitted whenever any inner sequence produces a value, provided
    /// at least two leading slots are populated. The result completes once the outer
    /// sequence and every inner sequence have completed; errors are forwarded to the observer.
    /// </summary>
    public static IObservable<IList<TSource>> CombineLatest<TSource>(this IObservable<IObservable<TSource>> sources)
    {
        return Observable.Create<IList<TSource>>(
            observer =>
            {
                var gate = new object();

                var latest = new List<TSource>();
                var hasValueFlags = new List<bool>();

                // Number of inner sequences that are subscribed and not yet completed.
                var sourceCount = 0;
                // Length of the prefix of slots (from index 0) that have all produced a value.
                var consecutiveActiveSourcesCount = 0;
                var outerCompleted = false;

                var outerSubscription = new SingleAssignmentDisposable();
                var disposables = new CompositeDisposable(outerSubscription);

                outerSubscription.Disposable = sources.Subscribe(
                    source =>
                    {
                        int index;

                        lock (gate)
                        {
                            // Reserve the next slot for this inner sequence.
                            sourceCount++;

                            index = latest.Count;

                            latest.Add(default(TSource));
                            hasValueFlags.Add(false);
                        }

                        var subscription = new SingleAssignmentDisposable();

                        disposables.Add(subscription);

                        subscription.Disposable = source.Subscribe(
                            value =>
                            {
                                lock (gate)
                                {
                                    latest[index] = value;

                                    if (consecutiveActiveSourcesCount < hasValueFlags.Count)
                                    {
                                        hasValueFlags[index] = true;

                                        // Extend the populated prefix as far as it now reaches.
                                        while (consecutiveActiveSourcesCount < hasValueFlags.Count
                                            && hasValueFlags[consecutiveActiveSourcesCount])
                                        {
                                            consecutiveActiveSourcesCount++;
                                        }
                                    }

                                    if (consecutiveActiveSourcesCount >= 2)
                                    {
                                        observer.OnNext(latest.Take(consecutiveActiveSourcesCount).ToList().AsReadOnly());
                                    }
                                }
                            },
                            observer.OnError,
                            () =>
                            {
                                bool completeNow;

                                lock (gate)
                                {
                                    disposables.Remove(subscription);

                                    sourceCount--;

                                    completeNow = outerCompleted && sourceCount == 0;
                                }

                                if (completeNow)
                                {
                                    observer.OnCompleted();
                                }
                            });
                    },
                    observer.OnError,
                    () =>
                    {
                        bool completeNow;

                        lock (gate)
                        {
                            outerCompleted = true;

                            completeNow = sourceCount == 0;
                        }

                        if (completeNow)
                        {
                            observer.OnCompleted();
                        }
                    });

                return disposables;
            });
    }
}
Sunday, April 19, 2015
CombineLatest on Several IObservables
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment