Setting up an Rx observer over the wire requires a WCF duplex callback: (1) The ServiceContract needs to define an IObservserCallback callback contract, instead of passing a lambda to a Subscribe method. (2) Subscribe() and Unsubscribe() should be one-way to avoid potential blocking. (3) If a session is required, then Subscribe/Unsubscribe need to be initiating/terminating. (4) Note that IO.Subscribe is non-blocking (vs. the deprecated ForEach) and its only purpose is to set up OnNext on the service side, which calls the client-side OnNext. (5) The service InstanceContextMode must be PerSession or Single. PerCall cannot do Unsubscribe since the state variable bool stopIO is not maintained. On the other hand, with Single, Unsubscribe will stop all sessions. (6) Threading issues: a WCF service is single-threaded by default, so re-entry would cause a deadlock and is therefore disallowed (e.g. calling the callback during a service operation). So mark the ServiceBehavior ConcurrencyMode as Reentrant/Multiple. (7) The netTcp binding is duplex (its name does not say Dual). (8) Updating the service reference on the client will trigger the WCF host for the WCF library project to run for debugging/testing. (9) The client-side auto-generated service reference forces the service client constructor to take an InstanceContext, which in turn requires a callback contract implementation. The service side uses OperationContext.Current.GetCallbackChannel<IObservserCallback>(); (10) Note that the callback contract name on the client is IService1Callback, which differs from IObservserCallback, the service-side callback contract name. (11) CPU <10% when running the test with a 1 ms data push, no network travel. (12) To signal completion on an IO, use TakeWhile and check a non-blocking state variable like stopIO=true. 
Contract and Service Definition
/// <summary>
/// Duplex, session-based contract: the client subscribes and the service pushes
/// data back through <see cref="IObservserCallback"/>.
/// </summary>
[ServiceContract(CallbackContract = typeof(IObservserCallback), SessionMode = SessionMode.Required)]
public interface IService1
{
    /// <summary>Starts the push stream for the calling session (one-way, opens the session).</summary>
    [OperationContract(IsOneWay = true, IsInitiating = true)]
    void Subscribe();

    /// <summary>Stops the push stream for the calling session (one-way, ends the session).</summary>
    [OperationContract(IsOneWay = true, IsTerminating = true)]
    void Unsubscribe();
}

/// <summary>Callback contract implemented by the client; receives each pushed item.</summary>
public interface IObservserCallback
{
    [OperationContract]
    void OnNext(CompositeType data);
}

/// <summary>Payload pushed to the client.</summary>
[DataContract]
public class CompositeType
{
    double _value = 30.45;
    string _Name = "Name1";

    [DataMember]
    public double Value
    {
        get { return _value; }
        set { _value = value; }
    }

    [DataMember]
    public string Name
    {
        get { return _Name; }
        set { _Name = value; }
    }
}

Service
// [ServiceBehavior(InstanceContextMode=InstanceContextMode.Single)]
/// <summary>
/// Per-session service that pushes a new <see cref="CompositeType"/> to the
/// subscribing client every millisecond until <see cref="Unsubscribe"/> is called.
/// </summary>
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession, ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class Service1 : IService1
{
    // Session state. Written by the WCF operation thread and read by the Rx timer
    // thread, hence volatile.
    volatile bool stopIO;

    // Handle of the running Rx subscription, kept so it can be torn down explicitly.
    IDisposable subscription;

    // One generator per session: creating a new Random on every 1 ms tick re-seeds
    // from the clock and can yield repeated values.
    readonly Random random = new Random();

    public void Subscribe()
    {
        stopIO = false;
        IObservserCallback client = OperationContext.Current.GetCallbackChannel<IObservserCallback>();

        // A repeated Subscribe replaces the previous stream instead of stacking a second one.
        DisposeSubscription();

        var IO = Observable.Interval(TimeSpan.FromMilliseconds(1)).TakeWhile(x => !stopIO);
        subscription = IO.Subscribe(x => // This is non-blocking
        {
            if (client == null)
            {
                return;
            }

            double value;
            lock (random)
            {
                value = random.NextDouble() * 30;
            }

            try
            {
                client.OnNext(new CompositeType() { Name = "Name" + x.ToString(), Value = value });
            }
            catch (CommunicationException)
            {
                // Client went away: end the stream instead of throwing on the timer thread.
                stopIO = true;
            }
            catch (TimeoutException)
            {
                stopIO = true;
            }
        });
    }

    public void Unsubscribe()
    {
        stopIO = true;
        DisposeSubscription();
    }

    // Releases the current Rx subscription, if any.
    void DisposeSubscription()
    {
        IDisposable current = subscription;
        subscription = null;
        if (current != null)
        {
            current.Dispose();
        }
    }
}

Client side Subscription and Observer
class Program
{
    static void Main(string[] args)
    {
        // The generated proxy requires an InstanceContext wrapping the callback implementation.
        ServiceReference1.Service1Client c =
            new ServiceReference1.Service1Client(new InstanceContext(new callbackObserver()));
        c.Subscribe();
        Console.WriteLine("Wait for Data. To stop IO, hit return");
        Console.ReadLine();
        c.Unsubscribe();
        Console.ReadLine();
    }
}

/// <summary>Client-side observer: prints every item pushed by the service.</summary>
class callbackObserver : ConsoleApplication1.ServiceReference1.IService1Callback
{
    public void OnNext(CompositeType data)
    {
        Console.WriteLine("Data: " + data.Name + " " + data.Value.ToString());
    }
}
Saturday, August 25, 2012
WCF Duplex Client as Observer
Friday, August 17, 2012
Drag Canvas around using Rx
WPF UI can be Drag around using mouseMove stream Buffer of two element calculation of Delta: (1) Observe 3 streams: mouseDown, mouseUp, mouseMove. The first two are point event and need duration to join to mouseMove. (2) The duration is skip until mouseDown take until mouseUp (3) Delta is the difference between two mouseMove sampling inside Buffer(2,2). Note that Buffer(count, skip) skip=count is standard, skip>count skip and skip<count is overlapping. Buffer means pile up multiple point into one tally
    /// <summary>
    /// Wires three mouse event streams together so that dragging moves the element
    /// <c>cv</c> by the delta between consecutive mouse-move samples.
    /// </summary>
    public MainWindow()
    {
        InitializeComponent();

        var mouseDown = from evt in Observable.FromEventPattern<MouseButtonEventHandler, MouseButtonEventArgs>(
                            h => cv.MouseDown += h, h => cv.MouseDown -= h)
                        select evt.EventArgs.GetPosition(this);
        var mouseUp = from evt in Observable.FromEventPattern<MouseButtonEventHandler, MouseButtonEventArgs>(
                          h => MouseUp += h, h => MouseUp -= h)
                      select evt.EventArgs.GetPosition(this);
        var mouseMove = from evt in Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
                            h => MouseMove += h, h => MouseMove -= h)
                        select evt.EventArgs.GetPosition(this);

        // Left duration: from mouseDown until mouseUp. Right duration is empty, so each
        // pair of move samples is a point event matched against the open "drag" window.
        var q = Observable.Join(
            mouseDown,
            mouseMove.Buffer(2, 2),
            _ => mouseMove.SkipUntil(mouseDown).TakeUntil(mouseUp),
            _ => Observable.Empty<Unit>(),
            (d, consecPts) => new Point(consecPts[1].X - consecPts[0].X, consecPts[1].Y - consecPts[0].Y));

        q.Subscribe(delta =>
        {
            // Canvas.Left/Top are NaN until set explicitly; NaN + delta stays NaN and the
            // element would never move, so treat "unset" as 0.
            double left = Canvas.GetLeft(cv);
            double top = Canvas.GetTop(cv);
            Canvas.SetLeft(cv, (double.IsNaN(left) ? 0 : left) + delta.X);
            Canvas.SetTop(cv, (double.IsNaN(top) ? 0 : top) + delta.Y);
        });

        // Side Note: To allow resizing do the following and using Microsoft sample Resizing Adorner:
        // a = AdornerLayer.GetAdornerLayer(g); //g=Canvas for layout
        // a.Add(new ResizingAdorner(cv)); // cv= any UIElement inside g
    }
    // AdornerLayer adornerLayer;
}

/// <summary>
/// Adorner that places a draggable <see cref="Thumb"/> on each corner of the adorned
/// element and resizes the element as the thumbs are dragged.
/// </summary>
public class ResizingAdorner : Adorner
{
    Thumb topLeft, topRight, bottomLeft, bottomRight;
    VisualCollection visualChildren;

    public ResizingAdorner(UIElement adornedElement)
        : base(adornedElement)
    {
        visualChildren = new VisualCollection(this);

        BuildAdornerCorner(ref topLeft, Cursors.SizeNWSE);
        BuildAdornerCorner(ref topRight, Cursors.SizeNESW);
        BuildAdornerCorner(ref bottomLeft, Cursors.SizeNESW);
        BuildAdornerCorner(ref bottomRight, Cursors.SizeNWSE);

        bottomLeft.DragDelta += new DragDeltaEventHandler(HandleBottomLeft);
        bottomRight.DragDelta += new DragDeltaEventHandler(HandleBottomRight);
        topLeft.DragDelta += new DragDeltaEventHandler(HandleTopLeft);
        topRight.DragDelta += new DragDeltaEventHandler(HandleTopRight);
    }

    #region Handle Resizing from Thumb
    // Each handler grows or shrinks the adorned element by the drag delta, never letting
    // it become smaller than the thumb being dragged.

    void HandleBottomRight(object sender, DragDeltaEventArgs args)
    {
        FrameworkElement adornedElement = this.AdornedElement as FrameworkElement;
        Thumb hitThumb = sender as Thumb;
        if (adornedElement == null || hitThumb == null) return;

        EnforceSize(adornedElement);
        adornedElement.Width = Math.Max(adornedElement.Width + args.HorizontalChange, hitThumb.DesiredSize.Width);
        adornedElement.Height = Math.Max(args.VerticalChange + adornedElement.Height, hitThumb.DesiredSize.Height);
    }

    void HandleBottomLeft(object sender, DragDeltaEventArgs args)
    {
        FrameworkElement adornedElement = AdornedElement as FrameworkElement;
        Thumb hitThumb = sender as Thumb;
        if (adornedElement == null || hitThumb == null) return;

        // Ensure that the Width and Height are properly initialized after the resize.
        EnforceSize(adornedElement);
        adornedElement.Width = Math.Max(adornedElement.Width - args.HorizontalChange, hitThumb.DesiredSize.Width);
        adornedElement.Height = Math.Max(args.VerticalChange + adornedElement.Height, hitThumb.DesiredSize.Height);
    }

    void HandleTopRight(object sender, DragDeltaEventArgs args)
    {
        FrameworkElement adornedElement = this.AdornedElement as FrameworkElement;
        Thumb hitThumb = sender as Thumb;
        if (adornedElement == null || hitThumb == null) return;

        EnforceSize(adornedElement);
        adornedElement.Width = Math.Max(adornedElement.Width + args.HorizontalChange, hitThumb.DesiredSize.Width);
        adornedElement.Height = Math.Max(adornedElement.Height - args.VerticalChange, hitThumb.DesiredSize.Height);
    }

    void HandleTopLeft(object sender, DragDeltaEventArgs args)
    {
        FrameworkElement adornedElement = AdornedElement as FrameworkElement;
        Thumb hitThumb = sender as Thumb;
        if (adornedElement == null || hitThumb == null) return;

        EnforceSize(adornedElement);
        adornedElement.Width = Math.Max(adornedElement.Width - args.HorizontalChange, hitThumb.DesiredSize.Width);
        adornedElement.Height = Math.Max(adornedElement.Height - args.VerticalChange, hitThumb.DesiredSize.Height);
    }
    #endregion

    // Arrange thumbs relative to Adorner Center
    protected override Size ArrangeOverride(Size finalSize)
    {
        double desiredWidth = AdornedElement.DesiredSize.Width;
        double desiredHeight = AdornedElement.DesiredSize.Height;
        double adornerWidth = this.DesiredSize.Width;
        double adornerHeight = this.DesiredSize.Height;

        topLeft.Arrange(new Rect(-adornerWidth / 2, -adornerHeight / 2, adornerWidth, adornerHeight));
        topRight.Arrange(new Rect(desiredWidth - adornerWidth / 2, -adornerHeight / 2, adornerWidth, adornerHeight));
        bottomLeft.Arrange(new Rect(-adornerWidth / 2, desiredHeight - adornerHeight / 2, adornerWidth, adornerHeight));
        bottomRight.Arrange(new Rect(desiredWidth - adornerWidth / 2, desiredHeight - adornerHeight / 2, adornerWidth, adornerHeight));

        return finalSize;
    }

    // Creates one corner thumb (no-op if it already exists) and adds it to the visual tree.
    void BuildAdornerCorner(ref Thumb cornerThumb, Cursor customizedCursor)
    {
        if (cornerThumb != null) return;

        cornerThumb = new Thumb();

        // Set some arbitrary visual characteristics.
        cornerThumb.Cursor = customizedCursor;
        cornerThumb.Height = cornerThumb.Width = 10;
        cornerThumb.Opacity = 0.40;
        cornerThumb.Background = new SolidColorBrush(Colors.MediumBlue);

        visualChildren.Add(cornerThumb); // Must add to the tree to arrange
    }

    // Replaces an "auto" (NaN) Width/Height with the element's desired size so the
    // arithmetic in the drag handlers works, and caps the size at the parent's actual size.
    void EnforceSize(FrameworkElement adornedElement)
    {
        if (double.IsNaN(adornedElement.Width))
            adornedElement.Width = adornedElement.DesiredSize.Width;
        if (double.IsNaN(adornedElement.Height))
            adornedElement.Height = adornedElement.DesiredSize.Height;

        FrameworkElement parent = adornedElement.Parent as FrameworkElement;
        if (parent != null)
        {
            adornedElement.MaxHeight = parent.ActualHeight;
            adornedElement.MaxWidth = parent.ActualWidth;
        }
    }

    protected override int VisualChildrenCount { get { return visualChildren.Count; } }

    protected override Visual GetVisualChild(int index) { return visualChildren[index]; }
}
Saturday, August 11, 2012
Dualization
(1) Algebra notation indicates IO is the dual of any subject => func pattern, especially IEnumerable<T> (2) A Monad has M, bind, return, and in the Continuation Monad CM<T> = (T ->() ) -> () return :: T -> CM<T> bind :: CM<T> -> (T -> CM<S>) -> CM<S> Duality Code interface IEnumerable<out T> { IEnumerator<T> GetEnumerator(); } interface IEnumerator<out T>: IDisposable { bool MoveNext(); T Current { get; } } interface IObservable<out T> { IDisposable Subscribe(IObserver<T> observer); } interface IObserver<in T> { void OnCompleted(bool done); void OnError(Exception exception); T OnNext { set; } } Covariant vs. Contravariant A <: B means A is a subtype of B; for a type constructor C, C<A> <: C<B> (Covariant), C<B> <: C<A> (Contravariant). Reading = Covar = <out T>; Writing = Contra = <in T> = pass args into array. interface IRequireContravariant<in T> { int Write(T t); } interface IRequireCovariant<out T>{ T Read(int t); } IE vs. IO (1) Concurrency: IE needs to remove concurrency and block MoveNext until the source is ready for the next value; IO needs to add concurrency so as not to block the source pushing to the target (2) Async Computation Model void Fib(int n, Action<int> CB, Action<Exception> Err, Action Cncl) IObserver<int>
Fib(int n); (3) IE can sample the source interactively while IO cannot push back on a high-speed source. So IScheduler is defined to have time specified: IScheduler { DateTimeOffset Now IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action); IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) // constant push }
Sunday, August 5, 2012
Prism Region Manager, Service Locator and Module
Some code to demo additional features of Prism (1) ServiceLocator.GetInstance = Container.Resolve and inject (2) Region Manager can define region for injecting UserControl (view) (3) Module execute injection of view (4) Adding Module to ModuleCatalog does injection of Region Manager
App run bootstrapper
public partial class App : Application
{
    /// <summary>Runs the Prism bootstrapper when the application starts.</summary>
    protected override void OnStartup(StartupEventArgs e)
    {
        // Let the base class raise the Startup event before bootstrapping.
        base.OnStartup(e);

        OtherBootstrapper b = new OtherBootstrapper();
        b.Run();
    }
}

Bootstrapper: Service Locator, ModuleCatalog
/// <summary>
/// Unity bootstrapper that resolves the shell through the ServiceLocator and
/// registers <see cref="OtherFeatureModule"/> in the module catalog.
/// </summary>
public class OtherBootstrapper : UnityBootstrapper
{
    protected override void InitializeShell()
    {
        (Application.Current.MainWindow = (Window)Shell).Show();
    }

    protected override DependencyObject CreateShell()
    {
        // ServiceLocator replaced Container.Resolve
        return ServiceLocator.Current.GetInstance<ShellWin>();
        // return Container.TryResolve<ShellWin>();
    }

    protected override void ConfigureContainer()
    {
        // this will run default config such as EventAggregator, ServiceLocator and region manager
        base.ConfigureContainer();
    }

    protected override void ConfigureModuleCatalog()
    {
        // amazingly, adding a ModuleInfo will do DI just like container.Resolve
        ModuleCatalog.AddModule(new ModuleInfo()
        {
            ModuleName = typeof(OtherFeatureModule).Name,
            ModuleType = typeof(OtherFeatureModule).AssemblyQualifiedName
        });
        base.ConfigureModuleCatalog();
    }
}

/// <summary>Module that injects the <c>uc1</c> view into the "mainReg" region.</summary>
public class OtherFeatureModule : IModule
{
    IRegionManager _mgr;

    // DI does happen when the ModuleInfo is added to the catalog
    public OtherFeatureModule(IRegionManager mgr)
    {
        _mgr = mgr;
    }

    public void Initialize()
    {
        _mgr.Regions["mainReg"].Add(new uc1 ());
    }
}

ShellWin.xaml --- Region added to RegionManager.Region Collection
xmlns:ps="http://www.codeplex.com/CompositeWPF"
<Grid>
    <ContentControl ps:RegionManager.RegionName="mainReg" />
</Grid>
Tuesday, July 31, 2012
Parallel.For vs. Synchronization using ManualResetEvent
For 100 threads Synchronization using primitives seems a bit fater, 14s vs. 17s (but for some reason WCF does not go parallel even after adjusted throtlelling=100 machine.config processModel min/max work/Id thread =100/200)
/// <summary>
/// Times NUM concurrent WCF calls issued either through Parallel.For or through
/// hand-started threads released together by a ManualResetEvent.
/// </summary>
class Program
{
    private const int NUM = 100;

    // Counts worker threads that have started and are waiting at the gate.
    static CountdownEvent cde = new CountdownEvent(NUM);

    // Counts worker threads whose WCF call has finished. A separate event is needed:
    // re-assigning cde after mre.Set() races with workers that are already signalling.
    static CountdownEvent done = new CountdownEvent(NUM);

    // Gate that releases all workers at once.
    static ManualResetEvent mre = new ManualResetEvent(false);

    static void Main(string[] args)
    {
        Stopwatch sw = new Stopwatch();
        sw.Start();
        PFor();
        // SyncWithPrimitives();
        sw.Stop();
        Console.WriteLine(sw.ElapsedMilliseconds);
        Console.ReadKey();
    }

    // Issues NUM calls through Parallel.For with the degree of parallelism raised to NUM.
    private static void PFor()
    {
        c.Open();
        Parallel.For(0, NUM, new ParallelOptions() { MaxDegreeOfParallelism = NUM }, (i) =>
        {
            c.GetData(1111);
            // Console.WriteLine(c.GetData(1111));
        });
        c.Close();
    }

    #region Using Threading Primitives
    // Starts NUM threads, waits until all are parked at the gate, opens the proxy,
    // releases them together and waits for every call to complete.
    private static void SyncWithPrimitives()
    {
        Thread[] threads = new Thread[NUM];
        for (int i = 0; i < NUM; i++)
        {
            threads[i] = new Thread(SetThread);
            threads[i].Start();
        }

        cde.Wait();   // all threads are ready
        c.Open();
        mre.Set();    // all threads burst into calling WCF
        done.Wait();  // count down to all finish calls to WCF
        c.Close();
    }

    static ServiceReference1.Service1Client c = new ServiceReference1.Service1Client();

    static void SetThread()
    {
        cde.Signal();   // ready
        mre.WaitOne();
        c.GetData(1111);
        // Console.WriteLine( c.GetData(1111));
        done.Signal();  // call finished
    }
    #endregion
}

WCF:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
public class Service1 : IService1
{
    // Simulates one second of work, then returns a random number as text.
    public string GetData(int value)
    {
        Random r = new Random();
        Thread.Sleep(TimeSpan.FromSeconds(1));
        return r.NextDouble().ToString();
    }
}
Monday, July 30, 2012
Implement Async WCF using Task in .Net 4.0
In Fx 4.5 Task can be returned but in Fx 4.0, task can only be on Server:
public interface IService1
{
    [OperationContract(AsyncPattern = true)]
    IAsyncResult BeginGetDataAsync(int value, AsyncCallback cb, object state);

    string EndGetDataAsync(IAsyncResult ar);
}

// Synchronous worker: simulates three seconds of work and returns a random number as text.
string GetData(int value)
{
    Random r = new Random();
    Thread.Sleep(TimeSpan.FromSeconds(3));
    return r.NextDouble().ToString();
}

/// <summary>
/// APM "Begin" half implemented over a Task. The returned IAsyncResult is the
/// Task&lt;string&gt; itself, so EndGetDataAsync can read its Result and AsyncState
/// carries the caller's state.
/// </summary>
public IAsyncResult BeginGetDataAsync(int value, AsyncCallback cb, object state)
{
    // state must be passed to StartNew so that it becomes the task's AsyncState
    var t = Task<string>.Factory.StartNew((s) => { return GetData(value); }, state);

    // Return the task itself, not the continuation: the continuation is a plain Task
    // (no result, null AsyncState) and cannot be cast to Task<string> in End.
    if (cb != null)
    {
        t.ContinueWith(res => cb(t));
    }
    return t;
}

/// <summary>APM "End" half: returns the result, rethrowing any failure of the task.</summary>
public string EndGetDataAsync(IAsyncResult ar)
{
    return ((Task<string>)ar).Result;
}

Client --- need to add Servicew Ref with support of async WCF checked
object state = "";
ServiceReference1.Service1Client c = new ServiceReference1.Service1Client();
int NUM = 10;
Stopwatch sw = new Stopwatch();
sw.Start();
Parallel.For(0, NUM, (ls) =>
{
    //for (int i = 0; i < NUM; i++)
    //{
    var t = Task<string>.Factory.FromAsync(c.BeginGetDataAsync, c.EndGetDataAsync, 10, state);
    string s = t.Result;
    Console.WriteLine(s);
    //}
});
c.Close();
sw.Stop();
Console.WriteLine(sw.ElapsedMilliseconds);
Console.ReadLine();
Wednesday, July 25, 2012
High Performance WPF/WCF
Serveral Keys for High Performance: (1) WPF need to use ThreadPool (Task/ParallelFor) to call WCP in the backgroud (2) Return data should be on Queues for Dispatcher Timer to pick up and update UI (3) WCF Instancing.Single and Concurrency.Multiple seems to have better network through-put than Single/Single ( Total B/Sec in Win8 Resource Monitor 220 vs 280, 20%+). But this not a serious performance Testing) (4) one thing for sure: when Task is removed, UI will be slugish.
public delegate void OnHeartbeat();

/// <summary>
/// Shows NUM cells. A background batch calls the WCF service once per cell and queues
/// the answers; a dispatcher timer "heartbeat" lets every cell drain its own queue on
/// the UI thread.
/// </summary>
public partial class MainWindow : Window
{
    const int NUM = 5000;
    public event OnHeartbeat Heartbeat;
    DispatcherTimer t;
    DispatcherTimer t2;
    List<RealtimeData> dataList = new List<RealtimeData>();

    // One queue per cell. The dictionary is only read after AddGridCell; each queue is
    // shared between pool threads and the UI thread and is guarded by lock(queue).
    Dictionary<int, Queue<string>> qList = new Dictionary<int, Queue<string>>();

    // 1 while a fetch batch is running, so timer ticks do not pile up overlapping batches.
    int fetching;

    public MainWindow()
    {
        InitializeComponent();
        t = new DispatcherTimer() { Interval = new TimeSpan(0, 0, 0, 0, 1) };
        t.Tick += new EventHandler(t_Tick);
        t2 = new DispatcherTimer() { Interval = new TimeSpan(0, 0, 0, 0, 1) };
        t2.Tick += new EventHandler(t_Tick2);
        AddGridCell();
        t.Start();
        t2.Start();
    }

    // UI heartbeat: every cell pulls at most one queued value.
    void t_Tick(object sender, EventArgs e)
    {
        if (Heartbeat != null) Heartbeat();
    }

    // Starts one background batch of NUM service calls unless one is still in flight.
    void t_Tick2(object sender, EventArgs e)
    {
        if (System.Threading.Interlocked.CompareExchange(ref fetching, 1, 0) != 0)
        {
            return;
        }

        Task.Factory.StartNew(() =>
        {
            try
            {
                Parallel.For(0, NUM, (ii) =>
                // for (int ii = 0; ii < NUM; ii++)
                {
                    ServiceReference1.Service1Client c = null;
                    try
                    {
                        c = new ServiceReference1.Service1Client();
                        string data = c.GetData(ii);
                        Queue<string> queue = qList[ii];
                        lock (queue)
                        {
                            queue.Enqueue(data);
                        }
                        c.Close();
                    }
                    catch (Exception ex)
                    {
                        // A faulted proxy cannot be closed; abort it so the channel is released.
                        if (c != null) c.Abort();
                        EventLog.WriteEntry("Application", ex.Message + " " + ex.StackTrace);
                    }
                });
                // }
            }
            finally
            {
                System.Threading.Interlocked.Exchange(ref fetching, 0);
            }
        });
    }

    // Creates the queue and the cell control for every index and hooks the cell to the heartbeat.
    void AddGridCell()
    {
        for (int i = 0; i < NUM; i++)
        {
            qList.Add(i, new Queue<string>());
            RealtimeData rtD = MakeDataItem(i);
            this.Heartbeat += rtD.UpdateData;
            dataList.Add(rtD);
            wrapPanel1.Children.Add(rtD);
        }
    }

    RealtimeData MakeDataItem(int i)
    {
        return new RealtimeData(qList[i])
        {
            Width = 50,
            Height = 50,
            BorderBrush = new SolidColorBrush(Colors.Black),
            BorderThickness = new Thickness(2)
        };
    }
}

/// <summary>One cell: a label showing the queue depth and the most recently dequeued value.</summary>
public class RealtimeData : UserControl
{
    Label lb = new Label();
    Queue<string> _q;

    public RealtimeData(Queue<string> q)
    {
        _q = q;
        this.AddChild(lb);
    }

    // Dequeues one value (if any) and displays "<count before dequeue> <value>".
    public void UpdateData()
    {
        int i;
        string next;
        lock (_q) // the producer enqueues from pool threads
        {
            i = _q.Count;
            if (i == 0) return;
            next = _q.Dequeue();
        }
        lb.Content = i.ToString() + " " + next;
    }
}

// [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode=ConcurrencyMode.Multiple)]
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Single)]
public class Service1 : IService1
{
    // Returns a random number as text.
    public string GetData(int value)
    {
        // Thread.SpinWait(300000);
        Random r = new Random();
        return r.NextDouble().ToString();
    }
}
Subscribe to:
Posts (Atom)