Sunday, July 6, 2014

Producer Consumer Pattern with HandOff point

// Both Data Queue and Queue under SynchronizationContext allow handoff of data
// Data Consumer can be parallel Enumerable or Subscribe to Observables
// UI can only singled threaded Async or sync

        void BlockingCollectionAsQueue()
        {
            BlockingCollection<string> ServerData = new BlockingCollection<string>(10000);
            ServerData.Add("Data");
            ServerData.CompleteAdding();
            string data1;
            ServerData.TryTake(out data1, TimeSpan.FromMilliseconds(2));
            IEnumerable<string> data = ServerData.GetConsumingEnumerable();
            foreach (var v in data.AsParallel().AsOrdered().Select(l => Regex.Replace(l, @"\s+", ""))) ;
        }

        void UIMarshalling()
        {
            ThreadPool.QueueUserWorkItem((s) =>
            {
                //do some server side work
                // UIControl.Invoke();
                //Dispatcher.Invoke();
                SendOrPostCallback spcb = new SendOrPostCallback((obj) => { });
                // SynchronizationContext =WPF.Dispatcher or WF.Control.Invoke
                SynchronizationContext.Current.Send((state) =>
                {
                    //UI.Property="";
                }, "");
            });
        }
http://www.martinpersson.org/wordpress/2012/12/application-launcher/

TPL Fork Join code Pattern

// Count down until all 4 are signaled
     CountdownEvent ce = new CountdownEvent(4);
            ce.Signal();
            ce.Wait();

// count up by add one for each work
     ce = new CountdownEvent(1);
     ce.AddCount(1);
     ce.Signal();
     ce.Wait();

// Inlining -- Recursive Decomposition
 // walk by directly parallel
        void Walk2<T>(Tree<T> root, Action<T> action)
        {
            if (root == null) return;
            Parallel.Invoke(
                ()=> action(root.Data),
                () => Walk2(root.Left, action),
                () => Walk2(root.Right, action));
        }

//Continuation Chaining
// Parallel <> Sum of Tasks, Unwrap() map Task<Task> to Task for Completion.
            if (root == null) return _CompletedTask;
            Task<Task> t2 = Task.Factory.StartNew(() => Walk(root.Left, action));
            Task<Task> t3 = Task.Factory.StartNew(() => Walk(root.Right, action));
            return Task.Factory.ContinueWhenAll(new Task[] { t1, t2.Unwrap(), t3.Unwrap() },
             task => Task.WaitAll(task));
public static Task _CompletedTask = ((Func<Task>)(() =>
        {
            TaskCompletionSource<object> ts = new TaskCompletionSource<object>();
            ts.SetResult(null);
            return ts.Task;
        }))();

//Parallel with options:
// option can cacell, set schedule, MaxDegree.
// Parallel.For can be stoped by loop.Stop() inside For
Parallel.Invoke(opts, () => { }, () => { }, () => { });
            CancellationTokenSource ts = new CancellationTokenSource();
            
            ParallelOptions opts = new ParallelOptions()
            {
                MaxDegreeOfParallelism = 2,
                TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
                CancellationToken = ts.Token
            };

 ParallelLoopResult res = Parallel.For(0, 10000, () => 1, (j, loop, sub) =>
            {
                if (loop.ShouldExitCurrentIteration) return 0;
                loop.Stop();

Testing Observables using NUnit/Moq in MVVM

Mock->setup a function must run for each parameter change.
Obs.Create take in IObser and return IDispoable. So next a Subject to call OnNext.
Note that Subject.sub(Obr) will allow OnNext to stream out Obs.
Finally, testSchedule must advance to see data coming out.

Mock _mockT= Mock;
_VMController = new(_mockT.Object, ...);
[Test] public void IdChanged() {
   Id = new Hashset(){guid};
   _mockT.Setup(p=>ObserveData(Id)
   .Return(GenerateObs(id:Id, data: 100.01));
   Assert.AreEqual(_VMController.ViewModel.DataCollecton.Count,1);
}
IObs<PT> GenerateObs(string id="", double? data=null,DateTime logicalDate=null) {
Data data = new (...);
return Observable.Create(obsr=>{   // Func<IObserver,IDispoable>
var s = new Subject<Data>();
   s.subscribe(obsr);
   onNext<Data>(s,data);
return s;
});
}
void onNext<T> (IObserver obr, T data)
{
  obr.OnNext(data);
  _testSchedule.AdvancedBy(250ms);
}

Sunday, June 29, 2014

Use TPL/ TaskCompletionSouce to Implement DownloadAsync

Using Task create non-blocking, TaskCompletionSource SetResult helps send result accross
        static void Main(string[] args)
        {
            Task<string> t = DownloadAsync();
            Console.WriteLine("Do some other work while wating for Async Donwload");
            Thread.Sleep(3000);
            Console.WriteLine("other work done");
            t.ContinueWith((t1) => { Console.WriteLine(t1.Result); });
            // Task.WaitAny(new[] { t }); Wait =Readline to keep program running
           // Console.WriteLine(t.Result);
            Console.ReadLine();
        }

        static Task DownloadAsync()
        {
            TaskCompletionSource<string> ts = new TaskCompletionSource<string>();
            string i="";
            Task.Factory.StartNew(()=> { i = DownloadSync(); }).ContinueWith((t)=>
            { ts.SetResult(i); });
            return ts.Task;

        }
        
        static string DownloadSync()
        {
            Thread.Sleep(5000);
            return "Data";
        }

Saturday, June 28, 2014

Convert Property Change to Observable

INPC enable binding V <-> VM and need ICommand to execute request to Server side. It is interesting to see if Observables in Rx running on
a TaskPoolScheduler can do the same as Delagete or Relay Command. Both FromEventPattern and CLR event should work

    public class ViewModel : INotifyPropertyChanged
    {
        public string Cusip { get; set; }
        public double Ask { get;set; }

        public event PropertyChangedEventHandler PropertyChanged;

        public IObservable<Tuple<T, string>> OnSubmitChanges<T>(Expression<Func<ViewModel, T>> exp)
        {
            MemberExpression me = exp.Body as MemberExpression;
            string n = me.Member.Name;
            Func<ViewModel, T> f = exp.Compile();
            T i = f.Invoke(this);
            return Observable.Return(new Tuple<T,string>(i,n));
        }

        public ViewModel()
        {
            PropertyChanged += ViewModel_PropertyChanged;
//  Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(hh => hh.Invoke, h => this.PropertyChanged += h, h => this.PropertyChanged -= h)
                .Subscribe((e) => { ViewModel_PropertyChanged(this, e.EventArgs); });
        }

        private void ViewModel_PropertyChanged(object sender, PropertyChangedEventArgs e)
        {
// From Event Pattern does not need subscribe again. Direct call Server side is enougth.
            if (e.PropertyName == "Cusip")
                OnSubmitChanges(vm => vm.Cusip).Subscribe((i) => { Console.WriteLine("Call IVMController->IModObsAdapter->IModObsTransport->QueueAsObserver " + i); }); //459200-10-1 //912828NB2
            if (e.PropertyName == "Ask")
                OnSubmitChanges(vm => vm.Ask).Subscribe((i) => { Console.WriteLine("Hit bid lift offer " + i); });

        }
    }


        private static void TestConvertPropChangeToObservable()
        {
            ViewModel v = new ViewModel();
            v.Cusip = "912828NB2";
            v.OnSubmitChanges(vm => vm.Cusip).Subscribe((i) => { Console.WriteLine("Call IVMController->IModObsAdapter->IModObsTransport->QueueAsObserver " + i); }); //459200-10-1 //912828NB2
            v.Ask = 98.23;
            v.OnSubmitChanges(vm => vm.Ask).Subscribe((i) => { Console.WriteLine("Hit bid lift offer " + i); }); 

        }

Sunday, June 22, 2014

F# Async through Agent= inbox as Queue

Agent are mailbox and can run async block, similar to run a async block inside a function.




open System
open System.Net 
open Microsoft.FSharp.Control.WebExtensions 
open System.Diagnostics 
open System.IO 


let urlList= ["cnn" , "http://www.cnn.com"
              "china", "http://www.china.com"
             ]


let fetch( url :string) = 
        let uri = new Uri(url)
        let c = new WebClient()
        let html =c.DownloadString(uri)
        printfn "%s" html

let fetchAsync(n: string, url :string) = 
    async {
        let uri = new Uri(url)
        let c = new WebClient()
        let! html =c.AsyncDownloadString(uri)
        printfn "%s" html
    }

let runAsync() =
    urlList 
    |> Seq.map fetchAsync
    |> Async.Parallel 
    |> Async.RunSynchronously 

let runSync() = 
    "http://www.cnn.com"
    |> fetch


let s= new StreamWriter("c:\working\1.txt",true)
s.AutoFlush <-true
let agent =
    MailboxProcessor.Start( fun inbox ->
     async {    while true do
                let! msg =inbox.Receive()
               // fetch(msg)
                s.WriteLine(DateTime.Now.ToString()+ " "+msg);
                })


[]
let main arg =
    runSync()
    fetch("http://www.cnn.com")
    
    runAsync() |> ignore
 
    agent.Post("http://www.cnn.com")
    agent.Post("http://www.china.com")
    agent.Post("http://www.goolge.com")
    agent.Post("http://www.fb.com")
    let mutable b=true
    let mutable n=1
    Console.ReadLine() |> ignore
    while b do
        n<-n+1
        if n>1000 then
            b <-false
        agent.Post(n.ToString())

    Console.ReadLine() |> ignore
    s.Close()
    0

    



Tuesday, April 22, 2014

Generate Observables


    class Program
    {
        static void Main(string[] args)
        {
            DateTime dtStart = DateTime.Now;
            var txns = GenerateObservable<Transaction>(3, dt => new Transaction() { id = "txn " + dt.ToString() }, 3, dt => dt < dtStart.AddSeconds(120));
            var bsds = GenerateObservable<Transaction>(1, dt => new BondStaticData() { id = "bsd " + dt.ToString() }, 3, dt => dt < dtStart.AddSeconds(10));

            txns.CombineLatest(bsds,(t,b)=> t.id+" "+b.id).Subscribe(s=>Console.WriteLine(s));

            Console.ReadLine();
        }

        static IObservable GenerateObservable<T>(int seconds4Iteration, Func<DateTime,T> newT, int nextInSeconds,Func<DateTime,bool> continuation)
        {
            DateTime dtStart = DateTime.Now;
            return Observable.Generate(dtStart, continuation, dt => dt.AddSeconds(seconds4Iteration), newT, dt => TimeSpan.FromSeconds(nextInSeconds));
        }
    }