26 août 2016

Reactor - N’ayez plus peur de la programmation non bloquante avec Reactor !

Design & Code

Antoine

Michaud

Image programmation.

26 août 2016

Reactor - N’ayez plus peur de la programmation non bloquante avec Reactor !

Design & Code

Antoine

Michaud

Image programmation.

26 août 2016

Reactor - N’ayez plus peur de la programmation non bloquante avec Reactor !

Design & Code

Antoine

Michaud

Image programmation.

Introduction à Reactor

Parlons de Reactor : avec l'amélioration continue des techniques et des besoins, la programmation réactive est devenue la référence pour les programmes demandant disponibilité, résilience, souplesse et répondant à des événements asynchrones. Elle est d’autant plus une référence lorsque l’on souhaite répartir la charge et utiliser les ressources tout en permettant de mieux découpler le code d’une application. L’écosystème Java, toujours soucieux de permettre à ses développeurs de créer des applications performantes, de devait de suivre ce mouvement, et ça n’a pas manqué ! Après la publication de la première version du Manifeste Réactif en 2013 par Jonas Bonér, plusieurs librairies sont apparues permettant ainsi de profiter de ce nouveau modèle de programmation, dont certaines sont particulièrement devenues populaires :

  • RxJava (Reactive eXtension for Java): Principalement connu dans l’écosystème Android,elle se distingue par sa compatibilité officielle avec les différentes versions de ce SDK et utilise le design pattern Observable

  • Project Reactor: Venant de la communauté de Spring, elle en est le cœur de WebFlux (qui permet de développement d’APIs réactives et non-bloquantes), implémente la spécification Reactive Streams et profite de l’avantage d’être basée sur les APIs Java (CompletableFuture, Stream, Duration, ExecutorService, Flow...) et utilise aussi le design pattern Observable

  • Eclipse Vert.X: Utilisée par beaucoup de grandes enseignes, et à l’époque faisant partie des plus performantes, cette librairie se distingue par sa modularité et sa simplicité, et intègre un bus d’évènements permettant de centraliser et multiplexer les messages

Néanmoins, ces différentes librairies évoluent, et la différence de performance et de moins en moins présente, voire quasi inexistante. De plus, chacune d’entre elles s’importe très facilement par un simple ajout d’une dépendance (voire plus suivant les modules nécessaires, comme un client http, kafka ou encore UDP).

Malgré ces différentes librairies, nous allons nous concentrer sur le projet Reactor qui m’a permis d’aborder très sereinement ces nouvelles contraintes lorsque j’y ai été confronté chez ManoMano.

Reactor, kesako ?

La programmation réactive a pour base de concept chez Reactor un flux d'éléments, fini ou infini, représenté par deux classes principales, plutôt parlantes :

  • Mono : flux d'éléments pouvant en émettre un seul au maximum, ayant donc une cardinalité de 0 à 1

  • Flux : flux d'éléments pouvant en émettre à l'infini, ayant une cardinalité de 0 à N

Chacun de ces deux types de flux représente un Publisher et Reactor apporte tout un tas de méthodes nommées “opérateurs”, permettant la manipulation des éléments émis par ces flux afin de les regrouper, les diviser, les différer, les filtrer, les convertir, et bien plus encore… Il vous permet aussi de gérer les erreurs (je sais, personne n’est parfait) qui peuvent survenir pendant le traitement de votre flux en vous donnant la possibilité de les ignorer, de les convertir en un autre élément, etc.  Ces outils permettent donc une manipulation simple et très lisible (syntaxe encourageant la programmation fonctionnelle) des flux « en mode asynchrone » et Reactor, de par sa structure, vous fait gagner en disponibilité, résilience et souplesse de manière transparente !

Plan de l'article Reactor

Dans un premier temps, je vais vous exposer le contexte dans lequel Reactor sera mis en œuvre avec le code final (histoire d'attiser votre curiosité) et ensuite je vais le décrire pas-à-pas. Au travers de cette description, je souhaite vous apporter une meilleure compréhension des problématiques soulevées par la programmation réactive et vous montrer comment Reactor vous permet de les maîtriser facilement pour que vous n’ayez plus aucune excuse pour vous y mettre ! Et si cette librairie vous intrigue, je vous indiquerai quelques concepts à creuser pour d'aller plus loin !

C'est parti !

Le cas d’usage Reactor

Prenons le côté positif d'un sujet d'actualité : le nombre de guéris du Covid-19.

Le but ici va être d'appeler une API qui va récupérer le nombre de guéris d’hier et d’avant-hier en parallèle pour en afficher la différence le plus rapidement possible, tout en faisant attention à une possible erreur de format de données ou de réseau. Pour se faire, nous devons l'appeler deux fois : une pour chaque date (hier et avant-hier).

L'API en question est coronavirusapi-france.now.sh qui expose plusieurs routes permettant d'obtenir les statistiques du covid-19 de manière globale et mondiale, par pays, par département... Et par date, ce qui nous intéresse donc ici. En se basant sur le 15 novembre 2020, la route à appeler serait : GET coronavirusapi-france.now.sh/FranceGlobalDataByDate?date=2020-11-15, et le résultat en JSON serait par exemple :

{
  "FranceGlobalDataByDate" : [
    {
      "date" : "2020-11-15",
      "sourceType" : "ministere-sante",
      "gueris" : 1111
      [...]
    },
    {
      "date" : "2020-11-15",
      "gueris" : 2222,
      "sourceType" : "opencovid19-fr"
      [...]

Le JSON obtenu nous permet de savoir le nombre absolu de guéris en fonction des sources, comme les données open source de opencovid19-fr, ou le Ministère de la santé. Ce sera cette dernière source que nous allons traiter, en ignorant les autres.

Assez parlé du contexte, voici le code final permettant de faire notre calcul :

@SpringBootApplication
public class LesGuerisDuCovid19 implements CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(LesGuerisDuCovid19.class);
   private static final DateTimeFormatter DATE_PARAM_FORMATTER = DateTimeFormatter.ISO_DATE;
    private static final String SOURCE_INTERESSANTE = "ministere-sante";
    private final WebClient httpClient = WebClient.create("https://coronavirusapi-france.now.sh");
    public static void main(final String[] args) {
         var app = new SpringApplication(LesGuerisDuCovid19.class);
        app.setWebApplicationType(WebApplicationType.NONE);// Pour ne pas lancer le serveur web
        app.run(args);
    }
    @Override
    public void run(final String... args) {
        var hier = LocalDate.now().minusDays(1);
        var avantHier = hier.minusDays(1);
        Integer nouveauxGueris = Mono.zip(getNombreGueris(hier, SOURCE_INTERESSANTE), getNombreGueris(avantHier, SOURCE_INTERESSANTE)) // 8
            .doFirst(() -> log.info("Récupération des données entre hier et avant-hier...")) // 9
            .map(tuple -> tuple.getT1() - tuple.getT2()) // 10
            .onErrorResume(error -> Mono.fromRunnable(() -> log.error("Problème lors de la récupération des données: {}", error.getMessage()))) // 11
            .block(); // 12
        // .... À vous d'écrire la suite ....
    }
    private Mono<Integer>

En résumé, getNombreGueris(...) va récupérer la valeur du nombre de guéris dans le résultat (en JSON) de la réponse de l'API, et émettre une erreur si jamais ce champ n'est pas trouvé. La méthode run() est le point d'entrée de notre programme, qui va appeler getNombreGueris pour la date d'hier et d'avant-hier en parallèle, afin de pouvoir calculer la différence de guéris et l'afficher dans la console.

Et en avant pour l’explication. Dans un premier temps, je vais vous expliquer comment initialiser votre projet en ajoutant les dépendances nécessaires pour utiliser Reactor et Spring WebFlux. Ensuite nous verrons comment récupérer le nombre de guéris pour une date donnée. Pour finir, je vous expliquerai comment combiner le résultat des deux appels.

Dépendance

La première chose à faire est d'ajouter les dépendances nécessaires pour utiliser Reactor, ainsi que WebClient, une fonctionnalité de Spring nous permettant d'appeler une API de manière asynchrone, basé sur... Reactor ! Quelle coïncidence ;)

Il vous faudra donc n’ajouter qu'une seule dépendance à voitre projet :

  • Avec gradle

  • Avec maven

  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
  <version>2.4.0</version>
</dependency>

Explications pas-à-pas

Nous allons partir d'une base très simple, en laissant Spring boot configurer le nécessaire.

Ici, l'auto-configuration va surtout nous servir à avoir un format de logs sympa, avec le niveau par défaut à INFO.


1.

Pour faire notre requête http vers l'API, il va falloir initialiser un WebClient en donnant l'url de base, ici https://coronavirusapi-france.now.sh :

WebClient nous propose une abstraction de tous les verbes http habituels, à savoir GET, POST, PUT, DELETE, etc... Ici, nous allons utiliser GET pour le chemin /FranceGlobalDataByDate qui attend le paramètre de requête date formaté en date ISO (aaaa-MM-jj), et récupérer le JSON sorti tout droit de l'API :

Mono<JsonNode>

Si nous voulons passer plus de paramètre de requête, il suffit de chaîner les appels avec .queryParam("param", value)

JsonNode.class pourrait être remplacé par FranceGlobalData.class où FranceGlobalData serait un simple POJO, représentant le schéma du JSON à extraire, plutôt que d'utiliser JsonNode qui est générique et pourra représenter n'importe quel schéma JSON. Si la réponse est totalement vide (pas de JSON retournée), le Mono retourné sera lui aussi vide, mais chut ! Un peu de suspens avant d'en parler.

2.a.

Il est possible d'effectuer une action lorsqu'un flux se déclenche (autrement lors d'un subscribe), avant tout le reste.

Ici, on veut écrire un log juste avant d'exécuter la requête http :

2.b.

Mais on peut aussi effectuer une action lorsqu'un élément est émis dans un flux.

Ici, on veut écrire un log juste après avoir exécuté la requête http :

3.

Nous avons dans le Mono l'objet JSON suivant :

 "FranceGlobalDataByDate" : [ ... ]

Nous allons mapper la donnée reçue pour récupérer le contenu du champ FranceGlobalDataByDate :

4.

Nous avons maintenant dans le Mono l'élément JSON suivant (un tableau d'objets) :

[
  {
  "date" : "aaaa-MM-jj",
  "sourceType" : "xxx",
    "gueris" : 1111
    [...]

Nous allons convertir le Mono émettant le tableau d'objets vers un Flux d'objets, rendant bien plus simple les futures opérations pour chaque objet.

La particularité du JsonNode est qu'il ne fournit pas directement une liste ou un stream de tous les objets.

Pour avoir accès à chacun d’eux, on aurait donc naturellement fait :


Mais Reactor nous permet de générer un flux d'index où nous allons par la suite mapper chaque index vers une case de notre tableau, et vous allez voir, c’est plus concis :

Enfin, pour convertir le Mono vers un Flux, il faut utiliser .flatMapMany. Voici à quoi ressemble le code tout assemblé :

Si le tableau est vide, le Flux retourné sera lui aussi vide, le suspense est encore de la partie... Mais soyez patients, l'explication arrive !

5.

Nous avons maintenant dans le Flux aucun à plusieurs objets JSON de type :

{
  "date" : "aaaa-MM-jj",
  "sourceType" : "xxx",
  "gueris" : 1111
  [...]

Ce que nous voulons, c'est seulement récupérer les données du Ministère de la santé. Il va donc falloir filtrer avec .filter le flux en ne récupérant que les objets dont le champ sourceType contient la valeur ministere-sante.

Comme chaque tableau n'a qu'un seul objet par type de source, nous prendrons uniquement le premier qui satisfait cette condition avec .next, ce qui aura pour effet de convertir le Flux en Mono .

Voici le code :

Si aucun élément ne correspond à notre source, le Mono retourné sera vide, qui pourra donc être géré par la suite avec des opérateurs spécifiques ! Mais... Encore du suspens #roulementDeTambours

6.

Nous avons maintenant dans le Mono un objet JSON suivant :

{
  "date" : "aaaa-MM-jj",
  "sourceType" : "ministere-sante",
  "gueris" : 1111
  [...]

Nous allons mapper l'objet pour récupérer le contenu du champ gueris.

Le problème du .map avec Reactor, c'est qu'il n'accepte pas que la valeur obtenue dans le mapper soit null.

Pour des raisons d'apprentissage, on va faire comme si le champ gueris pouvait être null. Pour gérer ce cas, le seul moyen est d'utiliser le .flatMap qui permet de mapper un élément vers 0 à 1 élément (et 0 à N éléments pour un Flux). Pour représenter 0 élément, il faut utiliser Mono.empty() et pour 1 élément, nous avons Mono.just(valeur):

Mais Reactor ne s'arrête pas là, on peut simplifier notre ternaire en un seul morceau avec Mono.justOrEmpty(valeur), où si la valeur est nulle, elle est mappée toute seule vers un Mono.empty() !

Ce code aurait pu être écrit avec un Optional de cette manière :


Avec la gestion du champ null, le code sera donc le suivant :

Oui, encore un Mono pouvant être vide si le champ gueris est vide ou n'existe pas.

7.

C'est le moment ! À plusieurs reprises, je vous ai dit que nous pouvions obtenir un Mono ou Flux vide. Et nous allons gérer cette particularité en générant une erreur, non pas avec throw, mais avec Mono.error.

En déclenchant cette erreur, n'importe quel opérateur qui interviendrait après serait totalement ignoré (excepté certains qu'on va voir juste après), permettant de court-circuiter un Mono/Flux :


Cependant, on pourrait très bien imaginer retourner une valeur par défaut -1, faisable de cette manière (et dans ce cas, les opérateurs qui suivront pouront intervenir sur notre valeur par défaut) :

Avec un message d'erreur plus parlant, voici le code :

8.

Maintenant que nous avons extrait la logique de récupération d'un nombre de guéris en fonction d'une date et d'une source dans un Mono<Integer>, nous pouvons récupérer ce nombre pour la date d'hier et d'avant-hier. Le mieux serait de déclencher en parallèle les deux requêtes et joindre les résultats ensemble, même si elles ne répondent pas à la même vitesse.

Et c'est faisable facilement à l'aide de Mono.zip(requete1, requete2), qui va prendre nos deux appels, et joindre les résultats dans un Tuple2<Resultat1, Resultat2>, où tuple.getT1() correspondra au résultat de l'appel en 1ère position dans le .zip, et tuple.getT2() correspondra à celui de l'appel en 2ème position. Si un appel déclenche une erreur, l'autre appel sera annulé et l'erreur sera retransmise dans la suite des opérateurs. Si un appel ne retourne rien, l'autre appel sera annulé mais aucune erreur ne sera déclenchée (le Mono retourné par le .zip sera annulé) Il est possible de combiner les résultats autrement qu'avec un tuple, mais ce cas, bien qu'il soit intéressant, est hors de notre sujet. Voici le code, aussi simple qu'efficace, permettant de récupérer le nombre de guéris d'hier et d'avant hier pour la source ministere-sante, et qui nous retournera un Mono<Tuple2<Integer, Integer>> :


9.

Mettons une trace écrite, qui sera affichée avant le .doFirst de chaque appel (voir le pas #2.a) :

10.

Lorsque les deux requêtes sont terminées, nous obtenons un tuple contenant le nombre de guéris d'hier (T1) et avant-hier (T2). Pour récupérer le nombre de nouveaux guéris, il faut simplement soustraire les guéris d'hier à ceux d'avant-hier :

11.

Pour la gestion de notre erreur précédente, c'est simple : nous pouvons convertir une erreur en un autre Mono (ou un autre Flux lorsque l'erreur est attrapée dans un flux) en utilisant .onErrorResume.

Dans notre cas, nous allons nous contenter d'écrire un log avec le message d'erreur sans émettre d'élément supplémentaire, et donc le Mono retourné sera vide :