Kommunikation mit externem Service
Schauen wir uns zunächst ein HTTP-API eines Services an, den wir benutzen wollen.
Dieser Service stellt eine typische Benutzerverwaltung dar und bietet uns die
Möglichkeit, neue Benutzerkonten anzulegen. Wir senden also die Informationen eines
neuen Benutzers per JSON an die URI /users
. Der HTTP-Request könnte folglich so
aussehen:
POST /users HTTP/1.1
Host: example.org
Content-Type: application/json
{"username":"torsten", "mail":"[email protected]", ...}
Der Service wiederum nimmt die Daten entgegen, lässt uns jedoch nicht auf eine Antwort warten, sondern erstellt intern einen Task, der im Hintergrund abgearbeitet wird. Wir erhalten als Antwort:
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: /tasks/<task-id>
{
"task":"<task-id>",
"self":"/tasks/<task-id>"
}
Wir wissen nun also nicht, wann und ob unser User erfolgreich angelegt wurde, können
den Zustand des Service-Tasks aber anhand des im Location-Header mitgeteilten URI
beliebig abfragen. Das Polling gestaltet sich derart, dass die Antwort auf einen
Request nach /tasks/<task-id>
uns unter anderem den aktuellen Zustand – running
oder stopped – liefert. Darüber hinaus erfahren wir ggf. auch etwas zu
Fehlermeldungen und erhalten natürlich das Ergebnis des Tasks als URI.
Unsere Anfrage ist also genau genommen erst erledigt, sobald der Task des Services abgearbeitet wurde, wir können aber parallel noch weitere Anfragen stellen und so asynchron mit dem Service kommunizieren. Für dieses HTTP-API wollen wir nun ein Java-API entwickeln, welches uns die asynchrone Kommunikation mit dem Service kapselt.
Java-API mit Futures
Für unser Java-API benötigen wir eine Methode createUser()
, welche die
Benutzerdaten erhält und den Request an den Service anstößt. Eine Implementierung
könnte so aussehen:
public User createUser(String userDataAsJsonString) {
User user = User.fromServiceResponse(
connection.doPostWithJson("/users", userDataAsJsonString));
return user;
}
Wir erhalten als Antwort auf unsere Anfrage vom Service jedoch keine Repräsentation des angelegten Benutzers, sondern lediglich Informationen über einen Task. Diese schlichte Implementierung funktioniert also nicht, weil unsere Anfrage nicht bis zur Fertigstellung des neuen Benutzerkontos blockiert und diesen im Anschluss zurückgibt. Stattdessen erhalten wir vom Service die Task-ID und können den User erst nach Fertigstellung des Tasks abfragen. Wir schreiben uns also eine Task-Klasse, über welche wir den Status des Tasks und die URI des Task-Ergebnisses abfragen können.
public class APITask {
private ConnectorMock connection;
private int taskUid;
private String selfUri;
private String status;
public APITask(ConnectorMock connection, int taskUid, String selfUri) {
this.connection = connection;
this.taskUid = taskUid;
this.selfUri = selfUri
this.status = "running";
}
public static APITask fromServiceResponse(ConnectorMock c, JsonNode json) {
int uid = json.findPath("task").asInt();
String uri = json.findPath("self").asText();
return new APITask(c, uid, uri)
}
public String fetchStatus() {
if (this.status.equals("stopped")) {
return this.status;
}
JsonNode json = connection.doGet(this.selfUri);
this.status = json.findPath("status").asText();
return this.status;
}
public String getResultUri() {
JsonNode json = connection.doGet(this.selfUri);
return json.findPath("result").asText();
}
}
Mit Tasks können wir dann unsere createUser()
-Methode umschreiben zu:
public User createUser(String userDataAsJsonString) {
APITask apiTask = APITask.fromServiceResponse(connection,
connection.doPostWithJson("/users", userDataAsJsonString));
while (!apiTask.fetchStatus.equals("stopped")) {
Thread.sleep(1000);
}
return User.fromServiceResponse(connection.doGet(apiTask.getResultUri()));
}
Sicherlich fällt es jedem direkt auf: durch die while-Schleife und das Thread.sleep()
wird nun auf der Seite unseres API blockiert. Wir wollen aber, dass auch unser
eigener API-Call asynchron läuft. Daher setzen wir nun Futures ein und lassen unsere
Tasks von CompletableFuture
erben. In der Klasse selbst ändern wir zunächst
nichts, da die get()
-Methode einfach vererbt wird. Wir können die
createUser()
-Methode dann aber folgendermaßen schreiben:
public APITask<User> createUser(String userDataAsJsonString) {
return APITask.fromServiceResponse(connection,
connection.doPostWithJson("/users", userDataAsJsonString));
}
Der Nutzer unseres API kann dann je nach Bedarf den neu angelegten Benutzer selbst aus dem zurückgegebenen Future auslesen. Jetzt stellt sich nur noch die Frage, wann unser APITask weiß, dass der Service-Task fertig ist?
Wir erweitern unser Programm noch um eine Klasse TaskPoller
. Diese fragt im
Hintergrund ab, ob für einen gegebenen APITask der entsprechende Task beim Service
beendet wurde.
public class TaskPoller implements Runnable {
private APITask task;
public TaskPoller(APITask task) {
this.task = task;
}
@Override
public void run() {
while (!task.fetchStatus().equals("stopped")) {
Thread.sleep(1000);
}
task.complete(User.fromTaskResult(task.getResultUri()));
}
}
Der jeweilige TaskPoller
kennt also „seinen” Task und kann den Zustand für diesen
abfragen. Wenn fetchStatus()
die Beendigung des Tasks zurückgibt, wird das Future
completed und dabei der jeweilige Benutzer mit angegeben. Ansonsten warten wir
einfach eine Sekunde und probieren es erneut.
Das Polling wird in der createUser()
-Methode mit einem
ExecutorService
angestoßen. Der ExecutorService wurde dabei vom Nutzer unseres API erstellt und mit
hineingereicht. Der Nutzer muss dabei die Threadpoolgröße bestimmen sowie sich später
auch um das korrekte Beenden des ExecutorService kümmern.
public APITask<User> createUser(String userDataAsJsonString) {
APITask<User> apiTask = APITask.fromServiceResponse(connection,
connection.doPostWithJson("/users", userDataAsJsonString));
TaskPoller taskPoller = new TaskPoller(apiTask);
scheduler.submit(taskPoller);
return apiTask;
}
In TaskPoller
wurde eine run
-Methode implementiert, die ein ähnliches
Problem beinhaltet, wie die Statusabfrage in der createUser
-Methode. Durch die
while-Schleife blockiert ein Thread des ExecutorServices so lange, bis der Task beim
externen Service fertig ist. Damit ist der Pool an Threads potentiell sehr schnell
erschöpft. Der ExecutorService bietet jedoch mit der schedule()
-Methode die
Möglichkeit, die Ausführung eines Callable oder Runnable erst nach einem Delay
durchzuführen. Diese Möglichkeit bauen wir folgendermaßen in unseren TaskPoller
ein.
public class TaskPoller implements Runnable {
private APITask task;
private ScheduledExecutorService ses;
public TaskPoller(APITask task, ScheduledExecutorService ses) {
this.task = task;
this.ses = ses;
}
@Override
public void run() {
if (task.fetchStatus().equals("stopped")) {
task.complete(User.fromTaskResult(task.getResultUri()));
} else {
ses.schedule(new TaskPoller(task, ses), 1, TimeUnit.SECONDS);
}
}
}
Der ExecutorService wird der Klasse mit übergeben und kann daher aus der Klasse heraus weiter genutzt werden. Dadurch kann dieser jedoch die Auslastung der Threads selber verwalten, da hier der Delay nicht dazu führt, dass ein Thread vollständig blockiert. Solange ein TaskPoller nichts tut, kann der Thread anderweitig vergeben werden.
Durch den rekursiven Aufruf des Pollings sollte innerhalb der run()
-Methode noch
eine Abbruchbedingung implementiert werden. Falls der Task beim Service nie
beendet wird, kann ein Timeout oder ein Counter für die Anzahl an Versuchen
verwendet und das CompletableFuture dann mit einem
completeExceptionally()
beendet werden.
Exkurs: Thread.sleep() vs. ExecutorService
Die Behauptung, dass ein Thread.sleep()
einen Thread des ExecutorService
blockiert, lässt sich durch folgenden einfachen Versuch überprüfen. Zunächst
erstellen wir uns einen ScheduledExecutorService
mit einer Threadpoolgröße von
eins. Diesem übergeben wir dann zwei Runnable
.
scheduler.submit(new Runnable() throws InterruptedException {
@Override
public void run() {
int count = 0;
while (count < 10) {
Thread.sleep(2000);
count++;
System.out.println(Thread.currentThread().getName() + " -> " + count);
}
}
});
scheduler.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " -> Testausgabe");
}
});
Anhand der Ausgaben zeigt sich, dass die Testausgabe
erst erfolgt, nachdem im
ersten Runnable
bis zehn gezählt wurde.
Im Vergleich dazu erstellen wir eine Klasse MySchedulerTest
, welche Runnable
implementiert. Sie erhält ein paar Informationen über Delay etc. für die spätere
Ausführung. Außerdem erfolgt die Verzögerung hier nicht per Thread.sleep()
,
sondern über die schedule()
-Methode des ExecutorServices.
static class MySchedulerTest implements Runnable {
private int delay;
private int count;
private int taskNumber;
private ScheduledExecutorService scheduler;
MyTest(ScheduledExecutorService scheduler, int count, int delay, int taskNumber) {
this.scheduler = scheduler;
this.count = count;
this.delay = delay;
this.taskNumber = taskNumber;
}
@Override
public void run() {
if (count < 10) {
System.out.println(Thread.currentThread().getName()
+ " -> Task " + taskNumber + " -> " + count);
scheduler.schedule(
new MyTest(scheduler, count+1, delay, taskNumber),
delay,
TimeUnit.SECONDS);
}
}
}
Zwei unterschiedlich konfigurierte Instanzen dieser Klasse werden dem Scheduler übergeben.
scheduler.submit(new MyTest(scheduler, 0, 2, 1));
scheduler.submit(new MyTest(scheduler, 3, 5, 2));
Hier zeigt sich anhand der Konsolenausgaben, dass die beiden Ausführungen
abwechselnd und gemischt durchgeführt werden. Der eine Thread des Pools wird also
beim schedule()
-Aufruf nicht blockiert, sondern vom ExecutorService
anderweitig verwendet, wodurch mit einer geringen Anzahl von Threads im Pool eine
weit größere Menge an Polling-Aufgaben durchgeführt werden kann.
Generalisierung für unterschiedliche Tasks
Neben dem Anlegen von Benutzerkonten kann es potentiell weitere Aufrufe an
ein HTTP-API geben. Eine feste Verdrahtung von Objekten, die im Future
abgefragt werden sollen, wie es im Moment in der run()
-Methode geschieht, ist
daher nicht sinnvoll. Java8 bietet uns hier mit den eingeführten Lambdas eine
einfache Art, die Funktionalität, die bestimmt, wie das Objekt im Future auszusehen
hat, in den APITask
hineinzureichen. Wir erweitern den Konstruktor zu
public class APITask<T> extends CompletableFuture<T> {
//...
public APITask(ConnectorMock conn, JsonNode json,
Function<APITask<T>, T> retriever) {
this.conn = conn;
this.taskUid = json.findPath("task").asInt();
this.selfUri = json.findPath("self").asText();
this.retriever = retriever;
this.status = "running";
}
}
und erstellen eine Methode getResult()
, mit der wir das entsprechende Objekt dann
im TaskPoller
an das Future übergeben können.
public T getResult() {
return retriever.apply(this);
}
Im TaskPoller
ändern wir die run()
-Methode zu:
public void run() {
if (task.fetchStatus().equals("stopped")) {
task.complete(task.getResult());
} else {
ses.schedule(new TaskPoller(task, ses), 1, TimeUnit.SECONDS);
}
}
Der Aufruf aus unsere createUser()
-Methode sieht dann letztendlich folgendermaßen
aus:
public APITask<User> createUser(String userDataAsJsonString) {
APITask<User> apiTask = new APITask<>(conn,
conn.doPostWithJson("/users", userDataAsJsonString),
task -> User.fromServiceResponse(conn.doGet(task.getResultUrl())) );
TaskPoller taskPoller = new TaskPoller(apiTask, scheduler);
scheduler.submit(taskPoller);
return apiTask;
}
Das Lambda besteht dabei aus:
task -> User.fromServiceResponse(connection.doGet(task.getResultUri()))
welches letztlich aus der Antwort des Service den entsprechenden Benutzer erstellt.
Fazit
Java bietet aktuell mit der Kombination aus CompletableFutures, ScheduledExecutorService und Lambdas eine komfortable Möglichkeit, langläufige, asynchrone und auf Tasks aufbauende HTTP-APIs generisch abzufragen und wegzukapseln. Dazu geben wir auf Java-Seite Futures zurück, welche nicht blockieren und das Absenden mehrerer Anfragen zulassen. Die Abfrage über den Zustand des externen Tasks kapseln wir in ein Polling mit einem Runnable, um dessen Ausführung sich ein ScheduledExecutorService kümmert. Um für beliebige Tasks auch die korrekten Objekte nach dem Polling im Future zurückgeben zu können, übergeben wir die Funktionalität zur Erstellung dieser Objekte per Lambdas. Für den Benutzer eines API ist die gesamte Logik jedoch weggekapselt. Ein möglicher Aufruf des API könnte folgendmaßen aussehen und zeigt die Einfachheit für den Benutzer:
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(10);
Factory factory = new Factory(scheduler);
APITask<User> user1Future = factory.createUser("{\"username\":\"user1\"");
APITask<User> user2Future = factory.createUser("{\"username\":\"user2\"");
APITask<User> user3Future = factory.createUser("{\"username\":\"user3\"");
APITask<User> user4Future = factory.createUser("{\"username\":\"user4\"");
User user1 = user1Future.get();
User user2 = user2Future.get();
User user3 = user3Future.get();
User user4 = user4Future.get();
scheduler.shutdown();
}
Eine lauffähige Implementierung findet sich auf Github.