Saturday, August 25, 2012

WCF Duplex Client as Observer

Setting up an Rx observer over the wire requires a WCF duplex callback:
(1) The ServiceContract needs to declare a callback contract (IObservserCallback here) instead of a Subscribe method that takes a lambda.
(2) Subscribe() and Unsubscribe() should be one-way to avoid potential blocking.
(3) If a session is required, Subscribe needs to be marked as initiating and Unsubscribe as terminating.
(4) Note that Subscribe on the IObservable (IO) is non-blocking (unlike the deprecated ForEach); its only purpose here is to set up an OnNext handler on the service side, which in turn calls the client-side OnNext.
(5) The service InstanceContextMode must be PerSession or Single. With PerCall, Unsubscribe cannot work because the state variable bool stopIO is not kept between calls.
With Single, on the other hand, Unsubscribe stops the stream for all sessions.
(6) Threading issues: a WCF service is single-threaded by default (ConcurrencyMode.Single), so re-entering it (e.g. calling the callback during a service operation) would deadlock and is therefore disallowed.
So set the ServiceBehavior ConcurrencyMode to Reentrant or Multiple.
(7) The netTcp binding supports duplex (even though its name does not say Dual).
(8) Updating the service reference on the client starts the WCF service host for the WCF library project, which is handy for debugging and testing.
(9) The auto-generated service reference on the client gives the proxy a constructor that takes an InstanceContext, which in turn requires an implementation of the callback contract.
The service side uses OperationContext.Current.GetCallbackChannel<IObservserCallback>();
(10) Note that the callback contract is named IService1Callback on the client, which differs from the service-side callback contract name IObservserCallback.
(11) CPU usage stayed below 10% in a test run with a data push every 1 ms and no network hop.
(12) To signal completion on an IO, use TakeWhile and check a non-blocking state variable such as stopIO = true.
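
Points (4) and (12) can be tried without any WCF plumbing. The following is a minimal sketch, assuming the System.Reactive package is referenced; the class name TakeWhileSketch and the 100 ms interval are made up for illustration, and the static stopIO flag only stands in for the per-session flag used in the service further down.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

class TakeWhileSketch
{
    // Hypothetical stand-in for the service's per-session stop flag.
    static volatile bool stopIO;

    static void Main()
    {
        var done = new ManualResetEventSlim();

        // TakeWhile re-checks the flag on every tick and completes the sequence once it is set.
        var ticks = Observable.Interval(TimeSpan.FromMilliseconds(100))
                              .TakeWhile(_ => !stopIO);

        // Subscribe only registers the handlers and returns immediately.
        ticks.Subscribe(
            x => Console.WriteLine("OnNext " + x),
            () => { Console.WriteLine("Completed"); done.Set(); });

        Console.WriteLine("Subscribe returned; values arrive on a timer thread.");

        Thread.Sleep(500);
        stopIO = true;   // the next tick fails the predicate and OnCompleted fires
        done.Wait();
    }
}
```

Because the predicate is only evaluated when a value arrives, completion is signalled on the first tick after the flag flips, not at the moment it is set.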

Contract and Service Definition

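    using System.Runtime.Serialization;
    using System.ServiceModel;

    // Duplex contract: the service pushes data back through IObservserCallback.
    [ServiceContract(CallbackContract = typeof(IObservserCallback), SessionMode = SessionMode.Required)]
    public interface IService1
    {
        // One-way so the client is not blocked; starts the session.
        [OperationContract(IsOneWay = true, IsInitiating = true)]
        void Subscribe();

        // One-way; ends the session.
        [OperationContract(IsOneWay = true, IsTerminating = true)]
        void Unsubscribe();
    }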

    public interface IObservserCallback
    {
        [OperationContract]
        void OnNext(CompositeType data);
    }

    [DataContract]
    public class CompositeType
    {
        double _value = 30.45;
        string _Name = "Name1";

        [DataMember]
        public double Value
        {
            get { return _value; }
            set { _value = value; }
        }

        [DataMember]
        public string Name
        {
            get { return _Name; }
            set { _Name = value; }
        }
    }


Service

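    using System;
    using System.Reactive.Linq;
    using System.ServiceModel;

    // [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession, ConcurrencyMode = ConcurrencyMode.Reentrant)]
    public class Service1 : IService1
    {
        volatile bool stopIO;                 // session state: written by Unsubscribe, read on the Rx timer thread
        readonly Random rng = new Random();   // one instance per session; a new Random() every 1 ms can repeat values

        public void Subscribe()
        {
            stopIO = false;
            // Capture the callback channel here: OperationContext.Current is not available on the timer thread.
            IObservserCallback client = OperationContext.Current.GetCallbackChannel<IObservserCallback>();
            var IO = Observable.Interval(TimeSpan.FromMilliseconds(1)).TakeWhile(x => !stopIO);
            IO.Subscribe(x =>   // non-blocking: this only registers the handler
            {
                if (client != null)
                    client.OnNext(new CompositeType { Name = "Name" + x, Value = rng.NextDouble() * 30 });
            });
        }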

        public void Unsubscribe()
        {
            stopIO = true;
        }   
    }
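
The WCF test host from note (8) is convenient while debugging, but the same service can also be hosted from a console application. What follows is only a sketch under stated assumptions: it expects Service1 and IService1 from the listings above to be in the same project, and the class name HostSketch and the address net.tcp://localhost:9000/Service1 are invented for illustration.

```csharp
using System;
using System.ServiceModel;

class HostSketch
{
    static void Main()
    {
        // Assumed address; any free TCP port would do.
        var baseAddress = new Uri("net.tcp://localhost:9000/Service1");

        using (var host = new ServiceHost(typeof(Service1), baseAddress))
        {
            // NetTcpBinding supports sessions and duplex callbacks, as note (7) points out.
            host.AddServiceEndpoint(typeof(IService1), new NetTcpBinding(), "");
            host.Open();

            Console.WriteLine("Listening on " + baseAddress + ". Press Enter to stop.");
            Console.ReadLine();
        }
    }
}
```

This sketch publishes no metadata endpoint, so a client would have to be generated beforehand or built with a DuplexChannelFactory against the same contract.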

Client side Subscription and Observer

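    using System;
    using System.ServiceModel;
    using ConsoleApplication1.ServiceReference1;

    class Program
    {
        static void Main(string[] args)
        {
            // The generated proxy requires an InstanceContext that wraps the callback implementation.
            Service1Client c = new Service1Client(new InstanceContext(new callbackObserver()));
            c.Subscribe();
            Console.WriteLine("Waiting for data. To stop the IO, hit Return.");
            Console.ReadLine();
            c.Unsubscribe();
            Console.ReadLine();
            c.Close();   // release the session and the channel
        }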
    }

    class callbackObserver : ConsoleApplication1.ServiceReference1.IService1Callback
    {
        public void OnNext(CompositeType data)
        {
            Console.WriteLine("Data: " + data.Name +" "+data.Value.ToString());
        }
    }
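
To get Rx operators on the client as well, the callback can republish each pushed item through a Subject. This is a rough sketch, not part of the original sample: SubjectCallback, its Data property, and RxClientSketch are hypothetical names, System.Reactive is assumed to be referenced by the client, and the proxy types are assumed to come from the generated ConsoleApplication1.ServiceReference1 namespace used above.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.ServiceModel;
using ConsoleApplication1.ServiceReference1;

// Hypothetical callback that turns the WCF push into an IObservable.
class SubjectCallback : IService1Callback
{
    private readonly Subject<CompositeType> _subject = new Subject<CompositeType>();

    // Expose the stream without letting callers push into the subject.
    public IObservable<CompositeType> Data
    {
        get { return _subject.AsObservable(); }
    }

    public void OnNext(CompositeType data)
    {
        _subject.OnNext(data);
    }
}

class RxClientSketch
{
    static void Main()
    {
        var callback = new SubjectCallback();

        // Example operator: print one item per second out of the 1 ms feed.
        using (callback.Data
                       .Sample(TimeSpan.FromSeconds(1))
                       .Subscribe(d => Console.WriteLine(d.Name + " " + d.Value)))
        {
            var proxy = new Service1Client(new InstanceContext(callback));
            proxy.Subscribe();
            Console.ReadLine();
            proxy.Unsubscribe();
        }
    }
}
```

Sampling on the client only thins out what is printed; the service still sends every item over the wire.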
