// Source code at http://rxx.codeplex.com/ is the critical part of this solution
       /// <summary>
       /// Demo entry point. Builds a randomly generated <see cref="MarketData"/> feed, groups it by
       /// depth index and prints the running sum of the latest quantity of every depth: once for
       /// the whole feed and once for ticks whose depth index is at least 1.
       /// Blocks on <c>Console.ReadLine</c> so the subscriptions keep running.
       /// </summary>
       /// <param name="args">Command-line arguments (not used).</param>
       static void Main(string[] args)
        {
            DateTime dtStart = DateTime.Now;

            // One tick per generated timestamp: depth index in [0, 10), quantity in [0, 777).
            Func<DateTime, MarketData> nextTick = stamp => new MarketData { DepthIndex = RandomInt(10), Qty = RandomInt(777) };
            // Keep generating while the generator's timestamp is less than 30 seconds past dtStart.
            Func<DateTime, bool> withinWindow = stamp => stamp < dtStart.AddSeconds(30);
            IObservable<MarketData> mktData = GenerateObservable<MarketData>(1, nextTick, 1, withinWindow);

            // Shared pipeline: one quantity stream per depth (each seeded with 0), combined into a
            // list of latest values, then summed.
            Func<IObservable<MarketData>, IObservable<decimal?>> totalAcrossDepths = feed =>
                feed.GroupBy(tick => tick.DepthIndex)
                    .Select(depthGroup => new QtyAt(depthGroup.Key, depthGroup))
                    .Select(level => level.Qtys.StartWith(0))
                    .CombineLatest()
                    .Select(latestQtys => latestQtys.Sum());

            // NOTE(review): Observable.Generate is presumably cold, so the two subscriptions below
            // would each observe their own random feed - confirm whether a shared feed was intended.
            totalAcrossDepths(mktData).Subscribe(total => Console.WriteLine("sum all " + total));

            IObservable<MarketData> mktData1 = mktData.Where(tick => tick.DepthIndex >= 1);
            totalAcrossDepths(mktData1).Subscribe(total => Console.WriteLine(total));

            Console.ReadLine();
        }
        /// <summary>
        /// Wraps <c>Observable.Generate</c> with a <see cref="DateTime"/> state that starts at the
        /// current local time.
        /// </summary>
        /// <typeparam name="T">Type of the produced elements.</typeparam>
        /// <param name="seconds4Iteration">Seconds added to the timestamp state on every iteration.</param>
        /// <param name="newT">Maps the current timestamp state to the element to emit.</param>
        /// <param name="nextInSeconds">Delay, in seconds, passed to the generator as the time selector result.</param>
        /// <param name="continuation">Generation continues while this returns true for the current state.</param>
        /// <returns>The generated observable sequence.</returns>
        static IObservable<T> GenerateObservable<T>(int seconds4Iteration, Func<DateTime, T> newT, int nextInSeconds, Func<DateTime, bool> continuation)
        {
            DateTime initialState = DateTime.Now;
            Func<DateTime, DateTime> advance = current => current.AddSeconds(seconds4Iteration);
            Func<DateTime, TimeSpan> delayBeforeNext = ignored => TimeSpan.FromSeconds(nextInSeconds);

            return Observable.Generate(initialState, continuation, advance, newT, delayBeforeNext);
        }
       // Single generator shared by every call. Creating a new Random per call seeds each
       // instance from the clock on .NET Framework, so calls made in quick succession
       // (e.g. DepthIndex and Qty in the same initializer) would return identical values.
       static readonly Random s_sharedRandom = new Random();

       /// <summary>
       /// Returns a pseudo-random integer in the range [0, <paramref name="max"/>).
       /// </summary>
       /// <param name="max">Exclusive upper bound; must be non-negative.</param>
       /// <returns>The generated value (never null; the nullable return type is kept for callers).</returns>
       /// <exception cref="ArgumentOutOfRangeException">Thrown by <c>Random.Next</c> when <paramref name="max"/> is negative.</exception>
       static int? RandomInt(int max)
        {
            // Random is not thread-safe and this method may be invoked from concurrently running
            // generator callbacks, so access to the shared instance is serialized.
            lock (s_sharedRandom)
            {
                return s_sharedRandom.Next(max);
            }
        }
    /// <summary>
    /// One market-data tick: a quantity observed at a given depth index.
    /// Both members are nullable public fields.
    /// </summary>
    public class MarketData
    {
        /// <summary>Depth index of the tick; Main groups ticks by this value and filters on it.</summary>
        public int? DepthIndex;
        /// <summary>Quantity of the tick; QtyAt projects it into its quantity stream.</summary>
        public decimal? Qty;
    }
    /// <summary>
    /// Pairs a depth index with the stream of quantities observed at that depth.
    /// </summary>
    public class QtyAt
    {
        /// <summary>Depth index this quantity stream belongs to.</summary>
        public int? Depth;
        /// <summary>Quantities of the ticks supplied to the constructor, in arrival order.</summary>
        public IObservable<decimal?> Qtys;

        /// <summary>
        /// Creates the pairing for one depth.
        /// </summary>
        /// <param name="depth">Depth index the ticks belong to.</param>
        /// <param name="qtys">Ticks for that depth; only their <c>Qty</c> is exposed through <see cref="Qtys"/>.</param>
        public QtyAt(int? depth, IObservable<MarketData> qtys)
        {
            // The depth argument used to be ignored, leaving Depth permanently null.
            Depth = depth;
            Qtys = qtys.Select(m => m.Qty);
        }
    }
    public static partial class Observable2
    {
        /// <summary>
        /// Combines the latest values of two sequences into a <see cref="Tuple{T1, T2}"/>.
        /// </summary>
        /// <typeparam name="T1">Element type of the first sequence.</typeparam>
        /// <typeparam name="T2">Element type of the second sequence.</typeparam>
        /// <param name="first">First source sequence.</param>
        /// <param name="second">Second source sequence.</param>
        /// <returns>A sequence of tuples holding the latest value of each source.</returns>
        public static IObservable<Tuple<T1, T2>> CombineLatest<T1, T2>(this IObservable<T1> first, IObservable<T2> second)
        {
            Func<T1, T2, Tuple<T1, T2>> pair = (left, right) => new Tuple<T1, T2>(left, right);
            return first.CombineLatest(second, pair);
        }
        /// <summary>
        /// Combines the latest values of an enumerable collection of sequences into lists.
        /// The collection is enumerated per subscription; lists are suppressed until the whole
        /// collection has been enumerated and a list covers every enumerated source.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequences.</typeparam>
        /// <param name="sources">The sequences to combine.</param>
        /// <returns>A sequence of lists holding the latest value of each source.</returns>
        public static IObservable<IList<TSource>> CombineLatest<TSource>(this IEnumerable<IObservable<TSource>> sources)
        {
            return Observable.Defer(() =>
            {
                // Per-subscription bookkeeping, updated lazily as the collection is enumerated.
                var enumerated = 0;
                var reachedEnd = false;

                IEnumerable<IObservable<TSource>> tracked = sources.MarkLast().Select(marked =>
                {
                    enumerated++;
                    reachedEnd = marked.Item1;
                    return marked.Item2;
                });

                IObservable<IList<TSource>> combined = tracked.ToObservable().CombineLatest();

                // Skip until the last source has been seen and the list covers all of them.
                return combined.SkipWhile(snapshot => !(reachedEnd && snapshot.Count >= enumerated));
            });
        }
        /// <summary>
        /// Lazily pairs every element of <paramref name="source"/> with a flag that is true only
        /// for the final element. An empty source yields nothing.
        /// </summary>
        /// <typeparam name="TSource">Element type.</typeparam>
        /// <param name="source">The sequence to annotate.</param>
        /// <returns>Tuples of (isLast, element) in source order.</returns>
        internal static IEnumerable<Tuple<bool, TSource>> MarkLast<TSource>(this IEnumerable<TSource> source)
        {
            using (var cursor = source.GetEnumerator())
            {
                if (!cursor.MoveNext())
                {
                    yield break;
                }

                var hasMore = true;
                while (hasMore)
                {
                    // Read one element ahead so we know whether the pending one is the last.
                    var pending = cursor.Current;
                    hasMore = cursor.MoveNext();
                    yield return new Tuple<bool, TSource>(!hasMore, pending);
                }
            }
        }
          /// <summary>
          /// Combines the latest values of a dynamic set of inner sequences into read-only lists.
          /// Every inner sequence delivered by <paramref name="sources"/> is assigned the next list
          /// index. Each inner value triggers a snapshot of the latest values of the leading run of
          /// inner sequences that have all produced at least one value, but only once that run
          /// contains two or more sequences.
          /// Errors from the outer or any inner sequence are forwarded to the observer; completion
          /// is signalled after the outer sequence and every inner sequence have completed.
          /// </summary>
          /// <typeparam name="TSource">Element type of the inner sequences.</typeparam>
          /// <param name="sources">Outer sequence producing the inner sequences to combine.</param>
          /// <returns>A sequence of read-only lists of latest values, ordered by inner-sequence arrival.</returns>
          public static IObservable<IList<TSource>> CombineLatest<TSource>(this IObservable<IObservable<TSource>> sources)
  {
   return Observable.Create<IList<TSource>>(
    observer =>
    {
     // gate guards all of the mutable state below.
     var gate = new object();
     // latest[i] is the most recent value of the i-th inner sequence (default until it emits).
     var latest = new List<TSource>();
     // hasValueFlags[i] records whether the i-th inner sequence has been seen emitting.
     var hasValueFlags = new List<bool>();
     // Number of inner sequences that have arrived and not yet completed.
     var sourceCount = 0;
     // Length of the leading run of inner sequences that have all emitted at least once.
     var consecutiveActiveSourcesCount = 0;
     var outerCompleted = false;
     var outerSubscription = new SingleAssignmentDisposable();
     // Holds the outer subscription plus every inner subscription still registered.
     var disposables = new CompositeDisposable(outerSubscription);
     outerSubscription.Disposable = sources.Subscribe(
      source =>
      {
       int index;
       lock (gate)
       {
        // Reserve the next slot for this inner sequence.
        sourceCount++;
        index = latest.Count;
        latest.Add(default(TSource));
        hasValueFlags.Add(false);
       }
       var subscription = new SingleAssignmentDisposable();
       disposables.Add(subscription);
       subscription.Disposable = source.Subscribe(
        value =>
        {
         lock (gate)
         {
          latest[index] = value;
          // Flags only need updating while some slot is still outside the leading run.
          if (consecutiveActiveSourcesCount < hasValueFlags.Count)
          {
           hasValueFlags[index] = true;
           // Extend the leading run over every consecutive slot that now has a value.
           while (consecutiveActiveSourcesCount < hasValueFlags.Count && hasValueFlags[consecutiveActiveSourcesCount])
           {
            consecutiveActiveSourcesCount++;
           }
          }
          // Nothing is emitted until at least two leading sequences have values.
          if (consecutiveActiveSourcesCount >= 2)
          {
           // Emit a copy so later updates to latest do not alter lists already handed out.
           observer.OnNext(latest.Take(consecutiveActiveSourcesCount).ToList().AsReadOnly());
          }
         }
        },
        // Inner errors are forwarded directly, without taking the gate.
        observer.OnError,
        () =>
        {
         bool completeNow;
         lock (gate)
         {
          disposables.Remove(subscription);
          sourceCount--;
          // Complete only if the outer sequence is done and this was the last live inner one.
          completeNow = outerCompleted && sourceCount == 0;
         }
         if (completeNow)
         {
          observer.OnCompleted();
         }
        });
      },
      // Outer errors are forwarded directly, without taking the gate.
      observer.OnError,
      () =>
      {
       bool completeNow;
       lock (gate)
       {
        outerCompleted = true;
        // If inner sequences are still live, the last of them to complete signals completion.
        completeNow = sourceCount == 0;
       }
       if (completeNow)
       {
        observer.OnCompleted();
       }
      });
     return disposables;
    });
  }
 
    }
Sunday, April 19, 2015
CombineLatest on Several IObservables
Subscribe to:
Post Comments (Atom)
 
No comments:
Post a Comment