// Both a data queue and a queue under a SynchronizationContext allow hand-off of data.
// The data consumer can be a parallel enumerable or can subscribe to observables.
// The UI can only be single-threaded, async or sync.

/// <summary>
/// Shows a bounded <see cref="BlockingCollection{T}"/> used as a hand-off queue:
/// one item is added, adding is completed, one timed take is attempted, and
/// whatever is left is drained through an ordered PLINQ query.
/// </summary>
void BlockingCollectionAsQueue()
{
    // BlockingCollection<T> is IDisposable, so scope it with a using block.
    using (BlockingCollection<string> serverData = new BlockingCollection<string>(10000))
    {
        serverData.Add("Data");
        serverData.CompleteAdding();

        // Timed take: waits at most 2 ms. The result flag is deliberately ignored here.
        string data1;
        serverData.TryTake(out data1, TimeSpan.FromMilliseconds(2));

        // The consuming enumerable ends once the collection is empty and adding is complete.
        IEnumerable<string> data = serverData.GetConsumingEnumerable();
        foreach (var v in data.AsParallel().AsOrdered().Select(l => Regex.Replace(l, @"\s+", "")))
        {
            // Consumer work on each whitespace-stripped item goes here.
        }
    }
}

/// <summary>
/// Does work on a thread-pool thread and marshals the result back through the
/// SynchronizationContext of the thread that called this method.
/// </summary>
void UIMarshalling()
{
    // Capture the context on the calling thread. Inside the thread-pool work item
    // SynchronizationContext.Current is null, so reading it there would throw
    // NullReferenceException. Fall back to the default context (which invokes the
    // callback directly) when the caller has none, e.g. in a console application.
    SynchronizationContext uiContext = SynchronizationContext.Current ?? new SynchronizationContext();

    ThreadPool.QueueUserWorkItem((s) =>
    {
        // do some server side work
        // UIControl.Invoke();
        // Dispatcher.Invoke();

        // SynchronizationContext = WPF.Dispatcher or WF.Control.Invoke
        SendOrPostCallback spcb = new SendOrPostCallback((state) =>
        {
            // UI.Property = "";
        });
        uiContext.Send(spcb, "");
    });
}

// http://www.martinpersson.org/wordpress/2012/12/application-launcher/
Sunday, July 6, 2014
Producer Consumer Pattern with HandOff point
TPL Fork Join code Pattern
// Count down until all 4 are signaled.
// NOTE(review): Wait() blocks until the count reaches zero, so the remaining
// Signal() calls are presumably made by the worker threads - confirm.
CountdownEvent ce = new CountdownEvent(4);
ce.Signal();
ce.Wait();

// Count up by adding one for each unit of work.
// NOTE(review): as above, each added unit of work must call Signal() for Wait() to return.
ce = new CountdownEvent(1);
ce.AddCount(1);
ce.Signal();
ce.Wait();

// Inlining -- recursive decomposition.
/// <summary>
/// Walks the tree by running the action on the node and the walks of both
/// subtrees directly through Parallel.Invoke.
/// </summary>
void Walk2<T>(Tree<T> root, Action<T> action)
{
    if (root == null) return;

    Parallel.Invoke(
        () => action(root.Data),
        () => Walk2(root.Left, action),
        () => Walk2(root.Right, action));
}

// Continuation chaining.
// Parallel <> sum of tasks; Unwrap() maps Task<Task> to a Task for completion.
/// <summary>
/// Walks the tree without blocking: returns a task that completes when the
/// action on this node and the walks of both subtrees have completed.
/// </summary>
/// <remarks>
/// NOTE(review): the source snippet lacked the method header and the definition
/// of t1; both are reconstructed from Walk2 and from the recursive calls - confirm.
/// </remarks>
Task Walk<T>(Tree<T> root, Action<T> action)
{
    if (root == null) return _CompletedTask;

    Task t1 = Task.Factory.StartNew(() => action(root.Data));
    Task<Task> t2 = Task.Factory.StartNew(() => Walk(root.Left, action));
    Task<Task> t3 = Task.Factory.StartNew(() => Walk(root.Right, action));

    // WaitAll on already-completed tasks does not block; it rethrows their exceptions.
    return Task.Factory.ContinueWhenAll(
        new Task[] { t1, t2.Unwrap(), t3.Unwrap() },
        tasks => Task.WaitAll(tasks));
}

/// <summary>An already-completed task, returned for empty subtrees.</summary>
public static Task _CompletedTask = ((Func<Task>)(() =>
{
    TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
    tcs.SetResult(null);
    return tcs.Task;
}))();

// Parallel with options:
// options can cancel, set the scheduler, and cap MaxDegreeOfParallelism.
// Parallel.For can be stopped by loop.Stop() inside the loop body.
CancellationTokenSource ts = new CancellationTokenSource();

// Declared before use; the source snippet passed opts to Parallel.Invoke before declaring it.
// FromCurrentSynchronizationContext() requires a current SynchronizationContext on this thread.
ParallelOptions opts = new ParallelOptions()
{
    MaxDegreeOfParallelism = 2,
    TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
    CancellationToken = ts.Token
};

Parallel.Invoke(opts, () => { }, () => { }, () => { });

ParallelLoopResult res = Parallel.For(0, 10000, () => 1, (j, loop, sub) =>
{
    if (loop.ShouldExitCurrentIteration) return 0;
    loop.Stop();
    // NOTE(review): the source snippet is truncated after loop.Stop(). The return
    // below and the empty localFinally delegate are a minimal completion - confirm.
    return sub;
}, sub => { });
Testing Observables using NUnit/Moq in MVVM
Mock.Setup for a function must be run again for each parameter change. Observable.Create takes an IObserver and returns an IDisposable, so use a Subject to call OnNext. Note that Subject.Subscribe(observer) allows OnNext to stream data out through the Observable. Finally, the test scheduler must be advanced before any data comes out. Mock_mockT= Mock ; _VMController = new(_mockT.Object, ...); [Test] public void IdChanged() { Id = new Hashset (){guid}; _mockT.Setup(p=>ObserveData(Id) .Return(GenerateObs(id:Id, data: 100.01)); Assert.AreEqual(_VMController.ViewModel.DataCollecton.Count,1); } IObs<PT> GenerateObs(string id="", double? data=null,DateTime logicalDate=null) { Data data = new (...); return Observable.Create(obsr=>{ // Func<IObserver,IDispoable> var s = new Subject<Data>(); s.subscribe(obsr); onNext<Data>(s,data); return s; }); } void onNext<T> (IObserver obr, T data) { obr.OnNext(data); _testSchedule.AdvancedBy(250ms); }
Sunday, June 29, 2014
Use TPL/TaskCompletionSource to Implement DownloadAsync
// Using a Task makes the call non-blocking; TaskCompletionSource.SetResult hands the result across.

/// <summary>
/// Starts the download, does other work while it runs, and prints the
/// downloaded data (or the failure) when the task completes.
/// </summary>
static void Main(string[] args)
{
    Task<string> t = DownloadAsync();

    Console.WriteLine("Do some other work while wating for Async Donwload");
    Thread.Sleep(3000);
    Console.WriteLine("other work done");

    t.ContinueWith((t1) =>
    {
        // Reading Result on a faulted task throws, so report the fault instead.
        if (t1.IsFaulted)
        {
            Console.WriteLine(t1.Exception);
        }
        else
        {
            Console.WriteLine(t1.Result);
        }
    });

    // Task.WaitAny(new[] { t }); Wait = ReadLine to keep the program running
    // Console.WriteLine(t.Result);
    Console.ReadLine();
}

/// <summary>
/// Runs <see cref="DownloadSync"/> on a background task and exposes its outcome
/// through a TaskCompletionSource.
/// </summary>
/// <returns>
/// A task that completes with the downloaded string, or faults or cancels
/// exactly as the background work did.
/// </returns>
static Task<string> DownloadAsync()
{
    TaskCompletionSource<string> ts = new TaskCompletionSource<string>();

    Task.Factory.StartNew(() => DownloadSync()).ContinueWith((t) =>
    {
        // Forward every outcome. Setting only the result would hide a failed
        // download behind an empty string.
        if (t.IsFaulted)
        {
            ts.SetException(t.Exception.InnerExceptions);
        }
        else if (t.IsCanceled)
        {
            ts.SetCanceled();
        }
        else
        {
            ts.SetResult(t.Result);
        }
    });

    return ts.Task;
}

/// <summary>Simulates a blocking download: sleeps for five seconds, then returns "Data".</summary>
static string DownloadSync()
{
    Thread.Sleep(5000);
    return "Data";
}
Saturday, June 28, 2014
Convert Property Change to Observable
// INPC enables V <-> VM binding and needs an ICommand to execute a request on the server side.
// It is interesting to see whether Observables in Rx running on a TaskPoolScheduler can do the
// same job as a Delegate or Relay command. Both FromEventPattern and a CLR event should work.

/// <summary>
/// View model whose property changes are turned into observables that carry
/// the new value together with the property name.
/// </summary>
public class ViewModel : INotifyPropertyChanged
{
    private string _cusip;
    private double _ask;

    /// <summary>Security identifier; raises PropertyChanged when the value changes.</summary>
    public string Cusip
    {
        get { return _cusip; }
        set
        {
            if (_cusip == value) return;
            _cusip = value;
            OnPropertyChanged("Cusip");
        }
    }

    /// <summary>Ask price; raises PropertyChanged when the value changes.</summary>
    public double Ask
    {
        get { return _ask; }
        set
        {
            if (_ask.Equals(value)) return;
            _ask = value;
            OnPropertyChanged("Ask");
        }
    }

    public event PropertyChangedEventHandler PropertyChanged;

    /// <summary>
    /// Evaluates the given member-access expression against this instance and
    /// returns a single-element observable of (current value, member name).
    /// </summary>
    /// <param name="exp">A member access on the view model, e.g. <c>vm => vm.Cusip</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="exp"/> is null.</exception>
    /// <exception cref="ArgumentException">The expression body is not a member access.</exception>
    public IObservable<Tuple<T, string>> OnSubmitChanges<T>(Expression<Func<ViewModel, T>> exp)
    {
        if (exp == null) throw new ArgumentNullException("exp");

        // A value-type member read as object arrives wrapped in a Convert node; unwrap it.
        Expression body = exp.Body;
        UnaryExpression conversion = body as UnaryExpression;
        if (conversion != null &&
            (conversion.NodeType == ExpressionType.Convert || conversion.NodeType == ExpressionType.ConvertChecked))
        {
            body = conversion.Operand;
        }

        MemberExpression me = body as MemberExpression;
        if (me == null)
        {
            throw new ArgumentException("Expression must be a member access such as vm => vm.Cusip.", "exp");
        }

        string memberName = me.Member.Name;
        T currentValue = exp.Compile().Invoke(this);
        return Observable.Return(new Tuple<T, string>(currentValue, memberName));
    }

    public ViewModel()
    {
        PropertyChanged += ViewModel_PropertyChanged;

        // Alternative wiring through Rx. Enable only one of the two, otherwise the handler runs twice.
        // NOTE(review): the collapsed source is ambiguous about where this comment ended;
        // it is treated here as fully commented out - confirm.
        // Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(hh => hh.Invoke, h => this.PropertyChanged += h, h => this.PropertyChanged -= h)
        //     .Subscribe((e) => { ViewModel_PropertyChanged(this, e.EventArgs); });
    }

    /// <summary>Raises PropertyChanged for the named property.</summary>
    private void OnPropertyChanged(string propertyName)
    {
        // Copy to a local so a concurrent unsubscribe cannot null the delegate between check and call.
        PropertyChangedEventHandler handler = PropertyChanged;
        if (handler != null)
        {
            handler(this, new PropertyChangedEventArgs(propertyName));
        }
    }

    private void ViewModel_PropertyChanged(object sender, PropertyChangedEventArgs e)
    {
        // With FromEventPattern there is no need to subscribe again. A direct call to the server side is enough.
        if (e.PropertyName == "Cusip")
        {
            OnSubmitChanges(vm => vm.Cusip).Subscribe((i) =>
            {
                Console.WriteLine("Call IVMController->IModObsAdapter->IModObsTransport->QueueAsObserver " + i);
            });
        }

        //459200-10-1 //912828NB2
        if (e.PropertyName == "Ask")
        {
            OnSubmitChanges(vm => vm.Ask).Subscribe((i) =>
            {
                Console.WriteLine("Hit bid lift offer " + i);
            });
        }
    }
}

/// <summary>
/// Sets both properties and also calls OnSubmitChanges directly. Each assignment
/// already runs the change handler, so every message is printed twice: once by
/// the handler and once by the explicit subscription below.
/// </summary>
private static void TestConvertPropChangeToObservable()
{
    ViewModel v = new ViewModel();

    v.Cusip = "912828NB2";
    v.OnSubmitChanges(vm => vm.Cusip).Subscribe((i) =>
    {
        Console.WriteLine("Call IVMController->IModObsAdapter->IModObsTransport->QueueAsObserver " + i);
    });

    //459200-10-1 //912828NB2
    v.Ask = 98.23;
    v.OnSubmitChanges(vm => vm.Ask).Subscribe((i) =>
    {
        Console.WriteLine("Hit bid lift offer " + i);
    });
}
Sunday, June 22, 2014
F# Async through Agent= inbox as Queue
// Agents are mailboxes that can run an async block, similar to running an async block inside a function.
open System
open System.Net
open Microsoft.FSharp.Control.WebExtensions
open System.Diagnostics
open System.IO

// (name, url) pairs fetched in parallel by runAsync.
let urlList =
    [ "cnn", "http://www.cnn.com"
      "china", "http://www.china.com" ]

// Downloads the page synchronously and prints its HTML.
let fetch (url: string) =
    let uri = new Uri(url)
    // WebClient is IDisposable; `use` releases it when the function returns.
    use c = new WebClient()
    let html = c.DownloadString(uri)
    printfn "%s" html

// Downloads the page inside an async block and prints its HTML; the name is not used.
let fetchAsync (n: string, url: string) =
    async {
        let uri = new Uri(url)
        use c = new WebClient()
        let! html = c.AsyncDownloadString(uri)
        printfn "%s" html
    }

// Fetches every entry of urlList in parallel and blocks until all have finished.
let runAsync () =
    urlList
    |> Seq.map fetchAsync
    |> Async.Parallel
    |> Async.RunSynchronously

let runSync () = "http://www.cnn.com" |> fetch

// Verbatim string so the backslashes in the path are never read as escape sequences.
let s = new StreamWriter(@"c:\working\1.txt", true)
s.AutoFlush <- true

// The agent's inbox acts as the queue: every posted message is written to the log with a timestamp.
let agent =
    MailboxProcessor<string>.Start(fun inbox ->
        async {
            while true do
                let! msg = inbox.Receive()
                // fetch(msg)
                s.WriteLine(DateTime.Now.ToString() + " " + msg)
        })

// The source showed a bare `[]` here; it is restored as the entry-point attribute.
[<EntryPoint>]
let main arg =
    runSync ()
    fetch "http://www.cnn.com"
    runAsync () |> ignore

    agent.Post("http://www.cnn.com")
    agent.Post("http://www.china.com")
    agent.Post("http://www.goolge.com")
    agent.Post("http://www.fb.com")

    let mutable b = true
    let mutable n = 1
    Console.ReadLine() |> ignore

    // NOTE(review): the collapsed source does not show whether Post sits inside the `if`;
    // it is read here as part of the loop body - confirm.
    while b do
        n <- n + 1
        if n > 1000 then b <- false
        agent.Post(n.ToString())

    Console.ReadLine() |> ignore
    s.Close()
    0
Tuesday, April 22, 2014
Generate Observables
/// <summary>
/// Generates two timed observable streams with Observable.Generate and prints
/// their latest combined ids until the user presses Enter.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        DateTime dtStart = DateTime.Now;

        var txns = GenerateObservable<Transaction>(
            3,
            dt => new Transaction() { id = "txn " + dt.ToString() },
            3,
            dt => dt < dtStart.AddSeconds(120));

        // The element type matches what the factory creates; the source asked for Transaction here.
        var bsds = GenerateObservable<BondStaticData>(
            1,
            dt => new BondStaticData() { id = "bsd " + dt.ToString() },
            3,
            dt => dt < dtStart.AddSeconds(10));

        // Dispose the subscription once the user presses Enter so the timers stop.
        using (txns.CombineLatest(bsds, (t, b) => t.id + " " + b.id).Subscribe(s => Console.WriteLine(s)))
        {
            Console.ReadLine();
        }
    }

    /// <summary>
    /// Builds a timed sequence whose state is a DateTime starting at the current time.
    /// </summary>
    /// <param name="seconds4Iteration">Seconds added to the state on each iteration.</param>
    /// <param name="newT">Creates the element emitted for a given state.</param>
    /// <param name="nextInSeconds">Delay, in seconds, before each element is emitted.</param>
    /// <param name="continuation">The sequence continues while this returns true for the state.</param>
    /// <returns>The generated observable sequence.</returns>
    static IObservable<T> GenerateObservable<T>(int seconds4Iteration, Func<DateTime, T> newT, int nextInSeconds, Func<DateTime, bool> continuation)
    {
        DateTime dtStart = DateTime.Now;

        return Observable.Generate(
            dtStart,
            continuation,
            dt => dt.AddSeconds(seconds4Iteration),
            newT,
            dt => TimeSpan.FromSeconds(nextInSeconds));
    }
}
Subscribe to:
Posts (Atom)