Kommunikation mit externem Service

Schauen wir uns zunächst ein HTTP-API eines Services an, den wir benutzen wollen. Dieser Service stellt eine typische Benutzerverwaltung dar und bietet uns die Möglichkeit, neue Benutzerkonten anzulegen. Wir senden also die Informationen eines neuen Benutzers per JSON an die URI /users. Der HTTP-Request könnte folglich so aussehen:

POST /users HTTP/1.1
Host: example.org
Content-Type: application/json

{"username":"torsten", "mail":"[email protected]", ...}

Der Service wiederum nimmt die Daten entgegen, lässt uns jedoch nicht auf eine Antwort warten, sondern erstellt intern einen Task, der im Hintergrund abgearbeitet wird. Wir erhalten als Antwort:

HTTP/1.1 202 Accepted
Content-Type: application/json
Location: /tasks/<task-id>

{
    "task":"<task-id>",
    "self":"/tasks/<task-id>"
}

Wir wissen nun also nicht, wann und ob unser User erfolgreich angelegt wurde, können den Zustand des Service-Tasks aber anhand des im Location-Header mitgeteilten URI beliebig abfragen. Das Polling gestaltet sich derart, dass die Antwort auf einen Request nach /tasks/<task-id> uns unter anderem den aktuellen Zustand – running oder stopped – liefert. Darüber hinaus erfahren wir ggf. auch etwas zu Fehlermeldungen und erhalten natürlich das Ergebnis des Tasks als URI.

Unsere Anfrage ist also genau genommen erst erledigt, sobald der Task des Services abgearbeitet wurde, wir können aber parallel noch weitere Anfragen stellen und so asynchron mit dem Service kommunizieren. Für dieses HTTP-API wollen wir nun ein Java-API entwickeln, welches uns die asynchrone Kommunikation mit dem Service kapselt.

Java-API mit Futures

Für unser Java-API benötigen wir eine Methode createUser(), welche die Benutzerdaten erhält und den Request an den Service anstößt. Eine Implementierung könnte so aussehen:

public User createUser(String userDataAsJsonString) {
    User user = User.fromServiceResponse(
            connection.doPostWithJson("/users", userDataAsJsonString));
    return user;
}

Wir erhalten als Antwort auf unsere Anfrage vom Service jedoch keine Repräsentation des angelegten Benutzers, sondern lediglich Informationen über einen Task. Diese schlichte Implementierung funktioniert also nicht, weil unsere Anfrage nicht bis zur Fertigstellung des neuen Benutzerkontos blockiert und diesen im Anschluss zurückgibt. Stattdessen erhalten wir vom Service die Task-ID und können den User erst nach Fertigstellung des Tasks abfragen. Wir schreiben uns also eine Task-Klasse, über welche wir den Status des Tasks und die URI des Task-Ergebnisses abfragen können.

public class APITask {
    private ConnectorMock connection;
    private int taskUid;
    private String selfUri;
    private String status;

    public APITask(ConnectorMock connection, int taskUid, String selfUri) {
        this.connection = connection;
        this.taskUid = taskUid;
        this.selfUri = selfUri
        this.status = "running";
    }

    public static APITask fromServiceResponse(ConnectorMock c, JsonNode json) {
        int uid = json.findPath("task").asInt();
        String uri = json.findPath("self").asText();
        return new APITask(c, uid, uri)
    }

    public String fetchStatus() {
        if (this.status.equals("stopped")) {
            return this.status;
        }

        JsonNode json = connection.doGet(this.selfUri);
        this.status = json.findPath("status").asText();
        return this.status;
    }

    public String getResultUri() {
        JsonNode json = connection.doGet(this.selfUri);
        return json.findPath("result").asText();
    }
}

Mit Tasks können wir dann unsere createUser()-Methode umschreiben zu:

public User createUser(String userDataAsJsonString) {
    APITask apiTask = APITask.fromServiceResponse(connection, 
                        connection.doPostWithJson("/users", userDataAsJsonString));
    while (!apiTask.fetchStatus.equals("stopped")) {
       Thread.sleep(1000);
    }
    return User.fromServiceResponse(connection.doGet(apiTask.getResultUri()));
}

Sicherlich fällt es jedem direkt auf: durch die while-Schleife und das Thread.sleep() wird nun auf der Seite unseres API blockiert. Wir wollen aber, dass auch unser eigener API-Call asynchron läuft. Daher setzen wir nun Futures ein und lassen unsere Tasks von CompletableFuture erben. In der Klasse selbst ändern wir zunächst nichts, da die get()-Methode einfach vererbt wird. Wir können die createUser()-Methode dann aber folgendermaßen schreiben:

public APITask<User> createUser(String userDataAsJsonString) {
    return APITask.fromServiceResponse(connection, 
                        connection.doPostWithJson("/users", userDataAsJsonString));
}

Der Nutzer unseres API kann dann je nach Bedarf den neu angelegten Benutzer selbst aus dem zurückgegebenen Future auslesen. Jetzt stellt sich nur noch die Frage, wann unser APITask weiß, dass der Service-Task fertig ist?

Wir erweitern unser Programm noch um eine Klasse TaskPoller. Diese fragt im Hintergrund ab, ob für einen gegebenen APITask der entsprechende Task beim Service beendet wurde.

public class TaskPoller implements Runnable {
    private APITask task;

    public TaskPoller(APITask task) {
        this.task = task;
    }

    @Override
    public void run() {
        while (!task.fetchStatus().equals("stopped")) {
            Thread.sleep(1000);
        }
        task.complete(User.fromTaskResult(task.getResultUri()));
    }
}

Der jeweilige TaskPoller kennt also “seinen” Task und kann den Zustand für diesen abfragen. Wenn fetchStatus() die Beendigung des Tasks zurückgibt, wird das Future completed und dabei der jeweilige Benutzer mit angegeben. Ansonsten warten wir einfach eine Sekunde und probieren es erneut.

Das Polling wird in der createUser()-Methode mit einem ExecutorService angestoßen. Der ExecutorService wurde dabei vom Nutzer unseres API erstellt und mit hineingereicht. Der Nutzer muss dabei die Threadpoolgröße bestimmen sowie sich später auch um das korrekte Beenden des ExecutorService kümmern.

public APITask<User> createUser(String userDataAsJsonString) {
    APITask<User> apiTask = APITask.fromServiceResponse(connection,
                        connection.doPostWithJson("/users", userDataAsJsonString));
    TaskPoller taskPoller = new TaskPoller(apiTask);
    scheduler.submit(taskPoller);
    return apiTask;
}

In TaskPoller wurde eine run-Methode implementiert, die ein ähnliches Problem beinhaltet, wie die Statusabfrage in der createUser-Methode. Durch die while-Schleife blockiert ein Thread des ExecutorServices so lange, bis der Task beim externen Service fertig ist. Damit ist der Pool an Threads potentiell sehr schnell erschöpft. Der ExecutorService bietet jedoch mit der schedule()-Methode die Möglichkeit, die Ausführung eines Callable oder Runnable erst nach einem Delay durchzuführen. Diese Möglichkeit bauen wir folgendermaßen in unseren TaskPoller ein.

public class TaskPoller implements Runnable {
    private APITask task;
    private ScheduledExecutorService ses;

    public TaskPoller(APITask task, ScheduledExecutorService ses) {
        this.task = task;
        this.ses = ses;
    }

    @Override
    public void run() {
        if (task.fetchStatus().equals("stopped")) {
            task.complete(User.fromTaskResult(task.getResultUri()));
        } else {
            ses.schedule(new TaskPoller(task, ses), 1, TimeUnit.SECONDS);
        }
    }
}

Der ExecutorService wird der Klasse mit übergeben und kann daher aus der Klasse heraus weiter genutzt werden. Dadurch kann dieser jedoch die Auslastung der Threads selber verwalten, da hier der Delay nicht dazu führt, dass ein Thread vollständig blockiert. Solange ein TaskPoller nichts tut, kann der Thread anderweitig vergeben werden.

Durch den rekursiven Aufruf des Pollings sollte innerhalb der run()-Methode noch eine Abbruchbedingung implementiert werden. Falls der Task beim Service nie beendet wird, kann ein Timeout oder ein Counter für die Anzahl an Versuchen verwendet und das CompletableFuture dann mit einem completeExceptionally() beendet werden.

Exkurs: Thread.sleep() vs. ExecutorService

Die Behauptung, dass ein Thread.sleep() einen Thread des ExecutorService blockiert, lässt sich durch folgenden einfachen Versuch überprüfen. Zunächst erstellen wir uns einen ScheduledExecutorService mit einer Threadpoolgröße von eins. Diesem übergeben wir dann zwei Runnable.

scheduler.submit(new Runnable() throws InterruptedException {
    @Override
    public void run() {
        int count = 0;
        while (count < 10) {
            Thread.sleep(2000);
            count++;
            System.out.println(Thread.currentThread().getName() + " -> " + count);
        }
    }
});
scheduler.submit(new Runnable() {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " -> Testausgabe");
    }
});

Anhand der Ausgaben zeigt sich, dass die Testausgabe erst erfolgt, nachdem im ersten Runnable bis zehn gezählt wurde.

Im Vergleich dazu erstellen wir eine Klasse MySchedulerTest, welche Runnable implementiert. Sie erhält ein paar Informationen über Delay etc. für die spätere Ausführung. Außerdem erfolgt die Verzögerung hier nicht per Thread.sleep(), sondern über die schedule()-Methode des ExecutorServices.

static class MySchedulerTest implements Runnable {
    private int delay;
    private int count;
    private int taskNumber;
    private ScheduledExecutorService scheduler;

    MyTest(ScheduledExecutorService scheduler, int count, int delay, int taskNumber) {
        this.scheduler = scheduler;
        this.count = count;
        this.delay = delay;
        this.taskNumber = taskNumber;
    }

    @Override
    public void run() {
        if (count < 10) {
            System.out.println(Thread.currentThread().getName()
                            + " -> Task " + taskNumber + " -> " + count);
            scheduler.schedule(
                    new MyTest(scheduler, count+1, delay, taskNumber),
                    delay,
                    TimeUnit.SECONDS);
        }
    }
}

Zwei unterschiedlich konfigurierte Instanzen dieser Klasse werden dem Scheduler übergeben.

scheduler.submit(new MyTest(scheduler, 0, 2, 1));
scheduler.submit(new MyTest(scheduler, 3, 5, 2));

Hier zeigt sich anhand der Konsolenausgaben, dass die beiden Ausführungen abwechselnd und gemischt durchgeführt werden. Der eine Thread des Pools wird also beim schedule()-Aufruf nicht blockiert, sondern vom ExecutorService anderweitig verwendet, wodurch mit einer geringen Anzahl von Threads im Pool eine weit größere Menge an Polling-Aufgaben durchgeführt werden kann.

Generalisierung für unterschiedliche Tasks

Neben dem Anlegen von Benutzerkonten kann es potentiell weitere Aufrufe an ein HTTP-API geben. Eine feste Verdrahtung von Objekten, die im Future abgefragt werden sollen, wie es im Moment in der run()-Methode geschieht, ist daher nicht sinnvoll. Java8 bietet uns hier mit den eingeführten Lambdas eine einfache Art, die Funktionalität, die bestimmt, wie das Objekt im Future auszusehen hat, in den APITask hineinzureichen. Wir erweitern den Konstruktor zu

public class APITask<T> extends CompletableFuture<T> {
    //...

    public APITask(ConnectorMock conn, JsonNode json,
                                        Function<APITask<T>, T> retriever) {
        this.conn = conn;
        this.taskUid = json.findPath("task").asInt();
        this.selfUri = json.findPath("self").asText();
        this.retriever = retriever;
        this.status = "running";
    }
}

und erstellen eine Methode getResult(), mit der wir das entsprechende Objekt dann im TaskPoller an das Future übergeben können.

public T getResult() {
    return retriever.apply(this);
}

Im TaskPoller ändern wir die run()-Methode zu:

public void run() {
    if (task.fetchStatus().equals("stopped")) {
        task.complete(task.getResult());
    } else {
        ses.schedule(new TaskPoller(task, ses), 1, TimeUnit.SECONDS);
    }
}

Der Aufruf aus unsere createUser()-Methode sieht dann letztendlich folgendermaßen aus:

public APITask<User> createUser(String userDataAsJsonString) {
    APITask<User> apiTask = new APITask<>(conn,
                conn.doPostWithJson("/users", userDataAsJsonString),
                task -> User.fromServiceResponse(conn.doGet(task.getResultUrl())) );
    TaskPoller taskPoller = new TaskPoller(apiTask, scheduler);
    scheduler.submit(taskPoller);
    return apiTask;
}

Das Lambda besteht dabei aus:

task -> User.fromServiceResponse(connection.doGet(task.getResultUri()))

welches letztlich aus der Antwort des Service den entsprechenden Benutzer erstellt.

Fazit

Java bietet aktuell mit der Kombination aus CompletableFutures, ScheduledExecutorService und Lambdas eine komfortable Möglichkeit, langläufige, asynchrone und auf Tasks aufbauende HTTP-APIs generisch abzufragen und wegzukapseln. Dazu geben wir auf Java-Seite Futures zurück, welche nicht blockieren und das Absenden mehrerer Anfragen zulassen. Die Abfrage über den Zustand des externen Tasks kapseln wir in ein Polling mit einem Runnable, um dessen Ausführung sich ein ScheduledExecutorService kümmert. Um für beliebige Tasks auch die korrekten Objekte nach dem Polling im Future zurückgeben zu können, übergeben wir die Funktionalität zur Erstellung dieser Objekte per Lambdas. Für den Benutzer eines API ist die gesamte Logik jedoch weggekapselt. Ein möglicher Aufruf des API könnte folgendmaßen aussehen und zeigt die Einfachheit für den Benutzer:

public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(10);
    Factory factory = new Factory(scheduler);

    APITask<User> user1Future = factory.createUser("{\"username\":\"user1\"");
    APITask<User> user2Future = factory.createUser("{\"username\":\"user2\"");
    APITask<User> user3Future = factory.createUser("{\"username\":\"user3\"");
    APITask<User> user4Future = factory.createUser("{\"username\":\"user4\"");

    User user1 = user1Future.get();
    User user2 = user2Future.get();
    User user3 = user3Future.get();
    User user4 = user4Future.get();

    scheduler.shutdown();
}

Eine lauffähige Implementierung findet sich auf Github.