7 Leçons apprises en utilisant les Reactive Extensions
Cette dernière partie est consacrée à la présentation des bonnes pratiques, des écueils et des patterns retenus par les orateurs dans leur expérience d’utilisation de RxJava en utilisant les Reactive Extensions.
7.1 RxJava et les Stream Java 8
Même si RxJava fournit un ensemble d’opérateurs très riche pour manipuler les Observable, il peut être parfois raisonnable d’utiliser les Stream Java 8 à la place de ces opérateurs. Les Stream ont l’avantage d’être plus performants pour les traitements sur les données en local. Voici un exemple où l’utilisation d’un Stream dans du Rx est raisonnable :1 Observable.just(1, 2, 3, 4) 2 .toList() 3 .map( 4 // Stream Java 8 5 list -> list.stream() 6 .filter(i -> i % 2 == 0) 7 .collect(Collectors.toList())) 8 .subscribe();
Un Stream Java 8 est utilisé pour filtrer les éléments de la liste.
On peut tout simplement utiliser des collections de l’API standard dans les instructions RxJava, comme cet exemple qui crée un Observable à partir d’une collection d’Observables :
Attention !
Cette combinaison de Stream et RxJava doit être utilisée avec parcimonie. Car sinon on a vite fait de se retrouver avec des codes de ce genre où il devient impossible de démêler les appels RxJava des appels de Stream Java 8 :
7.2 Utiliser les lamba pour simplifier les transformations des Observables
Soit l’instruction Rx suivante dans une application Android :
En la récrivant en utilisant les lambdas, elle revient à :
Si l’environnement de développement le permet, il ne faut donc pas hésiter à utiliser les lambdas pour simplifier l’écriture d’instructions Rx.
7.3 Découper les longues transformations Rx pour améliorer leur lisibilité
Soit l’instruction Rx suivante :
Sa lisibilité peut être améliorée en extrayant certaines parties dans des méthodes :
7.4 Usage de flatMap
L’opérateur flatMap retourne un Observable construit par une fonction qu’elle accepte en argument. Voici quelques cas remarquables d’utilisation de flatMap lorsqu’on débute avec Rx.
7.4.1 Imbrication de souscriptions
Lorsque nous avons besoin des valeurs d’un Observable pour souscrire aux valeurs d’un autre Observable, on peut avoir le mauvais réflexe d’écrire quelque chose du genre :
Mais en faisant cela, outre le fait qu’on s’engage dans du “callback hell”, on se prive de l’usage de la propriété de monade des Observable, car rappelez-vous flatMap est une des propriétés d’une structure monadique. Ainsi en utilisant flatMap, cette imbrication de souscription s’écrira (et devrait) s’écrire comme suit :
7.4.2 Observable infini
Soit un Observable émettant une séquence de valeurs chaque seconde, soit une séquence sans fin, à laquelle on applique l’opérateur flatMap :
A chaque écoulement de seconde, l’Observer écrit sur la sortie standard la séquence 1, 2, 3. Bien que l’Observer soit notifié par l’opérateur flatMap, il ne faut pas oublier que la source principale d’événement reste:
Donc le flux reste infini même avec l’application de l’opérateur flatMap, autrement dit il n’y aura pas d’événement de fin de flux.
7.5 Observable infini et operateur toList
Un piège à éviter est de vouloir appliquer l’opérateur toList à un flux infini:
Cette instruction va à coup sûr aboutir à une saturation de mémoire si le code reste en exécution assez longtemps, car il faut savoir que l’opérateur toList essaye de retourner tous les éléments émis par l’Observable source dans une seule liste. Vu que le flux de l’Observable source n’aura pas de fin, l’opérateur toList va continuer à stocker indéfiniment les éléments qu’il va recevoir. En résumer une situation à éviter.
7.6 Cold et Hot Observable
Les Observable peuvent être répartis en deux catégories : Cold et Hot. En fonction du contexte, l’appartenance d’un Observable à l’une ou l’autre des deux catégories peut avoir un impact important.
7.6.1 Cold Observable
Un Cold Observable crée l’émetteur des valeurs à émettre lors des souscriptions. Exemple:
7.6.2 Hot Observable
A l’inverse, un Hot Observable utilise un émetteur de valeurs existant et actif indépendamment des souscriptions. Exemple:
7.6.3 Illustration de l’importance du type Cold ou Hot de l’Observable
Soit un appel de web service qui retourne un Observable:
Et soit l’utilisation suivante de cet Observable:
Si l’Observable est de type Cold, l’appel du Web Service se fera au moment de la souscription, un coût dont le client doit avoir conscience, surtout s’il compte souscrire à l’Observable dans d’autres instructions, chacune des souscriptions aboutissant à un nouvel appel du Web Service.
7.7 Utilisation de MDC dans les instructions RX
Le mécanisme MDC (Mapped Diagnostic Context) bien connu des bibliothèques de journalisation (log4j, logback, slf4j) permet de logger des informations de contexte propres à une hiérarchie de threads (c’est-à-dire que l’information de contexte présente dans le MDC est héritée par les threads fils lors de leur création).
Ce mécanisme de MDC ne marche malheureusement plus tout seul dans certains types d’environnements de multithreading sans intervention du développeur. En effet dans un environnement où un pool de thread est utilisé pour exécuter des tâches, une tâche peut se retrouver à être exécutée dans un thread ayant un MDC avec des valeurs inattendues. Simplement parce qu’au lieu d’être exécutée sur un nouveau thread du pool héritant du MDC du thread courant, la tâche a été exécutée sur un thread du pool déjà utilisé pour l’exécution d’autres tâches ayant modifié son MDC. Ce même phénomène se produit avec RxJava lorsqu’on utilise les Schedulers pour paralléliser les traitements. Il faut utiliser un workaround pour fiabiliser le mécanisme du MDC dans ces cas.
D’abord illustrons concrètement le problème que nous évoquons avec le billet repris du blog http://blog.mabn.pl/2014/11/rxjava-logback-and-mdc-threadlocal.html. Soit la suite d’instructions suivante :
et la sortie correspondante :
Le plus intéressant commence à la ligne 18 des messages de log, où on constate que l’Observable «third_observable» est exécuté avec la valeur «BBB» dans le MDC, alors que d’après la ligne 21 du code, on s’entendrait à ce que le MDC contienne plutôt «CCC». L’explication de ce qui s’est passé est simplement qu’avant le début de l’exécution de l’Observable «third_observable», l’Observable «second observable» se terminait, libérant le thread qui l’exécutait, et c’est ce thread ayant « BBB » dans son MDC que le Scheduler va utiliser pour exécuter l’Observable « third_observable ».
Le workaround proposé notamment dans la continuité de l’article du blog est d’étendre le hook des Scheduler pour y injecter nous même le MDC du thread courant dans le contexte de la prochaine action à exécuter par le Scheduler. Bien sûr, il faudra ensuite créer un Scheduler qui saura extraire le MDC que nous aurons injecté pour l’utiliser correctement.
7.8 Enrichissement des exceptions des Observable: OnErrorThrowable
Dans un contexte d’exécution asynchrone, les messages des exceptions doivent être particulièrement riches pour faciliter les investigations. Or dans un traitement comme dans l’instruction suivante, il n’y a aucune information sur le contexte de la valeur émise lorsque l’erreur se produit :
La valeur émise par l’Observable au moment de l’erreur est pourtant une information très utile sur la cause de l’erreur. Raison pour laquelle il existe dans RxJava des méthodes dédiées pour faciliter la capture et la lecture de cette valeur en cas d’erreur (lignes 8 et 12) :
7.9 Enrichissement sémantique de la pluralité des valeurs des Observable
Lorsque nous avons affaire à une API étrangère retournant un Observable, nous souhaiterions bien avoir une idée du nombre d’éléments émis par l’Observer, au moins savoir si l’Observer émet zéro, un ou plusieurs éléments. Une bonne documentation à jour permettra bien sûr de répondre à ce souci. Mais un typage approprié sera encore plus convenable. Ainsi RxJava propose deux spécialisations (pas au sens POO) des Observable pour répondre à ce besoin de clarté.
7.9.1 Single
Un Single est comme un Observable, avec la particularité qu’elle n’émet que l’une des deux notifications : succès ou erreur.
Single = onSuccess | onError
L’opération suivante retourne un Single qui n’émettra donc qu’une valeur ou une erreur:
Un Single comporte une méthode toObservable() qui permet de le transformer en un Observable émettant les mêmes événements.
7.9.2 Completable
Un Completable est aussi comme un Observable, avec la particularité de n’émettre qu’un événement de complétion ou d’erreur, mais pas de valeur.
Completable = onCompleted | onError
L’opération suivante qui ne retourne pas de valeur est donc convenablement typé avec un Completable :
Un Completable comporte des méthodes toObservable() et toSingle() qui permettent de le transformer respectivement en un Observable et un Single émettant les mêmes événements que lui.
7.9.3 Désabonnement aux Observables
La souscription à un Observable retourne un objet Subscription qui permet de mettre fin à l’abonnement lorsqu’on le souhaite. Dans l’exemple suivant :
le désabonnement qui est correctement géré par le résultat de l’opérateur interval mettra fin à l’émission des valeurs. Après un désabonnement, l’Observer désabonné ne recevra pas de notification de fin de flux.
7.9.3.1 CompositeSubscription pour grouper la gestion des souscriptions
Ce type de besoin de désabonnement peut être simplifié avec l’usage d’un objet CompositeSubscription. Avec l’exemple précédent, cela donne :
C’est une fonction qui s’avère particulièrement pratique dans un environnement tel que celui d’Android où le cycle de vie des applications peut conduire à des abonnements et désabonnements massifs:
7.9.3.2 Action de l’Observable lors du désabonnement
Nous l’avons déjà vu, l’Observable peur spécifier l’action à exécuter lors du désabonnement d’un Observer.
Avec la méthode de création create :
Encore une fois, la méthode de création create n’étant à utiliser que quand on ne peut faire autrement, il faudra penser à la présence de la factory using lorsqu’on a besoin de réagir à une action de désabonnement :
8 Conclusion
Cette université était une très bonne occasion d’avoir une large vision de ce qu’est RxJava et au passage les Reactive Extensions. Après l’introduction de RxJava comme alternative aux CompletableFuture pour faire de la programmation concurrente, nous avons vu les principes de base de RxJava, à travers le couple Observable/Observer et l’utilisation d’un certain nombre de constructeurs représentatifs d’Observable. Nous avons ensuite découvert comment enfin faire réellement de la programmation asynchrone avec RxJava (et Rx aussi) avec la présentation de la notion de Scheduler. Nous avons constaté que la spécification JAX-RS en l’état actuel ne supportait pas nativement Rx, et que pour bénéficier d’une telle intégration native, il faut faire l’effort de voir au-delà de Java EE les nouvelles alternatives que sont les frameworks orientés Microservices tels que Netty/RxNetty ou Vert.x. Nous avons abordé la composition de services avec RxJava, le sujet le plus représentatif du thème de la conférence (« Architecture découplée grâce aux Reactive Extensions »), en découvrant comment la gestion des erreurs peut être améliorée entre les Observable et les Observer, comment RxJava permet de gérer entre autres le nombre de tentatives d’exécution d’une opération où la durée d’exécution maximale d’une opération. Toujours dans le cadre de la composition de services, Hytrix avec l’implémentation du Circuit Breaker a été introduit comme alternative spécialisée pour la gestion des erreurs des services composés par le service composite. Ensuite nous avons eu une brève introduction à la notion de back-pressure dans RxJava. La dernière partie de la session a porté sur les leçons apprises par les orateurs dans leur expérience d’utilisation de RxJava.
Ici donc prend fin la restitution de ce que j'ai appris sur Rx et RxJava en particulier durant cette université de Devoxx France. Cet exercice était intéressant car n'ayant pas d'expérience avec Rx, j'ai essayé de remonter à son origine, ce qui m'a permis de découvrir pas mal de choses, comme par exemple le fait que Rx existait depuis 2009 chez Microsoft où elle a été créée. J'ai aussi appris le concours de circonstances ayant permis la naissance de RxJava. C'est Jafar Husain, un ingénieur de Netflix qui avait découvert Rx lorsqu'il travaillait chez Microsoft qui a poussé l'adoption de cette technologie auprès de son collègue Ben Christensen, celui-là même qui se laissera convaincre de l'utilité de Rx et lancera le développement de l'implémentation en Java qui sera RxJava. Une très bonne ressource que je conseillerais pour un apprentissage de RxJava est le livre Reactive Programming with RxJava de Tomasz Nurkiewicz et Ben Christensen. Le livre décrit bien entendu l'essentiel de l'API de RxJava, mais dans un contexte d'utilisation dans la vraie vie, donc en évoquant des problématiques connus qui peuvent être résolus par tel type d'usage où au contraires être engendrés par des usages déconseillés. Le livre explique aussi bien comment introduire RxJava dans une application existante non construite suivant le paradigme réactif(des conseils que je commence personnellement à mettre en pratique) que comment créer une application sur une stack reactive. Le livre traite aussi des questions qu'on va se poser i