Projekt reactor

Das Spring Framework nutzt für den Starter spring-boot-starter-webflux das Projekt reaktor. Dieses verleiht dem Spring Framework reaktive Streams. Das Framework Reactor ist von Hause aus Multithreading fähig, so dass es dem Entwickler hier hilft.

Warum reaktiv?

Meine Anwendung ist doch asynchron und verwendet Threads! Warum soll sie jetzt auf einmal reaktiv sein? Was ist der Unterschied zwischen asynchron und reaktiv? Diese Fragen stellte ich mir auch. Nun ja, einer der Unterschiede ist, dass das Framework ein Backpreasure kennt. Dieser „Gegendruck“ verhindert zum Beispiel das ein Subscriber mit den vom Publisher gelieferten Daten überfordert wird.

Mono und Flux

Mono hat genau 0 … 1 Elemente und Flux hat 0 … N Elemente.

Hot und Cold Streams

Bei Hot-Streams ist die Anzahl der Elemente unbekannt und fortwährend. Man stelle sich zum Beispiel eine Datenquelle wie einen Temperatursensor vor. Solange die Anwendung läuft, können wir Messwerte abfragen und das unendlich.

Nimmt man als Quelle für einen reaktiven Stream zum Beispiel eine Liste, dann ist die Anzahl der Elemente bekannt und endlich. In der Sprache von reaktor heißt das Cold-Stream.

Es gibt noch einen weiteren unterschied zwischen Hot- und Cold-Streams: Sind Elemente in einem Hot-Stream einmal konsumiert durch einen Subscriber, dann sind Sie weg. Schreibt man sich bei einem Cold-Stream ein zweites mal ein, dann werden erneut alle Elemente publiziert.

Dependencies

implementation('io.projectreactor:reactor-core')


dependencyManagement {
     imports {
          mavenBom "io.projectreactor:reactor-bom:Californium-RELEASE"
     }
}

Wie mache ich aus einem blockierenden Methodenaufruf ein reaktiven Stream?

Mit der Methode .fromCallable() von Mono kann man einen blockierende Aufruf als Lambda Funktion übergeben und erhält unmittelbar ein Mono Objekt zurück. Als kurzes Beispiel wird hier eine blockierende Methode getInfo() gewrappt.

/**
    * Wrap blocking service call getInfo
    * 
    * @param Long itemId
    * @return Mono<Info>
    */
private Mono<Info> getInfo(Long id) {
        Mono<Info> blockingWrapper = Mono.fromCallable(() -> { 
                return service.getInfo(id);
        });

        return blockingWrapper.subscribeOn(Schedulers.elastic());
}

Das ganze kann man jetzt in einen Stream aus id verpacken.

private Flux<Info> getInfos(Flux<Long> ids) {
        return ids.flatMap(id -> getInfo(id), 2); // max 2 concurrent
}

Begrenzung der Aufrufe

In dem letzten Abschnitt haben wir gesehen wie man einen blockierenden Aufruf wrappen kann. In reaktiven Streams ist es so, dass keine Daten produziert werden solange kein Subscriber vorhanden ist. Wenn jetzt ein Subscriber für den Flux von getInfos() aboniert, dann werden alle id parallel im Excecutor ThreadPool abgfragt. Dieses kann zum Beispiel bei teuren Funktionen wie zum Beispiel Rest Calls zu einer Überlastung führen. Daher muss hier ein weiterer Parameter in flatMap mit angegeben werden. Damit kann man die Anzahl der Concurrent Calls begrenzen. Hier in diesem Beispiel sind immer maximal 2 konkurierende Aufruf gleichzeitig.