Sunday, April 19, 2015

A simpler test of CombineLatest over an IList of observables, without GroupBy

            // Three interval-driven streams of MarketData. The Interval tick (a long)
            // is passed as the quantity; DepthIndex identifies the stream (0, 1, 2).
            // q1 ticks every 2 seconds, q0 and q2 every second.
            var q0 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(s => new MarketData(0, s));
            var q1 = Observable.Interval(TimeSpan.FromSeconds(2)).Select(s => new MarketData(1, s));
            var q2 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(s => new MarketData(2, s));

            // The element type has to be spelled out: there is no non-generic IObservable,
            // and Qty is a decimal?, so each projected stream is an IObservable<decimal?>.
            var list012 = new List<IObservable<decimal?>>
            {
                q0.Select(t => t.Qty),
                q1.Select(t => t.Qty),
                q2.Select(t => t.Qty),
            };

            // CombineLatest here is called on a List, so it binds to an overload taking a
            // collection of observables (presumably Rx's built-in one - confirm), and
            // yields the latest value of every stream as a list, which is then summed.
            list012.CombineLatest().Select(t => t.Sum()).Subscribe(d => Console.WriteLine("sum012="+d+" "+DateTime.Now ));

            // Same again for streams 1 and 2 only.
            // NOTE(review): q1 and q2 are subscribed to a second time here. If Interval
            // is cold (see Rx docs), each subscription runs its own timer and therefore
            // constructs its own MarketData instances - expect duplicate "ctor" lines.
            var list12 = new List<IObservable<decimal?>>
            {
                q1.Select(t => t.Qty),
                q2.Select(t => t.Qty),
            };
            list12.CombineLatest().Select(t => t.Sum()).Subscribe(d => Console.WriteLine("sum12=" + d + " " + DateTime.Now));

            // Keep the process alive so the timers can fire.
            Console.ReadLine();

    /// <summary>
    /// Simple holder for one market-data update: a depth index and a quantity,
    /// both optional. Construction is traced to the console with a timestamp.
    /// </summary>
    public class MarketData
    {
        /// <summary>Depth index supplied to the constructor; may be null.</summary>
        public int? DepthIndex;

        /// <summary>Quantity supplied to the constructor; may be null.</summary>
        public decimal? Qty;

        /// <summary>
        /// Stores the given values and writes a "ctor ..." trace line to the console.
        /// </summary>
        /// <param name="d">Value assigned to <see cref="DepthIndex"/>.</param>
        /// <param name="q">Value assigned to <see cref="Qty"/>.</param>
        public MarketData(int? d, decimal? q)
        {
            this.DepthIndex = d;
            this.Qty = q;

            // Null arguments render as empty text, exactly as with string concatenation.
            Console.WriteLine("ctor {0} {1} {2}", d, q, DateTime.Now);
        }
    }

        /// <summary>
        /// Combines the latest values of a dynamically growing set of inner observables.
        /// Each inner observable delivered by <paramref name="sources"/> is given the next
        /// free slot; whenever any inner observable produces a value, a read-only snapshot
        /// of the leading slots that have all produced at least one value is emitted,
        /// provided that at least two such leading slots exist.
        /// </summary>
        /// <typeparam name="TSource">Element type of the inner observables.</typeparam>
        /// <param name="sources">Outer sequence that delivers the inner observables.</param>
        /// <returns>
        /// A sequence of list snapshots. It completes once the outer sequence has completed
        /// and every inner subscription has completed; errors from the outer or any inner
        /// sequence are handed straight to the observer's OnError.
        /// </returns>
        public static IObservable<IList<TSource>> CombineLatest<TSource>(this IObservable<IObservable<TSource>> sources)
  {

   return Observable.Create<IList<TSource>>(
    observer =>
    {
     // Guards all of the mutable state declared below.
     var gate = new object();

     // Parallel lists indexed by slot: the most recent value of each inner
     // source, and whether that source has produced a value yet.
     var latest = new List<TSource>();
     var hasValueFlags = new List<bool>();

     // Number of inner sources that have been received and have not yet completed.
     var sourceCount = 0;
     // Length of the unbroken run of slots, starting at slot 0, that have a value.
     var consecutiveActiveSourcesCount = 0;
     var outerCompleted = false;

     var outerSubscription = new SingleAssignmentDisposable();
     var disposables = new CompositeDisposable(outerSubscription);

     outerSubscription.Disposable = sources.Subscribe(
      source =>
      {
       int index;

       lock (gate)
       {
        sourceCount++;

        // The new source's slot is the next position in both lists.
        index = latest.Count;

        latest.Add(default(TSource));
        hasValueFlags.Add(false);
       }

       var subscription = new SingleAssignmentDisposable();

       // Registered before subscribing so the composite already tracks it
       // by the time the inner source starts calling back.
       disposables.Add(subscription);

       subscription.Disposable = source.Subscribe(
        value =>
        {
         lock (gate)
         {
          latest[index] = value;

          // Flag bookkeeping is only needed while some known slot is still
          // outside the leading run; once the run covers every slot it is skipped.
          if (consecutiveActiveSourcesCount < hasValueFlags.Count)
          {
           hasValueFlags[index] = true;

           // Extend the leading run across every slot that now has a value.
           while (consecutiveActiveSourcesCount < hasValueFlags.Count && hasValueFlags[consecutiveActiveSourcesCount])
           {
            consecutiveActiveSourcesCount++;
           }
          }

          // Nothing is emitted until at least the first two slots have values.
          // The snapshot is a copy limited to the leading run, so a value stored
          // in a slot beyond the run triggers an emission but is not included.
          if (consecutiveActiveSourcesCount >= 2)
          {
           observer.OnNext(latest.Take(consecutiveActiveSourcesCount).ToList().AsReadOnly());
          }
         }
        },
        // NOTE(review): inner errors are forwarded without taking the gate, so they
        // are not serialized against OnNext calls made under the lock - confirm intended.
        observer.OnError,
        () =>
        {
         bool completeNow;

         lock (gate)
         {
          disposables.Remove(subscription);

          sourceCount--;

          // The result completes only after the outer sequence is done and
          // this was the last inner source still running.
          completeNow = outerCompleted && sourceCount == 0;
         }

         // Signalled after the lock has been released.
         if (completeNow)
         {
          observer.OnCompleted();
         }
        });
      },
      // Outer errors are likewise forwarded directly, without taking the gate.
      observer.OnError,
      () =>
      {
       bool completeNow;

       lock (gate)
       {
        outerCompleted = true;

        // If no inner sources are still running, completion happens right away;
        // otherwise the last inner completion handler will signal it.
        completeNow = sourceCount == 0;
       }

       if (completeNow)
       {
        observer.OnCompleted();
       }
      });

     // Holds the outer subscription plus every inner subscription still registered.
     return disposables;
    });
  }
 

No comments: