// Demo script: builds three timer-driven MarketData streams, combines their Qty values,
// and prints the running sum of the combined values until the user presses Enter.

// Each stream wraps the interval tick `s` (a long) as the Qty of a MarketData with a fixed DepthIndex.
var q0 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(s => new MarketData(0, s));
var q1 = Observable.Interval(TimeSpan.FromSeconds(2)).Select(s => new MarketData(1, s));
var q2 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(s => new MarketData(2, s));

// NOTE(review): q1 and q2 are subscribed through both lists below. If Interval/Select are cold
// (as they are in Rx), each subscription runs its own timer and constructs its own MarketData
// instances, so the "ctor" trace appears once per subscription — confirm this is intended.

// The element type must be spelled out: there is no non-generic IObservable in .NET.
// Qty is decimal?, so the projected streams are IObservable<decimal?>.
var list012 = new List<IObservable<decimal?>>();
list012.AddRange(new[] { q0.Select(t => t.Qty), q1.Select(t => t.Qty), q2.Select(t => t.Qty) });

// NOTE(review): CombineLatest is invoked on a List here, so it cannot bind to an extension that
// takes IObservable<IObservable<T>>; presumably it resolves to Rx's IEnumerable overload — confirm.
var sum012Subscription = list012.CombineLatest().Select(t => t.Sum()).Subscribe(d => Console.WriteLine("sum012="+d+" "+DateTime.Now ));

var list12 = new List<IObservable<decimal?>>();
list12.AddRange(new[] { q1.Select(t => t.Qty), q2.Select(t => t.Qty) });
var sum12Subscription = list12.CombineLatest().Select(t => t.Sum()).Subscribe(d => Console.WriteLine("sum12=" + d + " " + DateTime.Now));

// Keep the process alive while the timers tick; Enter ends the demo.
Console.ReadLine();

// Release the subscriptions explicitly so the underlying timers stop before exit.
sum012Subscription.Dispose();
sum12Subscription.Dispose();
/// <summary>
/// Simple mutable holder for a depth index and a quantity, both optional.
/// Construction is traced to the console so that instance creation can be observed.
/// </summary>
public class MarketData
{
    /// <summary>Depth index supplied at construction; may be null.</summary>
    public int? DepthIndex;

    /// <summary>Quantity supplied at construction; may be null.</summary>
    public decimal? Qty;

    /// <summary>
    /// Stores the given values and writes a "ctor" trace line with both values and the current local time.
    /// </summary>
    /// <param name="d">Value stored in <see cref="DepthIndex"/>.</param>
    /// <param name="q">Value stored in <see cref="Qty"/>.</param>
    public MarketData(int? d, decimal? q)
    {
        this.DepthIndex = d;
        this.Qty = q;

        // Null arguments format as empty strings, exactly as they do under string concatenation.
        var trace = string.Format("ctor {0} {1} {2}", d, q, DateTime.Now);
        Console.WriteLine(trace);
    }
}
/// <summary>
/// Combines the latest values of the inner sequences that arrive over time on <paramref name="sources"/>.
/// </summary>
/// <remarks>
/// Inner sequences are indexed in the order they arrive. A snapshot holds the latest values of the
/// longest leading run of inner sequences that have each produced at least one value, and snapshots
/// are only emitted once that run covers at least two sequences. While that holds, every inner value
/// triggers a snapshot. Each snapshot is a read-only copy, so later values never mutate an emitted list.
/// The result completes once the outer sequence and every inner sequence have completed. Errors from
/// the outer or any inner sequence are forwarded to the observer under the same lock as values, so the
/// observer is never called concurrently.
/// </remarks>
/// <typeparam name="TSource">Element type of the inner sequences.</typeparam>
/// <param name="sources">Outer sequence whose elements are the sequences to combine.</param>
/// <returns>A sequence of read-only snapshots of the latest values.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
public static IObservable<IList<TSource>> CombineLatest<TSource>(this IObservable<IObservable<TSource>> sources)
{
    // Fail at the call site instead of with a NullReferenceException at subscription time.
    if (sources == null)
    {
        throw new ArgumentNullException("sources");
    }

    return Observable.Create<IList<TSource>>(
        observer =>
        {
            var gate = new object();
            var latest = new List<TSource>();
            var hasValueFlags = new List<bool>();
            var sourceCount = 0;
            var consecutiveActiveSourcesCount = 0;
            var outerCompleted = false;
            var outerSubscription = new SingleAssignmentDisposable();
            var disposables = new CompositeDisposable(outerSubscription);

            // Errors can arrive from the outer sequence or any inner sequence on arbitrary threads.
            // Taking the gate keeps OnError from overlapping an OnNext that is in flight.
            Action<Exception> forwardError = error =>
            {
                lock (gate)
                {
                    observer.OnError(error);
                }
            };

            outerSubscription.Disposable = sources.Subscribe(
                source =>
                {
                    int index;
                    lock (gate)
                    {
                        // Reserve a slot for this inner sequence; its position never changes afterwards.
                        sourceCount++;
                        index = latest.Count;
                        latest.Add(default(TSource));
                        hasValueFlags.Add(false);
                    }

                    var subscription = new SingleAssignmentDisposable();
                    disposables.Add(subscription);
                    subscription.Disposable = source.Subscribe(
                        value =>
                        {
                            lock (gate)
                            {
                                latest[index] = value;

                                // When the run already spans every known source all flags are set,
                                // so the bookkeeping below can be skipped.
                                if (consecutiveActiveSourcesCount < hasValueFlags.Count)
                                {
                                    hasValueFlags[index] = true;

                                    // Extend the leading run of sources that have produced a value.
                                    while (consecutiveActiveSourcesCount < hasValueFlags.Count && hasValueFlags[consecutiveActiveSourcesCount])
                                    {
                                        consecutiveActiveSourcesCount++;
                                    }
                                }

                                // NOTE(review): a value from a source outside the leading run still
                                // re-emits the (unchanged) snapshot — confirm this is intended.
                                if (consecutiveActiveSourcesCount >= 2)
                                {
                                    observer.OnNext(latest.GetRange(0, consecutiveActiveSourcesCount).AsReadOnly());
                                }
                            }
                        },
                        forwardError,
                        () =>
                        {
                            bool completeNow;
                            lock (gate)
                            {
                                disposables.Remove(subscription);
                                sourceCount--;
                                completeNow = outerCompleted && sourceCount == 0;
                            }

                            // Only the last inner sequence to finish, after the outer one, completes the result.
                            if (completeNow)
                            {
                                observer.OnCompleted();
                            }
                        });
                },
                forwardError,
                () =>
                {
                    bool completeNow;
                    lock (gate)
                    {
                        outerCompleted = true;
                        completeNow = sourceCount == 0;
                    }

                    // If inner sequences are still running, the last of them signals completion instead.
                    if (completeNow)
                    {
                        observer.OnCompleted();
                    }
                });

            return disposables;
        });
}
Sunday, April 19, 2015
Simpler test of IList.CombineLatest without Group by
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment