Thursday, July 24, 2014

Practical MVVM/RX WPF App


Building an MVVM/Rx WPF app requires gluing together multiple interfaces and correctly handing off to the Dispatcher:

(1) (engineering) Place nuget.config alongside the .sln so that packages are managed at the solution level:
          <add key="repositorypath" value="Packages" />

(2) (design pattern) View - ViewModel --VMController--Adapter -- Transport

(3) (engineering) A fluent API can easily glue the View's DataContext to the VM and point the VMController at the VM

    /// <summary>
    /// Fluent helper that hands one view model to a view-model controller and to a view's DataContext.
    /// </summary>
    /// <remarks>
    /// Call ViewModel(...) first. ViewModelController(...) and View(...) pass along whatever view model
    /// is stored at the moment they run, which is null until ViewModel(...) has been called.
    /// </remarks>
    public class FluentFactory
    {
        // View model produced by the most recent ViewModel(...) call; null until then.
        INotifyPropertyChanged _viewModel = null;

        /// <summary>Creates the view model via the factory and stores it for the later steps.</summary>
        /// <param name="viewModelFactory">Delegate that produces the view model.</param>
        /// <returns>This factory, so calls can be chained.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="viewModelFactory"/> is null.</exception>
        public FluentFactory ViewModel(Func<INotifyPropertyChanged> viewModelFactory)
        {
            if (viewModelFactory == null) throw new ArgumentNullException("viewModelFactory");

            _viewModel = viewModelFactory.Invoke();
            return this;
        }

        /// <summary>Creates the controller via the factory and assigns the stored view model to it.</summary>
        /// <param name="viewModelControllerFactory">Delegate that produces the controller.</param>
        /// <returns>This factory, so calls can be chained.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="viewModelControllerFactory"/> is null.</exception>
        public FluentFactory ViewModelController(Func<IViewModelController> viewModelControllerFactory)
        {
            if (viewModelControllerFactory == null) throw new ArgumentNullException("viewModelControllerFactory");

            // The controller is not stored here; only its ViewModel property is set.
            var f = viewModelControllerFactory.Invoke();
            f.ViewModel = _viewModel;
            return this;
        }

        /// <summary>Creates the view via the factory and sets its DataContext to the stored view model.</summary>
        /// <param name="viewFactory">Delegate that produces the view.</param>
        /// <exception cref="ArgumentNullException"><paramref name="viewFactory"/> is null.</exception>
        public void View(Func<FrameworkElement> viewFactory)
        {
            if (viewFactory == null) throw new ArgumentNullException("viewFactory");

            FrameworkElement fe = viewFactory.Invoke();
            fe.DataContext = _viewModel;
        }
    }

(4) (MVVM) The module registers the instances that are injected into DockingViewManager
    /// <summary>
    /// Module that registers the trading transport, the adapter and the two schedulers
    /// as instances in the Unity container it was constructed with.
    /// </summary>
    public class TradingModule : IModule
    {
        private readonly IUnityContainer _container;

        /// <param name="container">Container that receives the registrations made by Initialize().</param>
        public TradingModule(IUnityContainer container)
        {
            _container = container;
        }

        /// <summary>Registers one instance each for ITransport, IAdapter, IScheduler and LocalScheduler.</summary>
        public void Initialize()
        {
            ITransport transport = new TradingTransport();
            _container.RegisterInstance<ITransport>(transport);

            IAdapter adapter = new TradingAdapter();
            _container.RegisterInstance<IAdapter>(adapter);

            IScheduler backgroundScheduler = new NewThreadScheduler();
            _container.RegisterInstance<IScheduler>(backgroundScheduler);

            // Registered under LocalScheduler so it can be told apart from the IScheduler registration.
            LocalScheduler uiScheduler = DispatcherScheduler.Current;
            _container.RegisterInstance<LocalScheduler>(uiScheduler);
        }
    }

(5) (Rx) DockingViewManager creates the VMController when a button click asks for a well-known view, and the controller then starts the Observable sequence

    // Builds the docking view for a well-known view name, wiring view model, controller and view
    // together through FluentFactory.
    // NOTE(review): this is an excerpt. The constructor body, the _transport/_adapter/_scheduler/
    // _dispatcher fields and Create_5_10YRViewModelController() are not shown; presumably the
    // constructor stores its four arguments in those fields. Confirm against the full source.
    public class DockingViewManager : IDockingViewManager
    {
        // Constructor signature only; the body is omitted in this excerpt.
        public DockingViewManager(ITransport transport, IAdapter adapter, IScheduler scheduler, LocalScheduler dispatcher)

        // Returns the view for viewName with its DataContext set to a new view model.
        // Falls back to an empty UserControl when viewName matches neither known view.
        public UserControl GetDockingView(WellknowViewName viewName)
        {
            FluentFactory f = new FluentFactory();
            if (viewName == WellknowViewName.DurationTraderView)
            {
                DurationTraderView durView = new DurationTraderView();

                // Order matters: the view model must exist before the controller and the view receive it.
                f.ViewModel(() => new DurationTraderViewModel())
                .ViewModelController(() => CreateDurationTraderViewModelController())
                .View(() => durView);
                return durView;
            }

            if (viewName == WellknowViewName._5_10Yr)
            {
                _5_10YRView view = new _5_10YRView();

                // Same wiring as above, for the 5-10 year view.
                f.ViewModel(() => new _5_10YRViewModel())
                .ViewModelController(() => Create_5_10YRViewModelController())
                .View(() => view);
                return view;
            }
            return new UserControl();
        }

        // Creates a controller from this manager's transport, adapter, scheduler and dispatcher fields.
        public DurationTraderViewModelController CreateDurationTraderViewModelController()
        {
            return new DurationTraderViewModelController(_transport,_adapter,_scheduler,_dispatcher);
        }
    }


    /// <summary>
    /// Subscribes to the transport's trading observable and passes every received field set,
    /// together with the current <see cref="ViewModel"/>, to the adapter's updater.
    /// </summary>
    /// <remarks>
    /// The subscription is created in the constructor and is kept so that it can be released through
    /// <see cref="Dispose"/>; without that the sequence could never be unsubscribed.
    /// </remarks>
    public class DurationTraderViewModelController : IViewModelController, IDisposable
    {
        // Handle of the subscription created in the constructor.
        private readonly IDisposable _subscription;

        /// <summary>
        /// View model handed to the adapter. It is read each time an item arrives, so it may be
        /// assigned after construction.
        /// </summary>
        public INotifyPropertyChanged ViewModel { get; set; }

        /// <param name="transport">Source of the trading observable.</param>
        /// <param name="adapter">Receives each field set together with the current view model.</param>
        /// <param name="scheduler">Scheduler passed to SubscribeOn.</param>
        /// <param name="dispatcher">Scheduler passed to ObserveOn.</param>
        public DurationTraderViewModelController(ITransport transport, IAdapter adapter, IScheduler scheduler, LocalScheduler dispatcher)
        {
            _subscription = transport.GetTradingObservables()
                .SubscribeOn(scheduler)
                .ObserveOn(dispatcher)
                .Subscribe(fSet => adapter.updater(fSet, ViewModel));
        }

        /// <summary>Releases the subscription created in the constructor.</summary>
        public void Dispose()
        {
            _subscription.Dispose();
        }
    }

    /// <summary>Transport whose observable emits a new FieldDataSet on every timer tick.</summary>
    public class TradingTransport : ITransport
    {
        /// <summary>
        /// Returns an observable that, for each subscriber, starts a timer with a zero due time and a
        /// one-second period and forwards a fresh FieldDataSet per tick.
        /// </summary>
        public IObservable<IFieldDataSet> GetTradingObservables()
        {
            return Observable.Create<IFieldDataSet>(observer =>
            {
                TimeSpan dueTime = TimeSpan.Zero;
                TimeSpan period = TimeSpan.FromSeconds(1);

                var ticks = Observable
                    .Timer(dueTime, period)
                    .Select(tick => new FieldDataSet());

                // The returned subscription is what disposes the timer when the subscriber unsubscribes.
                return ticks.Subscribe(observer);
            });
        }
    }

Side Note --- OnPropertyChange("Cusip") is error-prone because of the hard-coded string. It is better to use an expression tree (available in C# 4.0):
     
        SetProperty(ref _cusip, value,()=>Cusip);

        /// <summary>
        /// Assigns <paramref name="value"/> to <paramref name="field"/> and raises PropertyChanged with the
        /// member name taken from <paramref name="exp"/>. Does nothing when the value is unchanged.
        /// </summary>
        /// <typeparam name="T">Type of the backing field and property.</typeparam>
        /// <param name="field">Backing field to update.</param>
        /// <param name="value">New value.</param>
        /// <param name="exp">Member-access expression such as <c>() =&gt; Cusip</c>; its member name is reported.</param>
        public void SetProperty<T>(ref T field, T value, Expression<Func<T>> exp)
        {
            if (EqualityComparer<T>.Default.Equals(field, value)) return;
            field = value;

            // Read the event once: a handler removed between the null check and the call
            // would otherwise leave a null delegate to invoke.
            PropertyChangedEventHandler handler = PropertyChanged;
            if (handler == null) return;

            // No notification is raised when the expression body is not a plain member access.
            MemberExpression me = exp.Body as MemberExpression;
            if (me != null && me.Member != null)
                handler(this, new PropertyChangedEventArgs(me.Member.Name));
        }

Side Note --- Load Module by Config
  <modules>
    <module assemblyFile="Modules.dll" 
            moduleType="Modules.TradingModule, Modules, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"
            moduleName="TradingModule" startupLoaded="true" />
  </modules>

    /// <summary>
    /// Bootstrapper whose module catalog is a ConfigurationModuleCatalog.
    /// </summary>
    public class DemoUnityBootstrapper : UnityBootstrapper
    {
        /// <summary>Creates the module catalog used by this bootstrapper.</summary>
        /// <returns>A new ConfigurationModuleCatalog.</returns>
        protected override IModuleCatalog CreateModuleCatalog()
        {
            // NOTE(review): presumably this catalog is filled from the <modules> config section — confirm.
            IModuleCatalog catalog = new ConfigurationModuleCatalog();
            return catalog;
        }
    }

Sunday, July 6, 2014

Producer Consumer Pattern with HandOff point

// Both a data queue and the queue behind a SynchronizationContext allow data to be handed off
// The data consumer can be a parallel enumerable or can subscribe to observables
// The UI can only be updated on its single thread, either asynchronously or synchronously

        // Demonstrates a BlockingCollection used as a producer/consumer hand-off queue:
        // add one item, mark adding complete, take it with a timeout, then drain what is left
        // through a parallel, ordered consuming enumeration.
        void BlockingCollectionAsQueue()
        {
            // The collection is disposable, so it is scoped with using.
            using (BlockingCollection<string> serverData = new BlockingCollection<string>(10000))
            {
                serverData.Add("Data");
                serverData.CompleteAdding();

                // Take with a 2 ms timeout; the taken item and the success flag are not used further.
                string data1;
                serverData.TryTake(out data1, TimeSpan.FromMilliseconds(2));

                // Adding is complete, so this enumeration ends once the collection is empty.
                IEnumerable<string> data = serverData.GetConsumingEnumerable();
                foreach (var v in data.AsParallel().AsOrdered().Select(l => Regex.Replace(l, @"\s+", "")))
                {
                    // Results are intentionally discarded; enumerating is what consumes the queue.
                }
            }
        }

        // Demonstrates handing work from a thread-pool thread back to the caller's
        // SynchronizationContext (WPF Dispatcher or WinForms Control.Invoke).
        void UIMarshalling()
        {
            // Capture the context on the calling thread. Reading SynchronizationContext.Current inside
            // the work item would read the pool thread's context, which is normally null.
            SynchronizationContext uiContext = SynchronizationContext.Current;

            ThreadPool.QueueUserWorkItem((s) =>
            {
                //do some server side work
                // Equivalent direct calls: UIControl.Invoke() or Dispatcher.Invoke()
                SendOrPostCallback updateUi = (state) =>
                {
                    //UI.Property="";
                };

                if (uiContext != null)
                {
                    // Send runs the callback synchronously on the captured context.
                    uiContext.Send(updateUi, "");
                }
                else
                {
                    // The caller had no context, so there is nothing to marshal to; run inline.
                    updateUi("");
                }
            });
        }
http://www.martinpersson.org/wordpress/2012/12/application-launcher/

TPL Fork Join code Pattern

// Count down: start at 4 and wait until all 4 have been signaled
// NOTE(review): only one Signal() appears in this excerpt for an initial count of 4; presumably the
// other signals come from worker code not shown. Confirm, since Wait() is called right after.
     CountdownEvent ce = new CountdownEvent(4);
            ce.Signal();
            ce.Wait();

// Count up: start at 1 and add one to the count for each piece of work
// NOTE(review): one AddCount(1) and one Signal() are shown; presumably further signals come from
// the work items themselves. Confirm.
     ce = new CountdownEvent(1);
     ce.AddCount(1);
     ce.Signal();
     ce.Wait();

// Inlining -- Recursive Decomposition
 // walk by directly parallel
        // Applies action to the node's data and walks both subtrees, handing all three pieces of
        // work to a single Parallel.Invoke call. A null node ends the recursion.
        void Walk2<T>(Tree<T> root, Action<T> action)
        {
            if (root != null)
            {
                Action visitNode = () => action(root.Data);
                Action walkLeft = () => Walk2(root.Left, action);
                Action walkRight = () => Walk2(root.Right, action);

                Parallel.Invoke(visitNode, walkLeft, walkRight);
            }
        }

// Continuation chaining
// The combined task is not simply the sum of the started tasks: Walk itself returns a Task, so
// StartNew yields Task<Task>; Unwrap() maps Task<Task> to a Task that represents the inner completion.
// NOTE(review): this is a method body without its header, and t1 is not declared in the excerpt;
// presumably it is the task that runs action(root.Data). Confirm against the full source.
            if (root == null) return _CompletedTask;
            Task<Task> t2 = Task.Factory.StartNew(() => Walk(root.Left, action));
            Task<Task> t3 = Task.Factory.StartNew(() => Walk(root.Right, action));
            // The continuation receives the three tasks and calls WaitAll on them.
            return Task.Factory.ContinueWhenAll(new Task[] { t1, t2.Unwrap(), t3.Unwrap() },
             task => Task.WaitAll(task));
// Task whose result is set at type initialization, returned where there is nothing to do.
public static Task _CompletedTask = new Func<Task>(() =>
        {
            var source = new TaskCompletionSource<object>();
            source.SetResult(null);
            return source.Task;
        }).Invoke();

// Parallel with options:
// The options can carry a cancellation token, a task scheduler and a maximum degree of parallelism.
// Parallel.For can be stopped by calling loop.Stop() inside the loop body.
// NOTE(review): in this excerpt the Parallel.Invoke call appears before opts is declared; the
// declaration below has to come first for the snippet to compile.
Parallel.Invoke(opts, () => { }, () => { }, () => { });
            // Source of the token assigned to opts.CancellationToken below.
            CancellationTokenSource ts = new CancellationTokenSource();
            
            ParallelOptions opts = new ParallelOptions()
            {
                MaxDegreeOfParallelism = 2,
                // Uses the scheduler of the current SynchronizationContext.
                TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
                CancellationToken = ts.Token
            };

 ParallelLoopResult res = Parallel.For(0, 10000, () => 1, (j, loop, sub) =>
            {
                if (loop.ShouldExitCurrentIteration) return 0;
                loop.Stop();

Testing Observables using NUnit/Moq in MVVM

Mock.Setup must be configured for each parameter value the function will be called with.
Observable.Create takes an IObserver and returns an IDisposable, so create a Subject on which to call OnNext.
Note that Subject.Subscribe(observer) lets OnNext stream data out through the Observable.
Finally, the test scheduler must be advanced to see data coming out.

// NOTE(review): everything below is pseudo-code (abbreviated type names, elided arguments);
// it outlines the shape of the test and is not meant to compile as written.
// Fixture setup: a mocked transport is passed to the controller under test.
Mock _mockT= Mock;
_VMController = new(_mockT.Object, ...);
// Test: configure the mock to return a generated observable for this Id, then check the collection count.
[Test] public void IdChanged() {
   Id = new Hashset(){guid};
   _mockT.Setup(p=>ObserveData(Id)
   .Return(GenerateObs(id:Id, data: 100.01));
   Assert.AreEqual(_VMController.ViewModel.DataCollecton.Count,1);
}
// Builds an observable that pushes one Data item to each subscriber through a Subject.
IObs<PT> GenerateObs(string id="", double? data=null,DateTime logicalDate=null) {
Data data = new (...);
return Observable.Create(obsr=>{   // Func<IObserver,IDisposable>
var s = new Subject<Data>();
   // Subscribe the observer first so that the OnNext below reaches it.
   s.subscribe(obsr);
   onNext<Data>(s,data);
return s;
});
}
// Pushes one item to the observer, then advances the test scheduler by 250 ms.
void onNext<T> (IObserver obr, T data)
{
  obr.OnNext(data);
  _testSchedule.AdvancedBy(250ms);
}

Sunday, June 29, 2014

Use TPL/TaskCompletionSource to Implement DownloadAsync

Using a Task makes the call non-blocking; TaskCompletionSource.SetResult hands the result across to the caller
        // Starts the download, does three seconds of other work, then attaches a continuation
        // that prints the downloaded result.
        static void Main(string[] args)
        {
            Task<string> download = DownloadAsync();

            Console.WriteLine("Do some other work while wating for Async Donwload");
            Thread.Sleep(TimeSpan.FromSeconds(3));
            Console.WriteLine("other work done");

            download.ContinueWith(finished => Console.WriteLine(finished.Result));

            // ReadLine keeps the program running until the continuation has had a chance to print.
            // Alternatives: Task.WaitAny(new[] { download }) or Console.WriteLine(download.Result).
            Console.ReadLine();
        }

        // Runs DownloadSync on a task and exposes the outcome through a TaskCompletionSource.
        // Returns Task<string> (the caller assigns it to Task<string> and reads Result); a fault
        // or cancellation of the download is forwarded instead of completing with an empty string.
        static Task<string> DownloadAsync()
        {
            TaskCompletionSource<string> ts = new TaskCompletionSource<string>();

            Task.Factory.StartNew(() => DownloadSync()).ContinueWith((t) =>
            {
                if (t.IsFaulted)
                {
                    // Forward the original exceptions so the caller's task faults too.
                    ts.SetException(t.Exception.InnerExceptions);
                }
                else if (t.IsCanceled)
                {
                    ts.SetCanceled();
                }
                else
                {
                    ts.SetResult(t.Result);
                }
            });

            return ts.Task;
        }
        
        // Stand-in for a slow synchronous download: blocks the calling thread for
        // five seconds and then returns the fixed payload "Data".
        static string DownloadSync()
        {
            const int simulatedLatencyMs = 5000;
            Thread.Sleep(simulatedLatencyMs);

            string payload = "Data";
            return payload;
        }

Saturday, June 28, 2014

Convert Property Change to Observable

INPC enables binding V <-> VM, and an ICommand is needed to execute requests to the server side. It is interesting to see whether Observables in Rx running on
a TaskPoolScheduler can do the same as a Delegate or Relay Command. Both FromEventPattern and a CLR event should work.

    /// <summary>
    /// View model that exposes the current value of a property as a single-element observable
    /// and reacts to its own PropertyChanged notifications.
    /// </summary>
    /// <remarks>
    /// Cusip and Ask are auto-properties; nothing in this class raises PropertyChanged, so
    /// ViewModel_PropertyChanged is not triggered by setting them.
    /// </remarks>
    public class ViewModel : INotifyPropertyChanged
    {
        public string Cusip { get; set; }
        public double Ask { get;set; }

        public event PropertyChangedEventHandler PropertyChanged;

        /// <summary>
        /// Evaluates the member selected by <paramref name="exp"/> on this instance and returns an
        /// observable that yields one (value, member name) tuple.
        /// </summary>
        /// <param name="exp">Member-access expression such as <c>vm =&gt; vm.Cusip</c>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="exp"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="exp"/> is not a plain member access.</exception>
        public IObservable<Tuple<T, string>> OnSubmitChanges<T>(Expression<Func<ViewModel, T>> exp)
        {
            if (exp == null) throw new ArgumentNullException("exp");

            // "as" yields null for anything that is not a member access; fail with a clear message
            // instead of a NullReferenceException on me.Member.
            MemberExpression me = exp.Body as MemberExpression;
            if (me == null)
                throw new ArgumentException("Expression must be a simple member access such as vm => vm.Cusip.", "exp");

            string n = me.Member.Name;
            Func<ViewModel, T> f = exp.Compile();
            T i = f.Invoke(this);
            return Observable.Return(new Tuple<T,string>(i,n));
        }

        public ViewModel()
        {
            PropertyChanged += ViewModel_PropertyChanged;
            // Alternative wiring through Rx instead of the CLR event subscription above
            // (kept fully commented out; use one or the other, not both):
            //  Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(hh => hh.Invoke, h => this.PropertyChanged += h, h => this.PropertyChanged -= h)
            //      .Subscribe((e) => { ViewModel_PropertyChanged(this, e.EventArgs); });
        }

        private void ViewModel_PropertyChanged(object sender, PropertyChangedEventArgs e)
        {
            // With the event-pattern wiring there is no need to subscribe again; calling the server side directly is enough.
            if (e.PropertyName == "Cusip")
                OnSubmitChanges(vm => vm.Cusip).Subscribe((i) => { Console.WriteLine("Call IVMController->IModObsAdapter->IModObsTransport->QueueAsObserver " + i); }); //459200-10-1 //912828NB2
            if (e.PropertyName == "Ask")
                OnSubmitChanges(vm => vm.Ask).Subscribe((i) => { Console.WriteLine("Hit bid lift offer " + i); });

        }
    }


        // Sets Cusip and then Ask on a fresh ViewModel and, after each assignment, subscribes to
        // the observable for that property and prints the emitted tuple.
        private static void TestConvertPropChangeToObservable()
        {
            var viewModel = new ViewModel();

            // Sample identifiers: 459200-10-1, 912828NB2
            viewModel.Cusip = "912828NB2";
            var cusipChanges = viewModel.OnSubmitChanges(vm => vm.Cusip);
            cusipChanges.Subscribe(change => Console.WriteLine("Call IVMController->IModObsAdapter->IModObsTransport->QueueAsObserver " + change));

            viewModel.Ask = 98.23;
            var askChanges = viewModel.OnSubmitChanges(vm => vm.Ask);
            askChanges.Subscribe(change => Console.WriteLine("Hit bid lift offer " + change));
        }

Sunday, June 22, 2014

F# Async through Agent= inbox as Queue

Agents are mailboxes that can run an async block, similar to running an async block inside a function.




open System
open System.Net 
open Microsoft.FSharp.Control.WebExtensions 
open System.Diagnostics 
open System.IO 


/// (name, url) pairs for the sites to download.
let urlList =
    [ ("cnn", "http://www.cnn.com")
      ("china", "http://www.china.com") ]


/// Downloads the page at `url` synchronously and prints its HTML.
let fetch( url :string) = 
        let uri = new Uri(url)
        // `use` disposes the WebClient when the function returns.
        use c = new WebClient()
        let html = c.DownloadString(uri)
        printfn "%s" html

/// Builds an async workflow that downloads the page at `url` and prints its HTML.
/// `n` is the site name from the (name, url) pair; it is not used in the body.
let fetchAsync(n: string, url :string) = 
    async {
        let uri = new Uri(url)
        // `use` disposes the WebClient when the workflow finishes.
        use c = new WebClient()
        let! html = c.AsyncDownloadString(uri)
        printfn "%s" html
    }

/// Builds one download workflow per entry of urlList, runs them in parallel
/// and blocks until all of them have finished.
let runAsync() =
    let downloads = Seq.map fetchAsync urlList
    Async.RunSynchronously (Async.Parallel downloads)

/// Downloads and prints the CNN front page synchronously.
let runSync() =
    fetch "http://www.cnn.com"


/// Log file opened in append mode; every write is flushed immediately.
let s = new StreamWriter("c:\working\1.txt", true)
s.AutoFlush <- true

/// Agent that receives messages from its inbox one at a time, forever,
/// and appends each one to the log file prefixed with the current time.
let agent =
    MailboxProcessor.Start(fun inbox ->
        let rec loop () =
            async {
                let! msg = inbox.Receive()
                // fetch(msg)
                let line = DateTime.Now.ToString() + " " + msg
                s.WriteLine(line)
                return! loop ()
            }
        loop ())


/// Entry point: runs the synchronous and asynchronous downloads, then posts messages to the
/// agent (four URLs, then the numbers 2 through 1001 after a key press) and closes the log.
[<EntryPoint>]
let main (arg: string[]) =
    runSync()
    fetch("http://www.cnn.com")

    runAsync() |> ignore

    agent.Post("http://www.cnn.com")
    agent.Post("http://www.china.com")
    agent.Post("http://www.goolge.com")
    agent.Post("http://www.fb.com")

    Console.ReadLine() |> ignore
    for n in 2 .. 1001 do
        agent.Post(n.ToString())

    // NOTE(review): Post only enqueues; presumably this ReadLine gives the agent time to drain
    // its inbox before the writer is closed. Confirm.
    Console.ReadLine() |> ignore
    s.Close()
    0