Kommunikation mit externem Service
Schauen wir uns zunächst ein HTTP-API eines Services an, den wir benutzen wollen.
Dieser Service stellt eine typische Benutzerverwaltung dar und bietet uns die
Möglichkeit, neue Benutzerkonten anzulegen. Wir senden also die Informationen eines
neuen Benutzers per JSON an die URI /users
. Der HTTP-Request könnte folglich so
aussehen:
Der Service wiederum nimmt die Daten entgegen, lässt uns jedoch nicht auf eine Antwort warten, sondern erstellt intern einen Task, der im Hintergrund abgearbeitet wird. Wir erhalten als Antwort:
Wir wissen nun also nicht, wann und ob unser User erfolgreich angelegt wurde, können
den Zustand des Service-Tasks aber anhand des im Location-Header mitgeteilten URI
beliebig abfragen. Das Polling gestaltet sich derart, dass die Antwort auf einen
Request nach /tasks/<task-id>
uns unter anderem den aktuellen Zustand – running
oder stopped – liefert. Darüber hinaus erfahren wir ggf. auch etwas zu
Fehlermeldungen und erhalten natürlich das Ergebnis des Tasks als URI.
Unsere Anfrage ist also genau genommen erst erledigt, sobald der Task des Services abgearbeitet wurde, wir können aber parallel noch weitere Anfragen stellen und so asynchron mit dem Service kommunizieren. Für dieses HTTP-API wollen wir nun ein Java-API entwickeln, welches uns die asynchrone Kommunikation mit dem Service kapselt.
Java-API mit Futures
Für unser Java-API benötigen wir eine Methode createUser()
, welche die
Benutzerdaten erhält und den Request an den Service anstößt. Eine Implementierung
könnte so aussehen:
Wir erhalten als Antwort auf unsere Anfrage vom Service jedoch keine Repräsentation des angelegten Benutzers, sondern lediglich Informationen über einen Task. Diese schlichte Implementierung funktioniert also nicht, weil unsere Anfrage nicht bis zur Fertigstellung des neuen Benutzerkontos blockiert und diesen im Anschluss zurückgibt. Stattdessen erhalten wir vom Service die Task-ID und können den User erst nach Fertigstellung des Tasks abfragen. Wir schreiben uns also eine Task-Klasse, über welche wir den Status des Tasks und die URI des Task-Ergebnisses abfragen können.
Mit Tasks können wir dann unsere createUser()
-Methode umschreiben zu:
Sicherlich fällt es jedem direkt auf: durch die while-Schleife und das Thread.sleep()
wird nun auf der Seite unseres API blockiert. Wir wollen aber, dass auch unser
eigener API-Call asynchron läuft. Daher setzen wir nun Futures ein und lassen unsere
Tasks von CompletableFuture
erben. In der Klasse selbst ändern wir zunächst
nichts, da die get()
-Methode einfach vererbt wird. Wir können die
createUser()
-Methode dann aber folgendermaßen schreiben:
Der Nutzer unseres API kann dann je nach Bedarf den neu angelegten Benutzer selbst aus dem zurückgegebenen Future auslesen. Jetzt stellt sich nur noch die Frage, wann unser APITask weiß, dass der Service-Task fertig ist?
Wir erweitern unser Programm noch um eine Klasse TaskPoller
. Diese fragt im
Hintergrund ab, ob für einen gegebenen APITask der entsprechende Task beim Service
beendet wurde.
Der jeweilige TaskPoller
kennt also „seinen“ Task und kann den Zustand für diesen
abfragen. Wenn fetchStatus()
die Beendigung des Tasks zurückgibt, wird das Future
completed und dabei der jeweilige Benutzer mit angegeben. Ansonsten warten wir
einfach eine Sekunde und probieren es erneut.
Das Polling wird in der createUser()
-Methode mit einem
ExecutorService
angestoßen. Der ExecutorService wurde dabei vom Nutzer unseres API erstellt und mit
hineingereicht. Der Nutzer muss dabei die Threadpoolgröße bestimmen sowie sich später
auch um das korrekte Beenden des ExecutorService kümmern.
In TaskPoller
wurde eine run
-Methode implementiert, die ein ähnliches
Problem beinhaltet, wie die Statusabfrage in der createUser
-Methode. Durch die
while-Schleife blockiert ein Thread des ExecutorServices so lange, bis der Task beim
externen Service fertig ist. Damit ist der Pool an Threads potentiell sehr schnell
erschöpft. Der ExecutorService bietet jedoch mit der schedule()
-Methode die
Möglichkeit, die Ausführung eines Callable oder Runnable erst nach einem Delay
durchzuführen. Diese Möglichkeit bauen wir folgendermaßen in unseren TaskPoller
ein.
Der ExecutorService wird der Klasse mit übergeben und kann daher aus der Klasse heraus weiter genutzt werden. Dadurch kann dieser jedoch die Auslastung der Threads selber verwalten, da hier der Delay nicht dazu führt, dass ein Thread vollständig blockiert. Solange ein TaskPoller nichts tut, kann der Thread anderweitig vergeben werden.
Durch den rekursiven Aufruf des Pollings sollte innerhalb der run()
-Methode noch
eine Abbruchbedingung implementiert werden. Falls der Task beim Service nie
beendet wird, kann ein Timeout oder ein Counter für die Anzahl an Versuchen
verwendet und das CompletableFuture dann mit einem
completeExceptionally()
beendet werden.
Exkurs: Thread.sleep() vs. ExecutorService
Die Behauptung, dass ein Thread.sleep()
einen Thread des ExecutorService
blockiert, lässt sich durch folgenden einfachen Versuch überprüfen. Zunächst
erstellen wir uns einen ScheduledExecutorService
mit einer Threadpoolgröße von
eins. Diesem übergeben wir dann zwei Runnable
.
Anhand der Ausgaben zeigt sich, dass die Testausgabe
erst erfolgt, nachdem im
ersten Runnable
bis zehn gezählt wurde.
Im Vergleich dazu erstellen wir eine Klasse MySchedulerTest
, welche Runnable
implementiert. Sie erhält ein paar Informationen über Delay etc. für die spätere
Ausführung. Außerdem erfolgt die Verzögerung hier nicht per Thread.sleep()
,
sondern über die schedule()
-Methode des ExecutorServices.
Zwei unterschiedlich konfigurierte Instanzen dieser Klasse werden dem Scheduler übergeben.
Hier zeigt sich anhand der Konsolenausgaben, dass die beiden Ausführungen
abwechselnd und gemischt durchgeführt werden. Der eine Thread des Pools wird also
beim schedule()
-Aufruf nicht blockiert, sondern vom ExecutorService
anderweitig verwendet, wodurch mit einer geringen Anzahl von Threads im Pool eine
weit größere Menge an Polling-Aufgaben durchgeführt werden kann.
Generalisierung für unterschiedliche Tasks
Neben dem Anlegen von Benutzerkonten kann es potentiell weitere Aufrufe an
ein HTTP-API geben. Eine feste Verdrahtung von Objekten, die im Future
abgefragt werden sollen, wie es im Moment in der run()
-Methode geschieht, ist
daher nicht sinnvoll. Java8 bietet uns hier mit den eingeführten Lambdas eine
einfache Art, die Funktionalität, die bestimmt, wie das Objekt im Future auszusehen
hat, in den APITask
hineinzureichen. Wir erweitern den Konstruktor zu
und erstellen eine Methode getResult()
, mit der wir das entsprechende Objekt dann
im TaskPoller
an das Future übergeben können.
Im TaskPoller
ändern wir die run()
-Methode zu:
Der Aufruf aus unsere createUser()
-Methode sieht dann letztendlich folgendermaßen
aus:
Das Lambda besteht dabei aus:
task -> User.fromServiceResponse(connection.doGet(task.getResultUri()))
welches letztlich aus der Antwort des Service den entsprechenden Benutzer erstellt.
Fazit
Java bietet aktuell mit der Kombination aus CompletableFutures, ScheduledExecutorService und Lambdas eine komfortable Möglichkeit, langläufige, asynchrone und auf Tasks aufbauende HTTP-APIs generisch abzufragen und wegzukapseln. Dazu geben wir auf Java-Seite Futures zurück, welche nicht blockieren und das Absenden mehrerer Anfragen zulassen. Die Abfrage über den Zustand des externen Tasks kapseln wir in ein Polling mit einem Runnable, um dessen Ausführung sich ein ScheduledExecutorService kümmert. Um für beliebige Tasks auch die korrekten Objekte nach dem Polling im Future zurückgeben zu können, übergeben wir die Funktionalität zur Erstellung dieser Objekte per Lambdas. Für den Benutzer eines API ist die gesamte Logik jedoch weggekapselt. Ein möglicher Aufruf des API könnte folgendmaßen aussehen und zeigt die Einfachheit für den Benutzer:
Eine lauffähige Implementierung findet sich auf Github.