Sunday, April 19, 2015

CombineLastest on Several IObservables


// Source code at http://rxx.codeplex.com/ is the critical part of this solution
       static void Main(string[] args)
        {
            DateTime dtStart = DateTime.Now;
            var mktData = GenerateObservable<MarketData>(1, dt => new MarketData() { DepthIndex=RandomInt(10),Qty=RandomInt(777) }, 1, dt => dt < dtStart.AddSeconds(30));
            var grpByDepthMktData=   mktData.GroupBy(k => k.DepthIndex).Select(g => new QtyAt(g.Key, g));

            grpByDepthMktData.Select(d => d.Qtys.StartWith(0)).CombineLatest().Select(t => t.Sum()).Subscribe(d=>Console.WriteLine("sum all "+d));

            var mktData1 = mktData.Where(m => m.DepthIndex >= 1);
            var grpByDepthMktData1 = mktData1.GroupBy(k => k.DepthIndex).Select(g => new QtyAt(g.Key, g));

            grpByDepthMktData1.Select(d => d.Qtys.StartWith(0)).CombineLatest().Select(t => t.Sum()).Subscribe(d => Console.WriteLine(d));

         Console.ReadLine();
}

        static IObservable<T> GenerateObservable<T>(int seconds4Iteration, Func<DateTime, T> newT, int nextInSeconds, Func<DateTime, bool> continuation)
        {
            DateTime dtStart = DateTime.Now;
            return Observable.Generate(dtStart, continuation, dt => dt.AddSeconds(seconds4Iteration), newT, dt => TimeSpan.FromSeconds(nextInSeconds));
        }

       static int? RandomInt(int max)
        {
            
            Random r=new Random();
            return r.Next(max);
        }


    public class MarketData
    {
        public int? DepthIndex;
        public decimal? Qty;
    }

    public class QtyAt
    {
        public int? Depth;
        public IObservable<decimal?> Qtys;
        public QtyAt(int? depth, IObservable<MarketData> qtys)
        {
            Qtys = qtys.Select(m=>m.Qty);
        }
    }


    public static partial class Observable2
    {

        public static IObservable<Tuple<T1, T2>> CombineLatest<T1, T2>(this IObservable<T1> first, IObservable<T2> second)
        {
            return first.CombineLatest(second, Tuple.Create);
        }

        public static IObservable<IList<TSource>> CombineLatest<TSource>(this IEnumerable<IObservable<TSource>> sources)
        {
            return Observable.Defer(() =>
            {
                var count = 0;
                var completed = false;

                return sources.MarkLast()
                    .Select(tuple =>
                    {
                        count++;
                        completed = tuple.Item1;

                        return tuple.Item2;
                    })
                    .ToObservable()
                    .CombineLatest()
                    .SkipWhile(list => !completed || list.Count < count);
            });
        }

        internal static IEnumerable<Tuple<bool, TSource>> MarkLast<TSource>(this IEnumerable<TSource> source)
        {

            using (var enumerator = source.GetEnumerator())
            {
                if (enumerator.MoveNext())
                {
                    bool moveNext;

                    do
                    {
                        var value = enumerator.Current;

                        moveNext = enumerator.MoveNext();

                        yield return Tuple.Create(!moveNext, value);
                    }
                    while (moveNext);
                }
            }
        }

          public static IObservable<IList<TSource>> CombineLatest<TSource>(this IObservable<IObservable<TSource>> sources)
  {

   return Observable.Create<IList<TSource>>(
    observer =>
    {
     var gate = new object();

     var latest = new List<TSource>();
     var hasValueFlags = new List<bool>();

     var sourceCount = 0;
     var consecutiveActiveSourcesCount = 0;
     var outerCompleted = false;

     var outerSubscription = new SingleAssignmentDisposable();
     var disposables = new CompositeDisposable(outerSubscription);

     outerSubscription.Disposable = sources.Subscribe(
      source =>
      {
       int index;

       lock (gate)
       {
        sourceCount++;

        index = latest.Count;

        latest.Add(default(TSource));
        hasValueFlags.Add(false);
       }

       var subscription = new SingleAssignmentDisposable();

       disposables.Add(subscription);

       subscription.Disposable = source.Subscribe(
        value =>
        {
         lock (gate)
         {
          latest[index] = value;

          if (consecutiveActiveSourcesCount < hasValueFlags.Count)
          {
           hasValueFlags[index] = true;

           while (consecutiveActiveSourcesCount < hasValueFlags.Count && hasValueFlags[consecutiveActiveSourcesCount])
           {
            consecutiveActiveSourcesCount++;
           }
          }

          if (consecutiveActiveSourcesCount >= 2)
          {
           observer.OnNext(latest.Take(consecutiveActiveSourcesCount).ToList().AsReadOnly());
          }
         }
        },
        observer.OnError,
        () =>
        {
         bool completeNow;

         lock (gate)
         {
          disposables.Remove(subscription);

          sourceCount--;

          completeNow = outerCompleted && sourceCount == 0;
         }

         if (completeNow)
         {
          observer.OnCompleted();
         }
        });
      },
      observer.OnError,
      () =>
      {
       bool completeNow;

       lock (gate)
       {
        outerCompleted = true;

        completeNow = sourceCount == 0;
       }

       if (completeNow)
       {
        observer.OnCompleted();
       }
      });

     return disposables;
    });
  }
 
    }

No comments: