Montag, März 30, 2009

WCF-PubSub Experiment

PubSub (Publish-Subscribe) kann man sehr gut dort einsetzen, wo eine beliebige Anzahl von Empfängern von Nachrichten existiert. Ab und zu wird PubSub auch als Observer-Pattern bezeichnet. Durch PubSub sollen alle die Interessiert sind, die Informationen der Veröffentlichung (Publish) erhalten. Der Vorgang der Veröffentlichung ist als asynchroner Mechanismus konzipiert, es kann allerdings auch synchron benutzt werden (wird aber nicht empfohlen). Aufgrund der asynchronen Ausführung ist der Einsatz einer Message Queue in vielen Anwendungsszenarien empfehlenswert. Eine Message Queue bringen viele Windows Server Systeme bereits mit (Msmq), so dass meistens keine weiteren Kosten anfallen, alternativ kann auch eine Datenbank verwendet werden.

Binding, die Duplex-Verbindungen unterstützen, sind:

  • WSDualHttpBinding
  • NetTcpBinding
  • NetNamedPipeBinding
  • NetPeerTcpBinding
  • NetTcpContextBinding

Auf der MSDN-Seite ist eine sehr gute Übersicht über die gebräuchlichsten Bindings und deren Fähigkeiten.

Mein Beispiel, hier der Download, ist nicht sehr sinnvoll aber es zeigt die grundsätzlichen Funktionen und einige Problemecken.

Ich habe ein sehr recht einfaches Klassenmodell, es werden simple mathematische Operationen in den Services durchgeführt. Es gibt einen Service zum Addieren und einen zum Multiplizieren. Es gibt 4 Service-Interfaces IMathAddService, IMathMultiplyService, ISubscriptionService und IPublishService. Die ersten beiden Services klammere ich aktuell etwas aus, entscheidend ist der ISubscriptionService bei dem sich alle Aufrufer (Subscriber) registrieren, um über Zustandsänderung/Veröffentlichungen informiert zu werden. In meinem Fall implementieren sowohl die Subscriber als auch der SubscriptionService das IPublishService-Interface. Es wäre nicht notwendig, dass der SubscriptionService das Interface implementiert, es macht es in meinem Fall aber einfacher. Durch die Publish-Methode im SubscriptionService werden die Veröffentlichungen angestoßen. Der SubscriptionService führt Veröffentlichungen allerdings nicht sofort aus, sonder stellt diese in eine Queue, da ich immer schön nacheinander eine Abarbeitung durchführen möchte. Die Queue ist im SubscriberManager mit implementiert, typischerweise würde sich hier ein MessageQueuing-System anbieten.PubSub-Demo class diagrammEs passiert nicht allzu viel in der Anwendung. Es wird zuerst die Endpoint-Konfiguration erstellt und das Binding gesetzt. Ich verwende NetTcpBinding um mit den verschiedenen Services zu kommunizieren. Im nächsten Schritt (Zeilen 12-16) werden die Instanzen der Subscriber-Channel erstellt, diese bekommen als Informationen den InstanceContext, der auf die Instanz des Callback zeigt. Nach der Erstellung der Hosts für meine 3 Services (Zeilen 18-29) werden die beiden Subscriber am Service registriert (Zeilen .31-34). Innerhalb des Using-Blocks wird die eigentlich Abarbeitung durchgeführt. Der Monitor ist meine Arbeitsklasse für die veröffentlichten Nachrichten. Ich wollte auch etwas die Konkurrenzproblematik bei meiner Anwendung zeigen, dazu rufe wird 2 mal hintereinander eine Veröffentlichung ausgeführt und sofort danach die eine weitere Rechenoperation gestartet. Die Mathe-Services speichern immer das letzte Ergebnis und setzen es bei einem Publish-Aufurf auf NaN (Not a Number).

   1: //setup endpoints
   2: EndpointAddress addressAdd = 
   3:     new EndpointAddress("net.tcp://localhost:5554/pubsub/add");
   4: EndpointAddress addressMultiply = 
   5:     new EndpointAddress("net.tcp://localhost:5554/pubsub/mutliply");
   6: EndpointAddress addressPubSub = 
   7:     new EndpointAddress("net.tcp://localhost:5554/pubsub/register");
   8: //binding with duplex support is required! (WSDualHttpBinding, NetTcpBinding, ..)
   9: NetTcpBinding binding = new NetTcpBinding();
  10:  
  11: //create the channels to register the subscribers (link to subscriber interface)
  12: DuplexChannelFactory<ISubscriptionService> duplexAddService, duplexMultiplyService;
  13: duplexAddService = new DuplexChannelFactory<ISubscriptionService>(
  14:     new InstanceContext(new MathAddService()), binding, addressPubSub);
  15: duplexMultiplyService = new DuplexChannelFactory<ISubscriptionService>(
  16:     new InstanceContext(new MathMultiplyService()), binding, addressPubSub);
  17:  
  18: //create all host and start them
  19: ServiceHost hostAdd = new ServiceHost(typeof(MathAddService));
  20: ServiceHost hostMultiply = new ServiceHost(typeof(MathMultiplyService));
  21: ServiceHost hostSubscribe = new ServiceHost(typeof(MemoryPubSubService));
  22: hostAdd.AddServiceEndpoint(typeof(IMathAddService), binding, addressAdd.Uri);
  23: hostMultiply.AddServiceEndpoint(typeof(IMathMultiplyService)
  24:     , binding, addressMultiply.Uri);
  25: hostSubscribe.AddServiceEndpoint(typeof(ISubscriptionService)
  26:     , binding, addressPubSub.Uri);
  27: hostAdd.Open();
  28: hostMultiply.Open();
  29: hostSubscribe.Open();
  30:  
  31: //register!
  32: ISubscriptionService dupAdd = duplexAddService.CreateChannel();
  33: dupAdd.Subscribe(PublishTopics.All);
  34: duplexMultiplyService.CreateChannel().Subscribe(PublishTopics.All);
  35: //run the watcher (async processor)
  36: using (PublishMonitor monitor = new PublishMonitor())
  37: {
  38:  
  39:     ChannelFactory<IMathAddService> addFactory = 
  40:         new ChannelFactory<IMathAddService>(binding, addressAdd);
  41:     ChannelFactory<IMathMultiplyService> multiplyFactory = 
  42:         new ChannelFactory<IMathMultiplyService>(
  43:         binding, addressMultiply);
  44:     IMathAddService add = addFactory.CreateChannel();
  45:     IMathMultiplyService multiply = multiplyFactory.CreateChannel();
  46:  
  47:     Console.WriteLine("Adding 1+2=" + add.Add(1, 2));
  48:     Console.WriteLine("LastResult of add={0}, of multiply={1}", add.GetLastResult(),
  49:                       multiply.GetLastResult());
  50:     Console.WriteLine("Calling publish!");
  51:     //this call should be normally done by the service itself
  52:     dupAdd.Publish(new PublishContext()
  53:                        {
  54:                            Topic = PublishTopics.Add, SomeDetail = "Some one to reset!"
  55:                        });
  56:     dupAdd.Publish(new PublishContext()
  57:                        {
  58:                            Topic = PublishTopics.Multiply, SomeDetail = "Some want's to reset again!"
  59:                        });
  60:     //show some concurrency probs, that can occure
  61:     Console.WriteLine("LastResult of add={0}, of multiply={1}", add.GetLastResult(),
  62:                       multiply.GetLastResult());
  63:     Console.WriteLine("Adding 4*2=" + multiply.Multiply(4, 2));
  64:  
  65:     Thread.Sleep(2000);
  66:     Console.WriteLine("LastResult of add={0}, of multiply={1}", add.GetLastResult(),
  67:                       multiply.GetLastResult());
  68:     Console.WriteLine("Finished sample, Press [Enter]");
  69:     Console.ReadLine();
  70:     Console.WriteLine("closing proxies and hosts");
  71:     addFactory.Close();
  72:     multiplyFactory.Close();
  73: }
  74: //close all and cleanup
  75: duplexAddService.Close();
  76: duplexMultiplyService.Close();
  77: hostSubscribe.Close();
  78: hostAdd.Close();
  79: hostMultiply.Close();

Wird das Ganze ausgeführt, so erhalte ich zum Beispiel auf meinem Dual Core das folgende Ergebnis:

PubSub-Demo output Je nach Lust und Laune des Prozessors kann das Ergebnis der “LastResult of”-Abfragen immer mal etwas anders aussehen, das ist in dem Beispiel von mir durchaus so gewollt, um Probleme der asynchronen Verarbeitung aufzuzeigen.

Perfekt ist die Implementierung meines Beispiels nicht, allerdings soll es auch nur einige Möglichkeiten und Probleme aufzeigen.

Hier noch einige interessante Links zu dem Thema:

Keine Kommentare: