// Building an MVVM/Rx WPF app requires gluing together multiple interfaces and
// correctly handing results off to the Dispatcher:
//
// (1) (engineering) Keep nuget.config aligned with the .sln so packages are managed at solution level:
//       <add key="repositorypath" value="Packages" />
// (2) (design pattern) View -- ViewModel -- VMController -- Adapter -- Transport
// (3) (engineering) A fluent API can easily glue the DataContext to the VM and point the
//     VMController at the VM.

/// <summary>
/// Fluent helper that creates a view model, hands it to a view-model controller and
/// assigns it as a view's DataContext.
/// Call <c>ViewModel(...)</c> first: the other two steps use the instance it stores.
/// </summary>
public class FluentFactory
{
    // Set by ViewModel(...); stays null until that step has run.
    INotifyPropertyChanged _viewModel = null;

    /// <summary>Creates the controller and points its ViewModel property at the stored view model.</summary>
    public FluentFactory ViewModelController(Func<IViewModelController> viewModelControllerFactory)
    {
        var f = viewModelControllerFactory.Invoke();
        f.ViewModel = _viewModel;
        return this;
    }

    /// <summary>Creates the view model and remembers it for the following steps.</summary>
    public FluentFactory ViewModel(Func<INotifyPropertyChanged> viewModelFactory)
    {
        _viewModel = viewModelFactory.Invoke();
        return this;
    }

    /// <summary>Creates (or fetches) the view and sets its DataContext to the stored view model.</summary>
    public void View(Func<FrameworkElement> viewFactory)
    {
        FrameworkElement fe = viewFactory.Invoke();
        fe.DataContext = _viewModel;
    }
}

// (4) (MVVM) The module registers the instances that the container later injects into
//     DockingViewManager.

/// <summary>Module that registers the transport, adapter and schedulers with the Unity container.</summary>
public class TradingModule : IModule
{
    IUnityContainer _container;

    public TradingModule(IUnityContainer container)
    {
        _container = container;
    }

    /// <summary>Registers one shared instance per dependency type.</summary>
    public void Initialize()
    {
        _container.RegisterInstance<ITransport>(new TradingTransport());
        _container.RegisterInstance<IAdapter>(new TradingAdapter());
        _container.RegisterInstance<IScheduler>(new NewThreadScheduler());
        _container.RegisterInstance<LocalScheduler>(DispatcherScheduler.Current);
    }
}

// (5) (Rx) DockingViewManager creates the VMController when a button click asks for a
//     well-known view, and the controller then starts the observable sequence.

/// <summary>Builds the view / view model / controller triple for a well-known view name.</summary>
public class DockingViewManager : IDockingViewManager
{
    // Dependencies handed to every controller this manager creates.
    private readonly ITransport _transport;
    private readonly IAdapter _adapter;
    private readonly IScheduler _scheduler;
    private readonly LocalScheduler _dispatcher;

    /// <summary>Stores the injected dependencies for later controller creation.</summary>
    public DockingViewManager(ITransport transport, IAdapter adapter, IScheduler scheduler, LocalScheduler dispatcher)
    {
        _transport = transport;
        _adapter = adapter;
        _scheduler = scheduler;
        _dispatcher = dispatcher;
    }

    /// <summary>
    /// Returns the wired-up view for <paramref name="viewName"/>, or an empty
    /// <c>UserControl</c> when the name is not one of the handled values.
    /// </summary>
    public UserControl GetDockingView(WellknowViewName viewName)
    {
        FluentFactory f = new FluentFactory();

        if (viewName == WellknowViewName.DurationTraderView)
        {
            DurationTraderView durView = new DurationTraderView();
            f.ViewModel(() => new DurationTraderViewModel())
             .ViewModelController(() => CreateDurationTraderViewModelController())
             .View(() => durView);
            return durView;
        }

        if (viewName == WellknowViewName._5_10Yr)
        {
            _5_10YRView view = new _5_10YRView();
            // NOTE(review): Create_5_10YRViewModelController is not defined in this excerpt.
            f.ViewModel(() => new _5_10YRViewModel())
             .ViewModelController(() => Create_5_10YRViewModelController())
             .View(() => view);
            return view;
        }

        return new UserControl();
    }

    /// <summary>Creates a controller that shares this manager's transport, adapter and schedulers.</summary>
    public DurationTraderViewModelController CreateDurationTraderViewModelController()
    {
        return new DurationTraderViewModelController(_transport, _adapter, _scheduler, _dispatcher);
    }
}

/// <summary>
/// Subscribes to the transport's trading observable on construction and pushes every
/// field set into the view model through the adapter.
/// </summary>
public class DurationTraderViewModelController : IViewModelController
{
    public INotifyPropertyChanged ViewModel { get; set; }

    public DurationTraderViewModelController(ITransport transport, IAdapter adapter, IScheduler scheduler, LocalScheduler dispatcher)
    {
        // The lambda reads ViewModel each time an item arrives, so the property can be
        // assigned after construction (FluentFactory does exactly that).
        transport.GetTradingObservables()
                 .SubscribeOn(scheduler)
                 .ObserveOn(dispatcher)
                 .Subscribe(fSet => adapter.updater(fSet, ViewModel));
    }
}

/// <summary>Demo transport that emits a new FieldDataSet immediately and then every second.</summary>
public class TradingTransport : ITransport
{
    public IObservable<IFieldDataSet> GetTradingObservables()
    {
        return Observable.Create<IFieldDataSet>((obsr) =>
        {
            return Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
                             .Select(i => new FieldDataSet())
                             .Subscribe(obsr);
        });
    }
}

// Side note --- OnPropertyChange("Cusip") is error prone because of the hard-coded string.
// Better to use an expression tree in C# 4.0:
//     SetProperty(ref _cusip, value, () => Cusip);

/// <summary>
/// Assigns <paramref name="value"/> to <paramref name="field"/> and raises PropertyChanged
/// with the member name taken from <paramref name="exp"/>. Does nothing when the value is
/// unchanged; raises nothing when the expression body is not a member access.
/// </summary>
public void SetProperty<T>(ref T field, T value, Expression<Func<T>> exp)
{
    if (EqualityComparer<T>.Default.Equals(field, value))
    {
        return;
    }

    field = value;

    // Snapshot the delegate so an unsubscribe between the null check and the call cannot cause a null call.
    PropertyChangedEventHandler handler = PropertyChanged;
    if (handler != null)
    {
        MemberExpression me = exp.Body as MemberExpression;
        if (me != null && me.Member != null)
        {
            handler(this, new PropertyChangedEventArgs(me.Member.Name));
        }
    }
}

// Side note --- load the module by configuration:
//   <modules>
//     <module assemblyFile="Modules.dll"
//             moduleType="Modules.TradingModule, Modules, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"
//             moduleName="TradingModule" startupLoaded="true" />
//   </modules>

/// <summary>Bootstrapper whose module catalog is a ConfigurationModuleCatalog.</summary>
public class DemoUnityBootstrapper : UnityBootstrapper
{
    protected override IModuleCatalog CreateModuleCatalog()
    {
        return new ConfigurationModuleCatalog();
    }
}
Thursday, July 24, 2014
Practical MVVM/RX WPF App
Sunday, July 6, 2014
Producer Consumer Pattern with HandOff point
// Both a data queue and a queue under a SynchronizationContext allow hand-off of data.
// Data consumers can be a parallel enumerable or can subscribe to observables.
// The UI can only be single-threaded, whether the hand-off is async or sync.

/// <summary>
/// Demonstrates a BlockingCollection used as a bounded producer/consumer queue:
/// one item is added, adding is completed, the item is taken with a timeout, and the
/// remaining (now empty) content is drained through a parallel, ordered query.
/// </summary>
void BlockingCollectionAsQueue()
{
    // BlockingCollection is IDisposable, so release it when the demo is done.
    using (BlockingCollection<string> serverData = new BlockingCollection<string>(10000))
    {
        serverData.Add("Data");
        serverData.CompleteAdding();

        string data1;
        serverData.TryTake(out data1, TimeSpan.FromMilliseconds(2));

        // Adding is complete, so the consuming enumerable ends once the queue is empty.
        IEnumerable<string> data = serverData.GetConsumingEnumerable();
        foreach (var v in data.AsParallel().AsOrdered().Select(l => Regex.Replace(l, @"\s+", "")))
        {
            // Intentionally empty: the loop only forces the query to run.
        }
    }
}

/// <summary>
/// Demonstrates marshalling work from a thread-pool thread back to the caller's
/// SynchronizationContext (the WPF Dispatcher or the WinForms Control.Invoke equivalent).
/// </summary>
void UIMarshalling()
{
    // Capture the context on the calling thread. Inside the thread-pool callback
    // SynchronizationContext.Current is null, so reading it there would throw.
    // Fall back to the default context when the caller has none (e.g. a console app).
    SynchronizationContext uiContext = SynchronizationContext.Current ?? new SynchronizationContext();

    ThreadPool.QueueUserWorkItem((s) =>
    {
        // do some server side work

        // Alternatives for the hand-off: UIControl.Invoke(); Dispatcher.Invoke();
        SendOrPostCallback updateUi = new SendOrPostCallback((state) =>
        {
            // UI.Property = "";
        });

        // SynchronizationContext = WPF.Dispatcher or WF.Control.Invoke
        uiContext.Send(updateUi, "");
    });
}
// Reference: http://www.martinpersson.org/wordpress/2012/12/application-launcher/
TPL Fork Join code Pattern
// Count down until all 4 are signaled.
// NOTE(review): only one Signal() is shown here; with an initial count of 4 the Wait()
// would block until three more signals arrive - presumably from workers not shown.
CountdownEvent ce = new CountdownEvent(4);
ce.Signal();
ce.Wait();

// Count up by adding one for each piece of work.
// NOTE(review): initial count 1 + AddCount(1) = 2, and only one Signal() is shown.
ce = new CountdownEvent(1);
ce.AddCount(1);
ce.Signal();
ce.Wait();

// Inlining -- recursive decomposition.
// Walks the tree by invoking the action and both sub-walks directly in parallel.
void Walk2<T>(Tree<T> root, Action<T> action)
{
    if (root == null) return;
    Parallel.Invoke(
        ()=> action(root.Data),
        () => Walk2(root.Left, action),
        () => Walk2(root.Right, action));
}

// Continuation chaining.
// Parallel <> sum of tasks; Unwrap() maps Task<Task> to a Task that represents completion
// of the inner task.
// NOTE(review): the enclosing Walk(...) method header and the definition of t1 are not
// part of this excerpt.
if (root == null) return _CompletedTask;
Task<Task> t2 = Task.Factory.StartNew(() => Walk(root.Left, action));
Task<Task> t3 = Task.Factory.StartNew(() => Walk(root.Right, action));
return Task.Factory.ContinueWhenAll(new Task[] { t1, t2.Unwrap(), t3.Unwrap() }, task => Task.WaitAll(task));

// An already-completed task, built once via a TaskCompletionSource whose result is set immediately.
public static Task _CompletedTask = ((Func<Task>)(() =>
{
    TaskCompletionSource<object> ts = new TaskCompletionSource<object>();
    ts.SetResult(null);
    return ts.Task;
}))();

// Parallel with options:
// the options can carry a cancellation token, a scheduler and a max degree of parallelism.
// Parallel.For can be stopped by calling loop.Stop() inside the loop body.
// NOTE(review): opts is used on the next line before its declaration further down.
Parallel.Invoke(opts, () => { }, () => { }, () => { });
CancellationTokenSource ts = new CancellationTokenSource();
ParallelOptions opts = new ParallelOptions()
{
    MaxDegreeOfParallelism = 2,
    TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
    CancellationToken = ts.Token
};
// NOTE(review): the excerpt ends in the middle of this Parallel.For call.
ParallelLoopResult res = Parallel.For(0, 10000, () => 1, (j, loop, sub) => { if (loop.ShouldExitCurrentIteration) return 0; loop.Stop();
Testing Observables using NUnit/Moq in MVVM
Mock.Setup for a function must be run again each time the parameter changes. Observable.Create takes a Func<IObserver, IDisposable>, so inside it we create a Subject and use it to call OnNext. Note that Subject.Subscribe(observer) is what allows OnNext to stream data out of the observable. Finally, the test scheduler must be advanced to see the data coming out. Mock_mockT= Mock ; _VMController = new(_mockT.Object, ...); [Test] public void IdChanged() { Id = new Hashset (){guid}; _mockT.Setup(p=>ObserveData(Id) .Return(GenerateObs(id:Id, data: 100.01)); Assert.AreEqual(_VMController.ViewModel.DataCollecton.Count,1); } IObs<PT> GenerateObs(string id="", double? data=null,DateTime logicalDate=null) { Data data = new (...); return Observable.Create(obsr=>{ // Func<IObserver,IDispoable> var s = new Subject<Data>(); s.subscribe(obsr); onNext<Data>(s,data); return s; }); } void onNext<T> (IObserver obr, T data) { obr.OnNext(data); _testSchedule.AdvancedBy(250ms); }
Sunday, June 29, 2014
Use TPL/TaskCompletionSource to Implement DownloadAsync
// Using a Task keeps the caller non-blocking; TaskCompletionSource hands the result
// (or the failure) across to whoever holds the task.

/// <summary>
/// Starts the download, does other work while it runs, then prints the result from a continuation.
/// </summary>
static void Main(string[] args)
{
    Task<string> t = DownloadAsync();
    Console.WriteLine("Do some other work while wating for Async Donwload");
    Thread.Sleep(3000);
    Console.WriteLine("other work done");
    t.ContinueWith((t1) => { Console.WriteLine(t1.Result); });
    // Task.WaitAny(new[] { t }); Wait = ReadLine to keep the program running
    // Console.WriteLine(t.Result);
    Console.ReadLine();
}

/// <summary>
/// Runs <c>DownloadSync</c> on a task and exposes its outcome through a TaskCompletionSource.
/// </summary>
/// <param name="delayMilliseconds">Simulated download time passed to <c>DownloadSync</c>; defaults to the original 5000 ms.</param>
/// <returns>A task that completes with the downloaded string, or faults/cancels exactly as the download did.</returns>
static Task<string> DownloadAsync(int delayMilliseconds = 5000)
{
    TaskCompletionSource<string> ts = new TaskCompletionSource<string>();

    Task.Factory.StartNew(() => DownloadSync(delayMilliseconds)).ContinueWith((t) =>
    {
        // Forward every outcome; previously a failed download completed the task with "".
        if (t.IsFaulted)
        {
            ts.TrySetException(t.Exception.InnerExceptions);
        }
        else if (t.IsCanceled)
        {
            ts.TrySetCanceled();
        }
        else
        {
            ts.TrySetResult(t.Result);
        }
    });

    return ts.Task;
}

/// <summary>Blocking stand-in for a download: sleeps, then returns "Data".</summary>
/// <param name="delayMilliseconds">Non-negative time to block, in milliseconds; defaults to the original 5000 ms.</param>
/// <exception cref="ArgumentOutOfRangeException">The delay is negative.</exception>
static string DownloadSync(int delayMilliseconds = 5000)
{
    if (delayMilliseconds < 0)
    {
        throw new ArgumentOutOfRangeException("delayMilliseconds");
    }

    Thread.Sleep(delayMilliseconds);
    return "Data";
}
Saturday, June 28, 2014
Convert Property Change to Observable
// INPC enables V <-> VM binding, and an ICommand is normally needed to execute a request
// against the server side. It is interesting to see whether observables in Rx running on a
// TaskPoolScheduler can do the same job as a Delegate or Relay command.
// Both FromEventPattern and the plain CLR event should work.

/// <summary>
/// View model whose property changes are turned into one-shot observables of
/// (value, property name) and forwarded to the "server side" (console output in this demo).
/// </summary>
public class ViewModel : INotifyPropertyChanged
{
    private string _cusip;
    private double _ask;

    /// <summary>Security identifier; raises PropertyChanged when the value changes.</summary>
    public string Cusip
    {
        get { return _cusip; }
        set
        {
            if (_cusip == value)
            {
                return;
            }

            _cusip = value;
            RaisePropertyChanged("Cusip");
        }
    }

    /// <summary>Ask price; raises PropertyChanged when the value changes.</summary>
    public double Ask
    {
        get { return _ask; }
        set
        {
            if (_ask == value)
            {
                return;
            }

            _ask = value;
            RaisePropertyChanged("Ask");
        }
    }

    public event PropertyChangedEventHandler PropertyChanged;

    /// <summary>
    /// Evaluates <paramref name="exp"/> against this instance and returns an observable that
    /// yields a single (current value, member name) tuple.
    /// </summary>
    /// <param name="exp">A simple member access such as <c>vm =&gt; vm.Cusip</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="exp"/> is null.</exception>
    /// <exception cref="ArgumentException">The expression body is not a member access.</exception>
    public IObservable<Tuple<T, string>> OnSubmitChanges<T>(Expression<Func<ViewModel, T>> exp)
    {
        if (exp == null)
        {
            throw new ArgumentNullException("exp");
        }

        MemberExpression me = exp.Body as MemberExpression;
        if (me == null)
        {
            // Previously this surfaced as a NullReferenceException on me.Member.
            throw new ArgumentException("Expression must be a simple member access such as vm => vm.Cusip.", "exp");
        }

        string n = me.Member.Name;
        Func<ViewModel, T> f = exp.Compile();
        T i = f.Invoke(this);
        return Observable.Return(new Tuple<T, string>(i, n));
    }

    public ViewModel()
    {
        PropertyChanged += ViewModel_PropertyChanged;
        // Rx alternative to the CLR event subscription above:
        // Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(hh => hh.Invoke, h => this.PropertyChanged += h, h => this.PropertyChanged -= h) .Subscribe((e) => { ViewModel_PropertyChanged(this, e.EventArgs); });
    }

    // The property setters call this; without it the PropertyChanged handler never runs.
    private void RaisePropertyChanged(string propertyName)
    {
        PropertyChangedEventHandler handler = PropertyChanged;
        if (handler != null)
        {
            handler(this, new PropertyChangedEventArgs(propertyName));
        }
    }

    private void ViewModel_PropertyChanged(object sender, PropertyChangedEventArgs e)
    {
        // With the event pattern there is no need to subscribe again; calling the server side directly is enough.
        if (e.PropertyName == "Cusip")
        {
            // Sample CUSIPs: 459200-10-1, 912828NB2
            OnSubmitChanges(vm => vm.Cusip).Subscribe((i) => { Console.WriteLine("Call IVMController->IModObsAdapter->IModObsTransport->QueueAsObserver " + i); });
        }

        if (e.PropertyName == "Ask")
        {
            OnSubmitChanges(vm => vm.Ask).Subscribe((i) => { Console.WriteLine("Hit bid lift offer " + i); });
        }
    }
}

/// <summary>
/// Demo driver: assigning the properties raises PropertyChanged, and the view model's own
/// handler submits each change, so no explicit OnSubmitChanges call is needed here.
/// </summary>
private static void TestConvertPropChangeToObservable()
{
    ViewModel v = new ViewModel();
    v.Cusip = "912828NB2"; // alternative sample: 459200-10-1
    v.Ask = 98.23;
}
Sunday, June 22, 2014
F# Async through Agents: the Inbox as a Queue
// Agents are mailboxes and can run an async block, similar to running an async block
// inside a function.
open System
open System.Net
open Microsoft.FSharp.Control.WebExtensions
open System.Diagnostics
open System.IO

/// (name, url) pairs fetched by runAsync.
let urlList =
    [ "cnn", "http://www.cnn.com"
      "china", "http://www.china.com" ]

/// Downloads the page synchronously and prints its HTML.
let fetch (url: string) =
    let uri = new Uri(url)
    // `use` disposes the WebClient when the function returns.
    use c = new WebClient()
    let html = c.DownloadString(uri)
    printfn "%s" html

/// Downloads the page inside an async block and prints its HTML; the name `n` is not used.
let fetchAsync (n: string, url: string) =
    async {
        let uri = new Uri(url)
        use c = new WebClient()
        let! html = c.AsyncDownloadString(uri)
        printfn "%s" html
    }

/// Fetches every entry of urlList in parallel and blocks until all have finished.
let runAsync () =
    urlList
    |> Seq.map fetchAsync
    |> Async.Parallel
    |> Async.RunSynchronously

/// Fetches a single fixed page synchronously.
let runSync () = "http://www.cnn.com" |> fetch

// Verbatim string so the backslashes in the path are never read as escape sequences.
let s = new StreamWriter(@"c:\working\1.txt", true)
s.AutoFlush <- true

/// Agent whose inbox acts as the queue: each posted message is written to the log file
/// with a timestamp, one message at a time.
let agent =
    MailboxProcessor.Start(fun inbox ->
        async {
            while true do
                let! msg = inbox.Receive()
                // fetch(msg)
                s.WriteLine(DateTime.Now.ToString() + " " + msg)
        })

[<EntryPoint>]
let main arg =
    runSync ()
    fetch ("http://www.cnn.com")
    runAsync () |> ignore

    agent.Post("http://www.cnn.com")
    agent.Post("http://www.china.com")
    agent.Post("http://www.goolge.com")
    agent.Post("http://www.fb.com")

    let mutable b = true
    let mutable n = 1
    Console.ReadLine() |> ignore

    // Flood the inbox with numbered messages until n passes 1000.
    while b do
        n <- n + 1
        if n > 1000 then b <- false
        agent.Post(n.ToString())

    Console.ReadLine() |> ignore
    // NOTE(review): the agent may still be draining its inbox when the writer is closed.
    s.Close()
    0
Subscribe to:
Posts (Atom)