26 août 2016

PYTHON - Le module "Threading"

Design & Code

Philippe

Boulanger

Image dev et ordinateur.

26 août 2016

PYTHON - Le module "Threading"

Design & Code

Philippe

Boulanger

Image dev et ordinateur.

26 août 2016

PYTHON - Le module "Threading"

Design & Code

Philippe

Boulanger

Image dev et ordinateur.

Après la présentation des concepts, nous allons nous attaquer au module « threading » de Python… Le module permet d’utiliser le multithreading préemptif de manière assez simple. Nous nous concentrerons sur l’API disponible depuis la version 3.10 de Python.

GIL

On ne peut pas parler du multithreading en Python sans parler du GIL (Global Interpreter Lock). La gestion du multithreading a été ajoutée en 1992 par Guido Van Rossum. Il a alors intégré un verrou interne à l’interpréteur qui est, par la suite, devenu un frein historique à l’utilisation des ressources des machines multi-cœurs. Au commencement, le GIL était un atout car il permettait aux développeurs de ne pas trop se soucier de la gestion des synchronisations lors de l’écriture des extensions en C. L’interpréteur se sert du GIL pour protéger tous les objets des accès concurrentiels notamment pour protéger le compteur de références et donc la gestion de la mémoire. Et donc, du fait de la présence de ce verrou, Python n’utilise pas correctement les cœurs des processeurs actuels du fait d’un excès de synchronisation ☹.
De nombreuses tentatives pour retirer le GIL ont été faites par le passé et ont échoué. Récemment, Guido Van Rossum a rejoint Microsoft avec pour objectif fixé d’améliorer les performances de Python. La version 3.11 de Python est une première version intégrant de substantielles améliorations.

EXEMPLE DE CRÉATION DE THREADS

Pour faire du multithreading, la première étape consiste à savoir créer des threads. L’objectif est d’exécuter des acteurs qui peuvent être des fonctions ou des objets.

Acteur = fonction

Le plus simple est de commencer par un exemple simple : 4 threads écrivant 3000 fois des textes différents (des caractères allant de 0 à 3) dans la console.

from threading import Thread
# 1) la function à exécuter
def actor( text, count ):
    for i in range( count ):
        print( text, end="" )
if __name__ == "__main__":
    # 2) initialisation
    actors = [ Thread( target = actor,
                       args   = ( text, 3000 ) ) for text in "0123" ]

Il y a donc 4 étapes :

  1. Créer la fonction « actor » qui devra être exécutée par les threads ; en l’occurrence nous allons afficher « n » fois un texte dans la console

  2. Initialiser les threads : définir quelle fonction devra être lancée avec quels paramètres. A ce stade le thread n’est pas créé et la fonction « actor » n’est pas exécutée.

  3. Démarrer les threads : on les ajoute dans l’ordonnanceur et les fonctions « actor » sont exécutées

  4. Attendre la fin de tous les threads
    La classe « Thread » est l’objet en Python qui permet de créer et gérer les threads. Nous expliquerons dans le détail les API de la classe dans un autre paragraphe. Le défaut de cette technique est que si la fonction « actor » retournait un résultat important pour la suite du programme, on n’y aurait pas accès.


Acteur = objet

« Thread » est une classe que l’on peut spécialiser, cela permet de profiter de la capacité des classes de stocker des données comme variables membres. Ce faisant, on pourra, à la fin de l’exécution, récupérer le résultat pour l’exploiter.

from threading import Thread
class Actor( Thread ):
    def __init__( self, text, count ):
        Thread.__init__( self )
        self.text  = text
        self.count = count
        self.size  = 0
    def run( self ):
        for i in range( self.count ):
            print( self.text, end = "" )
        self.size = len( self.text ) * self.count
if __name__ == "__main__":
    # initialisation
    actors = [ Actor( text, 3000 ) for text in "0123" ]

Il y a donc 4 étapes :

  1. Créer la classe « Actor » qui devra être utilisée comme threads en surchargeant la fonction « run » qui sera l’action exécutée par le thread; en l’occurrence nous allons afficher « n » fois un texte dans la console

  2. Initialiser les threads : définir quelle instance de classe sera lancée avec quels paramètres. A ce stade le thread n’est pas créé et la fonction « Actor.run » n’est pas exécutée.

  3. Démarrer les threads : on les ajoutent dans l’ordonnanceur et les fonctions « actor » sont exécutées

  4. Attendre la fin de tous les threads

API DE LA CLASSE THREAD

Maintenant que l’on a vu, par des exemples, comment créer des threads selon les deux options possibles, on sait que la classe Thread est essentielle et que maîtriser son API est  nécessaire.

Constructeur

Utile pour utiliser un acteur sous forme de fonction, l’interface du constructeur est :


Pour démarrer un thread en mode fonctionnel, il nous faut passer les arguments suivants :

  • « target » : on lui passe la fonction à exécuter

  • « args » : on y stocke le tuple contenant les arguments positionnels à passer à « target »

  • « kwargs » : on y stocke le dicitionnaire des paramètres nommés  à passer à « target »

« name » permet de nommer le thread pour le reconnaitre. Si on le laisse à None, par défaut l’API génèrera un nom du type « Thread-N » avec N entier.

« group » n’a pour l’instant pas d’utilité, il est réservé pour un usage futur.

« Daemon »  permet de définir si ce thread sera un démon ou non. Cette valeur doit être définie avant l’appel à la fonction « start ». L’interpréteur Python s’arrête dès que tous les threads non-démons sont terminés.

start()

Cette function démarre le thread été exécute l’action. Elle ne peut être appelée qu’une fois par objet Thread, une exception RuntimeError sera levée en cas de second appel..

join( timeout = None )

Attend jusqu’à ce que le thread soit terminé (si timeout est None). Pour définir timeout, il faut fournir une valeur flottante correspondant au temps maximum en seconde à attendre. La fonction « join » ne retournant pas de valeur, il faudra utiliser la fonction « is_alive ».

run()

C’est la fonction qui est appelée par « start ». Si on veut créer notre classe de Thread spécialisée, il faut surcharger cette fonction.

name

Propriété permettant de lire/écrire le nom du thread.

daemon

Propriété qui permet de savoir si ce thread est un démon ou non.

is_alive()

Retourne True si le thread est en vie (continue de s’exécuter).

native_id

Identifiant du thread affecté par l’OS.

LES PRIMITIVES DE SYNCHRONISATION

Les verrous Lock et RLock sont les plus usités mais ce n’est pas une raison de négliger les autres qui ont leur utilité.

Lock

C’est la classe Python correspondant au concept de verrou (mutex). L’implémentation interne utilise le verrou concret le plus efficace fourni sur la plateforme. L’interface est simple :

  • acquire( blocking = True, timeout = -1 )
    attend jusqu’à acquérir le mutex si blocking est True. Si blocking est False, il n’y d’attente que jusqu’à ce que le timeout (valeur flottante en seconde) expire. La fonction retourne True si le verrou a été pris, False sinon.

  • release()
    libère le verrou

  • locked()
    retourne True si le verrou est pris.

Voici un exemple d’utilisation du mutex pour garantir la libération du verrou :


Grâce à la gestion de contexte, on dispose d’une méthode plus simple :


RLock

C’est la classe Python correspondant au concept de verrou récursif ou réentrant (recursive mutex). L’implémentation interne utilise le verrou récursif concret le plus efficace fourni sur la plateforme. L’interface est la même que pour la classe Lock.

Timer

Timer est une sous-classe de la classe Thread. Elle déclenche un appel de fonction au bout d’un temps déterminé à l’avance.

L’interface est simple :

  • Timer(interval, function, args=None, kwargs=None)
    Appellera “function(*args, **kwargs)” après “interval” seconds. C’est une initialization, le compte à rebours n’est pas lancé.

  • start()
    démarre le timer/compte à rebours

  • cancel()
    stoppe le timer durant la période d’attente

  • function
    variable membre contenant la fonction à appeler après l’intervalle

  • args, kwargs
    variables membres contenant les paramètres à passer à la fonction après l’intervalle

  •  interval
    variable contenant le temps à attendre

  • finished
    variable membre contenant un objet de type threading.Event que l’on attend pour déclencher l’appel de la fonction

Semaphore

Le concept du sémaphore de Dijkstra pour Python. Les noms des fonctions ont été homogénéisées pour faciliter l’apprentissage. L’interface est la suivante :

  • Semaphore( value = 1 )
    constructeur qui prend la valeur initiale du compteur de ressources disponibles

  • acquire( blocking = True, timeout = None )
    attend jusqu’à acquérir une ressource du sémaphore si blocking est True. Si blocking est False, il n’y d’attente que jusqu’à ce que le timeout (valeur flottante en seconde) expire. La fonction retourne True si la ressource a été capturée, False sinon.

  • release( n = 1 )
    libère n ressources du sémaphore

EXEMPLE DES PRODUCTEURS/CONSOMMATEURS

Nous avons un groupe de producteurs qui envoient des messages et nous avons un groupe de consommateurs qui vont dépiler les messages pour les traiter. Afin de synchroniser les producteurs et les consommateurs nous utiliserons une queue. Ce type de traitement est effectué dans des applications (notamment bancaires) avec des queues MQSeries ou Kafka.

Nos producteurs vont écrire un nombre (prévu à l’avance) de messages et s’arrêter. Par contre les consommateurs vont devoir tourner jusqu’à ce que nous leur disions de s’arrêter. Nous allons donc convenir d’un message de stop (une chaîne de caractères contenant le mot « STOP » par exemple) mis dans la queue qui provoquera l’arrêt du consommateur qui l’aura récupéré : il faudra donc poster un message par consommateur…

Queue

L’idée est de construire une queue FIFO (First In First Out) protégée contre les accès concurrents (ThreadSafe). Et nous allons le faire avec les outils basiques même si Python propose des outils prêt à l’emploi. Il nous faudra une liste et les messages seront insérés en tête (avec « l.insert( 0, msg ) ») et récupérer en queue (avec « l.pop() »).

class ThreadSafeQueue:
    def __init__

Dans le constructeur, on crée deux variables membres « _lock » pour le verrou qui rendra l’objet thread safe et « _queue » pour contenir la liste de messages.

La fonction « send( msg ) » servira au producteurs pour envoyer les messages aux consommateurs.

La fonction « get_msg() » dépile un message et le retourne. Si aucun message n’est disponible, elle retournera None.

Gestion des I/O

Les consommateurs écriront les messages dans un fichier afin de pouvoir vérifier que le programme se comporte correctement et ne perd aucun message. Les entrées/sorties sont toujours un point problématique : les fichiers ou assimilés-fichiers sont rarement ThreadSafe. Là aussi nous devrons utiliser un verrou.

class Output:
    def __init__


Initialisation des structures communes

Il faut créer les données comme variables membres :


Le producteur

Nous gérerons les producteurs et consommateurs comme de simples fonctions. Le producteur aura une interface à deux paramètres : un identifiant et le nombre de messages à créer… Les messages seront des tuples de 3 valeurs : l’identifiant du producteur, un mot (MSG) et le numéro du message.


Le consommateur

Un consommateur consomme les messages de la queue jusqu’à obtenir un message de stop. Chaque message sera écrit dans le fichier de sortie (« output »). On a donc une boucle qui ne s’arrêtera que lorsqu’un message « STOP » arrivera.


Lancement des threads

Un consommateur consomme :

# initialisation
consumers = [ Thread( target = consumer, args = ( 1, ) ),
              Thread( target = consumer, args = ( 2, ) ) ]
producers = [ Thread( target = producer, args = ( 1, 5000 ) ),
              Thread( target = producer, args = ( 2, 4000 ) ) ]
# lancement des threads
for thread in [ *producers, *consumers ]

EXEMPLE DE TIMER  A REPETITION

L’objectif va être de créer un mécanisme capable d’appeler de façon répétée toutes les x secondes une fonction : pour mettre à jour une horloge par exemple. Pour obtenir ce résultat nous allons créer une classe qui héritera de la classe Timer.

RepeatTimer

Un timer à répétition, voilà la solution à notre problème. Il suffit de surcharger la fonction « run » de Timer en y ajoutant une boucle while :


Affichage de l’horloge

L’action que nous souhaitons appeler périodiquement est l’affichage de l’horloge. Nous allons faire simple :


Lancement du timer

Lancer le timer est facile :


Pouvoir l’arrêter lorsque l’on en a plus besoin peut être utile aussi :


CONCLUSION

Nous vous avons présenté les principales API du module threading avec des exemples concrets de mise en œuvre vous permettant de mieux comprendre leur fonctionnement, mais sans couvrir toutes les possibilités (Event, Condition, etc…). J’espère vous avoir ouvert donner des idées pour vos prochaines réalisations.