// Source code at http://rxx.codeplex.com/ is the critical part of this solution
static void Main(string[] args)
{
DateTime dtStart = DateTime.Now;
var mktData = GenerateObservable<MarketData>(1, dt => new MarketData() { DepthIndex=RandomInt(10),Qty=RandomInt(777) }, 1, dt => dt < dtStart.AddSeconds(30));
var grpByDepthMktData= mktData.GroupBy(k => k.DepthIndex).Select(g => new QtyAt(g.Key, g));
grpByDepthMktData.Select(d => d.Qtys.StartWith(0)).CombineLatest().Select(t => t.Sum()).Subscribe(d=>Console.WriteLine("sum all "+d));
var mktData1 = mktData.Where(m => m.DepthIndex >= 1);
var grpByDepthMktData1 = mktData1.GroupBy(k => k.DepthIndex).Select(g => new QtyAt(g.Key, g));
grpByDepthMktData1.Select(d => d.Qtys.StartWith(0)).CombineLatest().Select(t => t.Sum()).Subscribe(d => Console.WriteLine(d));
Console.ReadLine();
}
static IObservable<T> GenerateObservable<T>(int seconds4Iteration, Func<DateTime, T> newT, int nextInSeconds, Func<DateTime, bool> continuation)
{
DateTime dtStart = DateTime.Now;
return Observable.Generate(dtStart, continuation, dt => dt.AddSeconds(seconds4Iteration), newT, dt => TimeSpan.FromSeconds(nextInSeconds));
}
static int? RandomInt(int max)
{
Random r=new Random();
return r.Next(max);
}
public class MarketData
{
public int? DepthIndex;
public decimal? Qty;
}
public class QtyAt
{
public int? Depth;
public IObservable<decimal?> Qtys;
public QtyAt(int? depth, IObservable<MarketData> qtys)
{
Qtys = qtys.Select(m=>m.Qty);
}
}
public static partial class Observable2
{
public static IObservable<Tuple<T1, T2>> CombineLatest<T1, T2>(this IObservable<T1> first, IObservable<T2> second)
{
return first.CombineLatest(second, Tuple.Create);
}
public static IObservable<IList<TSource>> CombineLatest<TSource>(this IEnumerable<IObservable<TSource>> sources)
{
return Observable.Defer(() =>
{
var count = 0;
var completed = false;
return sources.MarkLast()
.Select(tuple =>
{
count++;
completed = tuple.Item1;
return tuple.Item2;
})
.ToObservable()
.CombineLatest()
.SkipWhile(list => !completed || list.Count < count);
});
}
internal static IEnumerable<Tuple<bool, TSource>> MarkLast<TSource>(this IEnumerable<TSource> source)
{
using (var enumerator = source.GetEnumerator())
{
if (enumerator.MoveNext())
{
bool moveNext;
do
{
var value = enumerator.Current;
moveNext = enumerator.MoveNext();
yield return Tuple.Create(!moveNext, value);
}
while (moveNext);
}
}
}
public static IObservable<IList<TSource>> CombineLatest<TSource>(this IObservable<IObservable<TSource>> sources)
{
return Observable.Create<IList<TSource>>(
observer =>
{
var gate = new object();
var latest = new List<TSource>();
var hasValueFlags = new List<bool>();
var sourceCount = 0;
var consecutiveActiveSourcesCount = 0;
var outerCompleted = false;
var outerSubscription = new SingleAssignmentDisposable();
var disposables = new CompositeDisposable(outerSubscription);
outerSubscription.Disposable = sources.Subscribe(
source =>
{
int index;
lock (gate)
{
sourceCount++;
index = latest.Count;
latest.Add(default(TSource));
hasValueFlags.Add(false);
}
var subscription = new SingleAssignmentDisposable();
disposables.Add(subscription);
subscription.Disposable = source.Subscribe(
value =>
{
lock (gate)
{
latest[index] = value;
if (consecutiveActiveSourcesCount < hasValueFlags.Count)
{
hasValueFlags[index] = true;
while (consecutiveActiveSourcesCount < hasValueFlags.Count && hasValueFlags[consecutiveActiveSourcesCount])
{
consecutiveActiveSourcesCount++;
}
}
if (consecutiveActiveSourcesCount >= 2)
{
observer.OnNext(latest.Take(consecutiveActiveSourcesCount).ToList().AsReadOnly());
}
}
},
observer.OnError,
() =>
{
bool completeNow;
lock (gate)
{
disposables.Remove(subscription);
sourceCount--;
completeNow = outerCompleted && sourceCount == 0;
}
if (completeNow)
{
observer.OnCompleted();
}
});
},
observer.OnError,
() =>
{
bool completeNow;
lock (gate)
{
outerCompleted = true;
completeNow = sourceCount == 0;
}
if (completeNow)
{
observer.OnCompleted();
}
});
return disposables;
});
}
}
Sunday, April 19, 2015
CombineLastest on Several IObservables
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment