Encore un café qui se transforme en article sur le Touilleur. Avec Florent Ramiere de Jaxio nous discutions sur des techniques de synchronisation. Voici le problème à résoudre : soit une liste d’éléments à maintenir correspondant à des tâches. Des threads empilent des tâches à effectuer dans cette queue pendant que d’autres threads récupèrent à la sortie des tâches. Cependant on souhaite que cette liste chaînée offre des services supplémentaires afin que les consommateurs attendent qu’une certaine quantité de tâches soit disponible. Nous voulons aussi pouvoir prendre soit la tâche la plus ancienne (type FIFO) soit au contraire la tâche la plus récente de cette liste.
Là vous êtes perplexe… ne faites pas les timides je sais que vous n’avez rien compris.
Reprenons le problème avec un oeil différent : dans un restaurant, nous avons 5 serveurs qui se chargent de poser des commandes sur un passe-plat. Le passe-plat est une table où les serveurs posent les commandes. Les commandes les plus récentes sont posées à droite, de sorte que la commande du client qui attend depuis le plus longtemps se trouve à gauche.
Les commandes sont enregistrées sur un tableau. Les 3 cuisiniers surveillent la liste des commandes. Un cuisinier dépile toujours la commande la plus à gauche posée sur ce passe-plat. Peu importe qu’un serveur l’ai posé à l’instant ou pas, il doit toujours traiter la commande la plus à gauche sur le passe-plat.
Je sais que c’est simpliste comme image, mais vous allez comprendre pourquoi lorsque je vais sortir la nouveauté de la saison 2008…
Ensuite nous voulons que les cuisiniers attendent qu’au moins 3 commandes soient présentes, afin d’optimiser la cuisson des aliments. L’idée est que lorsque 3 commandes sont présentes et que les cuisiniers n’ont rien à faire, ces 3 commandes sont retirées du passe-plat.
Imaginez comment vos cuisiniers vont surveiller le passe-plat et attendre de manière synchrone que 3 commandes soient présentes…
Enfin comme nous ne voulons pas non plus laisser les clients en attente trop longtemps, une commande qui a plus de 5 mn d’ancienneté a le droit d’être dépilée… sans qu’il ne soit nécessaire d’attendre 2 autres commandes… Ok ?
Enfin pour terminer il faut aussi penser aux serveurs : nous souhaitons qu’ils attendent lorsque 5 commandes sont posées sur le passe-plat afin d’éviter d’engorger notre passe-plat pour rien. Dès qu’un cuisinier dépile une commande, le serveur sera alors débloqué et pourra poser sa commande sur le passe-plat.
Donc nous avons une liste chaînée, avec de la synchronisation. Nous avons aussi une notion de seuil à respecter pour les consommateurs. Et enfin nous avons une notion de temps limite pour s’amuser encore plus…
Il est temps de ressortir ses manuels et de regarder ce que nous avons en stock. Parlons d’abord « deque » (prononcer deck).
C’est quoi une deque ?
A prononcer « Deck », une deque est une « double-ended queue » soit une liste à double entrée en français dans le texte (on ne dit paspas double sortie en français, cela fait un peu trop tunning…). C’est une liste chainée composée d’une tête et d’une queue. La différence avec une Fifo est qu’il est possible de dépiler soit au début, soit à la fin là où une Fifo ne vous laisse accéder qu’à la fin. Java 6 propose une interface Deque. Les méthodes remarquables sont limpides :
First Element (Head) | Last Element (Tail) | |||
Throws exception | Special value | Throws exception | Special value | |
Insert | addFirst(e) |
offerFirst(e) |
addLast(e) |
offerLast(e) |
Remove | removeFirst() |
pollFirst() |
removeLast() |
pollLast() |
Examine | getFirst() |
peekFirst() |
getLast() |
<a href="http://java.sun.com/javase/6/docs/api/java/util/Deque.html#peekLast()">peekLast()</a>
|
La méthode addFirst permet d’ajouter une nouvelle commande au début de notre passe-plat. Cependant si nous souhaitons faire passer une commande avant toutes les autres, la méthode addLast nous donne un droit d’écriture afin de placer notre commande à la fin.
La méthode peekLast permet à un cuisinier de s’assurer que la dernière tâche en attente n’est pas dans la pile depuis un certain temps… Mais ce n’est pas encore la solution que je souhaite vous présenter.
BlockingDeque
Une BlockingDeque représentée par l’interface java.util.concurrent.BlockingDeque est une deque qui va attendre d’être non vide avant de laisser un consommateur dépiler un élément. Clairement : nos cuisiniers attendent sagement qu’une commande arrive. C’est aussi une Queue capable de faire attendre un serveur lorsque la liste est pleine, c’est de cette manière que nous allons faire attendre les serveurs lorsque la BlockingQueue est pleine.
En Java, l’interface BlockingDeque
public boolean offer (E element, long timeout, TimeUnit unit) public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException public boolean offerLast(E element,long timeout, TimeUnit unit)
La méthode offer permet d’ajouter un nouvel élément à la fin de la liste, en attendant une certaine quantité de temps précisée par la valeur timeout précisé par l’unité de TimeUnit. Si jamais finalement l’entrée n’a pas été ajoutée au bout du temps t, alors la méthode retournera false. Sinon elle retourne true. Pratique non ?
La méthode offerFirst permet de faire passer une commande devant toutes les autres, tout en restant avec la contrainte du timeout comme précédemment.
Du côté de nos consommateurs, les méthodes suivantes permettent soit de bloquer jusqu’à ce qu’une commande soit disponible, soit d’attendre tout en respectant un timeout.
Les méthodes takeFirst et takeLast sont bloquantes :
public E takeFirst() public E takeLast()
Les méthodes pollFirst() et pollLast() permettent d’examiner et d’attendre que la liste se remplisse, tout en restant dans un certain intervalle de temps comme précédemment :
public E pollFirst(long timeout, TimeUnit unit) public E pollLast(long timeout, TimeUnit unit)
LinkedBlockingDeque
Cette classe est une implémentation en Java de l’interface et permet de fixer une capacité maximum afin de limiter le nombre d’éléments dans la liste :
public LinkedBlockingDeque() public LinkedBlockingDeque(java.util.Collection< ? extends E> c) public LinkedBlockingDeque(int capacity)
Un exemple complet
Voici un exemple complet avec un Serveur et un Cuisinier. Ces deux objets accèdent la même BlockingQueue. La capacité de la queue est de 5 commandes.
Pour faire simple : le serveur ajoute un Integer à la BlockingQueue toutes les 300 millisecondes. Le cuisinier prend un élément toutes les 3000 millisecondes. Si le Serveur fonctionne correctement, au bout de 1500 ms (5 x 300) la queue des commandes sera pleine. Dans ce cas les 5 autres commandes suivantes seront mises en attente automatiquement, le Serveur sera bloqué.
import java.util.concurrent.BlockingDeque; /** * A very simple producer class to demonstrate BlockingDeque functionnality. * * @author N.Martignole */ public class Serveur implements Runnable { private String name; private BlockingDeque<Integer> deque; // nous allons avoir 10 commandes a la suite private int[] numbers = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; public Serveur(String name, BlockingDeque<Integer> deque) { this.name = name; this.deque = deque; } public void run() { for (int i = 0; i < 10; i++) { try { deque.putFirst(numbers[i]); System.out.println(name + " place une nouvelle commande #" + numbers[i]); System.out.println(name + " affiche l'état de la queue: " + deque +"\n"); Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } } }
Voyons la classe du cuisinier :
import java.util.concurrent.BlockingDeque; public class Cuisinier implements Runnable { private String name; private BlockingDeque<Integer> deque; public Cuisinier(String name, BlockingDeque<Integer> deque) { this.name = name; this.deque = deque; } public void run() { for (int i = 0; i < 10; i++) { try { int j= deque.takeLast(); System.out.println("Cuisinier "+ name + " prend " + j); System.out.println(name + " regarde la queue: " + deque + "\n"); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
Enfin il nous faut une class de Test pour tester le tout:
import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; public class Tester { public static void main(String[] args) { // Creation d'une queue pour notre passe plat de 5 cases BlockingDeque<Integer> deque = new LinkedBlockingDeque<Integer>(5); // Creation d'un serveur et d'un cuisinier pour faire simple Runnable serveur = new Serveur("Serveur", deque); Runnable cuisinier = new Cuisinier("Cuisinier", deque); // Lance le tout new Thread(serveur).start(); new Thread(cuisinier).start(); } }
Avez-vous constaté que le Serveur attend 300 millisecondes avant de poser une nouvelle commande là où le cuisinier attend 3000 ms ? Pour 10 éléments, au bout de 1500 ms le Serveur sera donc bloqué car la queue sera pleine. Nous devrions donc le voir attendre jusqu’à ce qu’une place se libère.
Le résultat est intéressant :
Serveur place une nouvelle commande #1 Serveur affiche l'état de la queue: [1] Cuisinier commence la commande #1 Cuisinier regarde le passe-plat [] Serveur place une nouvelle commande #2 Serveur affiche l'état de la queue: [2] Serveur place une nouvelle commande #3 Serveur affiche l'état de la queue: [3, 2] Serveur place une nouvelle commande #4 Serveur affiche l'état de la queue: [4, 3, 2] Serveur place une nouvelle commande #5 Serveur affiche l'état de la queue: [5, 4, 3, 2] Serveur place une nouvelle commande #6 Serveur affiche l'état de la queue: [6, 5, 4, 3, 2] Cuisinier a terminé la commande #1 Cuisinier commence la commande #2 Serveur place une nouvelle commande #7 Cuisinier regarde le passe-plat [7, 6, 5, 4, 3] Serveur affiche l'état de la queue: [7, 6, 5, 4, 3] Cuisinier a terminé la commande #2 Cuisinier commence la commande #3 Serveur place une nouvelle commande #8 Cuisinier regarde le passe-plat [8, 7, 6, 5, 4] Serveur affiche l'état de la queue: [8, 7, 6, 5, 4] Cuisinier a terminé la commande #3 Cuisinier commence la commande #4 Cuisinier regarde le passe-plat [8, 7, 6, 5] Serveur place une nouvelle commande #9 Serveur affiche l'état de la queue: [9, 8, 7, 6, 5] Cuisinier a terminé la commande #4 Cuisinier commence la commande #5 Serveur place une nouvelle commande #10 Cuisinier regarde le passe-plat [10, 9, 8, 7, 6] Serveur affiche l'état de la queue: [10, 9, 8, 7, 6] Le serveur a terminé de placer les commandes Cuisinier a terminé la commande #5 Cuisinier commence la commande #6 Cuisinier regarde le passe-plat [10, 9, 8, 7] Cuisinier a terminé la commande #6 Cuisinier commence la commande #7 Cuisinier regarde le passe-plat [10, 9, 8] Cuisinier a terminé la commande #7 Cuisinier commence la commande #8 Cuisinier regarde le passe-plat [10, 9] Cuisinier a terminé la commande #8 Cuisinier commence la commande #9 Cuisinier regarde le passe-plat [10] Cuisinier a terminé la commande #9 Cuisinier commence la commande #10 Cuisinier regarde le passe-plat [] Cuisinier a terminé la commande #10 Cuisinier a terminé de traiter les commandes
Comme on peut le constater, avec peu d’effort il est possible d’implémenter un système synchrone propre, sans faire appel à du vieux code comme synchronized, wait ou join. C’est l’une des forces de l’api Concurrent en Java qui permet de programmer des cas complexes comme celui-ci en quelques lignes.
Pour terminer voyons le code du Tester avec 2 Cuisniers. J’y ajoute une notion de timeout afin que les cuisiniers attendent 1000 ms au maximum lorsque la queue est vide. Pourquoi cela ? Car chacun des 2 cuisiniers va traiter 5 commandes, bien que cela ne soit pas déterministe. Je ne peux pas baser mon code uniquement sur la supposition que chacun des cuisiniers traite 5 commandes, rien ne me dit que par exemple le GarbageCollector ne va pas perturber mon code. Pour cela la solution est simple, il suffit d’ajouter un timeout comme nous l’avons vu au début de cette article, dans la class Cuisinier .
La méthode run() de Cuisinier devient alors :
public void run() { for (int i = 0; i < 5; i++) { try { int j= deque.pollLast(1000, TimeUnit.MILLISECONDS); System.out.println(name + " commence la commande #" + j); System.out.println(name + " regarde le passe-plat " + deque + "\n"); Thread.sleep(3000); System.out.println(name+ " a terminé la commande #"+j); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(name+" a terminé de traiter les commandes"); }
Le résultat de l’exécution donnera alors :
Serveur place une nouvelle commande #1 Serveur affiche l'état de la queue: [1] Cuisinier 1 commence la commande #1 Cuisinier 1 regarde le passe-plat [] Serveur place une nouvelle commande #2 Serveur affiche l'état de la queue: [2] Cuisinier 2 commence la commande #2 Cuisinier 2 regarde le passe-plat [] Serveur place une nouvelle commande #3 Serveur affiche l'état de la queue: [3] Serveur place une nouvelle commande #4 Serveur affiche l'état de la queue: [4, 3] Serveur place une nouvelle commande #5 Serveur affiche l'état de la queue: [5, 4, 3] Serveur place une nouvelle commande #6 Serveur affiche l'état de la queue: [6, 5, 4, 3] Serveur place une nouvelle commande #7 Serveur affiche l'état de la queue: [7, 6, 5, 4, 3] Cuisinier 1 a terminé la commande #1 Cuisinier 1 commence la commande #3 Serveur place une nouvelle commande #8 Cuisinier 1 regarde le passe-plat [8, 7, 6, 5, 4] Serveur affiche l'état de la queue: [8, 7, 6, 5, 4] Cuisinier 2 a terminé la commande #2 Cuisinier 2 commence la commande #4 Cuisinier 2 regarde le passe-plat [9, 8, 7, 6, 5] Serveur place une nouvelle commande #9 Serveur affiche l'état de la queue: [9, 8, 7, 6, 5] Cuisinier 1 a terminé la commande #3 Cuisinier 1 commence la commande #5 Cuisinier 1 regarde le passe-plat [9, 8, 7, 6] Serveur place une nouvelle commande #10 Serveur affiche l'état de la queue: [10, 9, 8, 7, 6] Le serveur a terminé de placer les commandes Cuisinier 2 a terminé la commande #4 Cuisinier 2 commence la commande #6 Cuisinier 2 regarde le passe-plat [10, 9, 8, 7] Cuisinier 1 a terminé la commande #5 Cuisinier 1 commence la commande #7 Cuisinier 1 regarde le passe-plat [10, 9, 8] Cuisinier 2 a terminé la commande #6 Cuisinier 2 commence la commande #8 Cuisinier 2 regarde le passe-plat [10, 9] Cuisinier 1 a terminé la commande #7 Cuisinier 1 commence la commande #9 Cuisinier 1 regarde le passe-plat [10] Cuisinier 2 a terminé la commande #8 Cuisinier 2 commence la commande #10 Cuisinier 2 regarde le passe-plat [] Cuisinier 1 a terminé la commande #9 Cuisinier 1 a terminé de traiter les commandes Cuisinier 2 a terminé la commande #10 Cuisinier 2 a terminé de traiter les commandes
En conclusion nous avons vu un exemple simple de l’utilisation de la class LinkedBlockingQueue. Cette classe du package Concurrent de Java 6 offre des fonctionnalités puissantes suffisantes pour répondre à un grand nombre de cas pratiques et réalistes.
0 no like