veröffentlicht am : So, 06. 05. 2018 geändert am: Mo, 30. 07. 2018
Kategorie: Computer --> Programmierung --> MongoDB-POJO Mapper morphium --> morphium
Schlagworte: java programming morphium
Einer der vielen Vorteile von Morphium ist das integrierte Messagingsystem, welches z.B. auch für die Synchronisierung der Caches benutzt wird. Das funktioniert schon so seit mit einer der erste Versionen von Morphium.
Das Messaging setzt dabei auf ein ausgeklügeltes "Locking", damit Nachrichten, die nur für einen Empfänger bestimmt sind, auch nur dort ankommen. Leider kann man so etwas im Normalfall nur durch Polling, d.h. immer wieder die selbe Anfrage an die DB senden, realisieren. Aber seit der V3.2.0 wird im Falle der Verwendung eines Replicasets der OplogMonitor genutzt, um Messaging auf eine art "Push" aufzusetzen. D.h. die DB informiert die Clients, wenn es neue Nachrichten gibt.
Das reduziert die Last und erhöht die Reaktionszeit. Schauen wir uns das im Detail mal an...
Wie einleitend schon erwähnt, muss man ab der V3.2.0 2 Fälle unterscheiden: ist Morphium mit einem ReplicaSet verbunden oder nicht.
Das gitl im übrigen auch, für einen Sharded Cluster! D.h. wenn man mit dem MongoS verbunden ist, funktioniert das Messaging via Polling. Das Polling kann konfiguriert werden, d.h. wie oft pro Minute soll denn gefragt werden. Sollen die Nachrichten einzeln bearbeitet werden, oder alle auf ein mal etc.
Im Endeffekt läuft alles auf das Locking raus. Der Algorithmus sieht in etwa so aus (und kann in Messaging.java
nachgelesen werden):
der OpLogMonitor ist ja schon eine Weile teil von Morphium. Mit dem OpLogMonitor wird ein TailableCursor
, also ein stets geöffneter Curser, ans OpLog "gehängt". Damit bekommt man eine Nachricht, sobald eine Änderung im OpLog geschieht. Was im Replicaset immer dann der Fall ist, wenn es einen schreibzugriff auf die MongoDB gibt.
Ab Morphium 4.0 wird in diesem Fall auch der ChangeStream dieser Collection genutzt. Damit ist man nicht mehr auf den Zugriff des OpLog angewiesen.
Warum dann nicht einen TailableCursor direkt an die Msg-Collection hängen? Ja, das haben wir uns auch überlegt, leider funktioniert das aus folgenden Gründen nicht wirklich:
Die Verarbeitung der Messages ist im endeffect identisch zu oben, nur kann man sich das Locking vereinfachen. Bei einer eingehenden neuen Nachricht, passiert das:
im Falle von Updates einer Nachricht muss ja eigentlich erst mal gar nicht so viel passieren. Dennoch wird in diesem Fall die Nachricht noch mal kurz gelesen und geprüft, ob sie bearbeitet werden muss.
Die Nutzung von Messaging in Morphium ist ziemlich einfach, man erstellt eine Instanz der Klasse Messaging und legt los:
im Idealfall sollte man das Messaging system z.B. über Spring oder so initialisieren.
Und dann kann man eigentlich schon loslegen. Eine Nachricht wird wie folgt gesendet:
hier wird eine Nachricht mit einer TTL (time to live) von 5 sek gespeichert. Die Default TTL ist 30sek. Ältere Nachrichten werden automatisch gelöscht (aber nicht zwingend genau nach nach 30sek...)
Nachrichten sind per default broadcast
, d.h. sie werden von allen verbundenen Clients gelesen und bearbeitet. Würde man die Nachricht auf "Exclusiv" setzen, könnten alle sie lesen, aber nur einer soll sie bearbeiten.
Diese Nachricht wird nur von einem einzigen empfänger bearbeitet!
Grundsätzlich werden Nachrichten übrigens vom Sender selbst nicht gelesen.
Und zu guter Letzt, man kann nachrichten natürlich auch direkt an einen Empfänger senden. Das passiert z.B. wenn Antworten gesendet werden. Die sollen ja nur vom Sender der ursprünglichen Nachricht bearbeitet werden.
Um eine Nachricht direkt an einen bestimmten Empfänger zu senden, muss man die ID des senders kennen. Das kann einfach durch eine eingehende Nachricht passieren oder man implementiert so eine Art discovery...
in den Integration-Tests in Morphium werden beide Methoden verwendet. Der Unterschied ist relativ einfach: storeMessage schreibt die Nachricht direkt in die Mongo wohingegen queueMessage asynchron funktioniert. D.h. die Nachricht wird erst entgegengenommen und im Hintergrund geschrieben. Evtl. der bessere Weg für Performance.
Empfang von Nachrichten ist genauso einfach. im Messaging wird einfach ein MessageListener registriert:
Dabei ist message die Message und messaging das MessagingSystem. Der listener liefert hier als Ergebnis null
zurück, könnte aber auch eine Nachricht als Antwort zurückgeben. Diese würde dann automatisch direkt an den Sender zurückgeschickt.
Mit Hilfe von messaging
kann der listener auch auf das MessagingSystem zugreifen und bei Bedarf z.B. selbst Nachrichten versenden, die keine direkte Antwort sein sollten.
Des Weiteren kann der Listener die Bearbeitung "verweigern" indem er eine MessageRejectedException
wirft. Der Sender kann auch darüber informiert werden, jedoch wird die Nachricht einfach wieder in den "Pool" zurückgeworfen und kann dann von anderen bearbeitet werden.
Das ist natürlich nur dann wirklich ein Problem, wenn es sich um eine exklusive Nachricht gehandelt hat. Denn dann sollte ein anderer Listener die Nachricht bearbeiten können.
Innerhalb von Morphium setzt der CacheSynchronizer
auf Messaging auf. Er benötigt im Constructor auch ein Messaging System.
Die Implementierun ist eigentlich recht einfach. Der CacheSynchronizer
registriert sich als MorphiumStorageListener
in Morphium und wird so über jeden Schreibzugriff informiert (denn nur dann muss ja auch der Cache synchronisiert werden).
Kommt ein Schreibzugriff rein, wird geprüft ob es sich um ein gecachtes Entity handelt und wenn ja, wird eine ClearCache
-Message über das Messaging gesendet. Diese beinhaltet, was zu tun ist abhängig von der gewählten Strategie (siehe auch Cache-Annotation in Morphium).
Außerdem bearbeitet der CacheListener
natürlich auch eingehende Nachrichten. Kommt eine Nachricht an, wird einfach der in der Message beschriebene Cache gelöscht, das element geändert oder der Cache anderweitig angepasst.
Diese Clear-Nachrichten lassen sich auch über den CacheSynchronizer direkt versenden.
und es sollte auch nicht unerwähnt bleiben, dass man natürlich auch hier einen Listener registrieren kann, um sich über die Synchronisation der Caches informieren zu lassen.
das Messaging feature von Morphium führt zu Unrecht ein Schattendasein. Es kann in vielen Fällen als einfacher Ersatz für full-blown MessagingSysteme dienen und kann super einfach eingesetzt werden. Mit der neuen Basis auf OpLogMonitor
ist ein wichtiges neues Feature hinzugekommen um Messaging noch häufiger einsetzen zu können.
erstellt Stephan Bösebeck (stephan)