Threads producteur / consommateur utilisant une file d'attente
j'aimerais créer une sorte d'application de filetage Producer/Consumer
. Mais je ne sais pas quelle est la meilleure façon de mettre en place une file d'attente entre les deux.
donc j'ai deux idées (toutes deux peuvent être totalement fausses). Je voudrais savoir qui serait le mieux et si les deux sucent alors quelle serait la meilleure façon de mettre en œuvre la file d'attente. C'est principalement mon implémentation de la file d'attente dans ces exemples qui me préoccupe. J'allonge une classe de file d'attente qui est dans la classe maison et est filé en sécurité. Ci-dessous deux exemples avec 4 classes chacune.
classe Principale-
public class SomeApp
{
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
consumer = new Consumer();
producer = new Producer();
}
}
Consommateur de la classe
public class Consumer implements Runnable
{
public Consumer()
{
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = QueueHandler.dequeue();
//do some stuff with the object
}
}
}
classe de producteur -
public class Producer implements Runnable
{
public Producer()
{
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
QueueHandler.enqueue(new Object());
}
}
}
classe de file d'attente -
public class QueueHandler
{
//This Queue class is a thread safe (written in house) class
public static Queue<Object> readQ = new Queue<Object>(100);
public static void enqueue(Object object)
{
//do some stuff
readQ.add(object);
}
public static Object dequeue()
{
//do some stuff
return readQ.get();
}
}
ou
classe Principale-
public class SomeApp
{
Queue<Object> readQ;
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
readQ = new Queue<Object>(100);
consumer = new Consumer(readQ);
producer = new Producer(readQ);
}
}
Consommateur de la classe
public class Consumer implements Runnable
{
Queue<Object> queue;
public Consumer(Queue<Object> readQ)
{
queue = readQ;
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = queue.dequeue();
//do some stuff with the object
}
}
}
classe de producteur -
public class Producer implements Runnable
{
Queue<Object> queue;
public Producer(Queue<Object> readQ)
{
queue = readQ;
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
queue.enqueue(new Object());
}
}
}
classe de file d'attente -
//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{
public QueueHandler(int size)
{
super(size); //All I'm thinking about now is McDonalds.
}
public void enqueue(Object object)
{
//do some stuff
readQ.add();
}
public Object dequeue()
{
//do some stuff
return readQ.get();
}
}
et partez!
6 réponses
Java 5+ a tous les outils dont vous avez besoin pour ce genre de chose. Vous voudrez:
- mettez tous vos producteurs dans un seul
ExecutorService
; - mettez tous vos consommateurs dans un autre
ExecutorService
; - si nécessaire, communiquer entre les deux en utilisant un
BlockingQueue
.
je dis" si nécessaire " pour (3) parce que de mon expérience c'est une étape inutile. Tout ce que vous faites est de soumettre de nouvelles tâches au service d'exécuteur testamentaire du consommateur. So:
final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
ainsi le producers
soumettre directement à consumers
.
OK, comme d'autres, la meilleure chose à faire est d'utiliser java.util.concurrent
. Je recommande vivement "la concurrence Java dans la pratique". C'est un grand livre qui couvre presque tout ce que vous devez savoir.
quant à votre implémentation particulière, comme je l'ai noté dans les commentaires, ne démarrez pas les Threads des constructeurs -- cela peut être dangereux.
mis à part cela, la deuxième mise en œuvre semble meilleure. Vous ne voulez pas mettre des files d'attente dans les champs statiques. Vous perdez probablement la flexibilité pour rien.
si vous voulez aller de l'avant avec votre propre mise en œuvre (à des fins d'apprentissage je suppose?), fournir au moins une méthode start()
. Vous devez construire l'objet (vous pouvez instancier l'objet Thread
), puis appeler start()
pour démarrer le thread.
Edit: ExecutorService
ont leur propre file d'attente de sorte que cela peut être déroutant.. Voici quelque chose pour vous aider à démarrer.
public class Main {
public static void main(String[] args) {
//The numbers are just silly tune parameters. Refer to the API.
//The important thing is, we are passing a bounded queue.
ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));
//No need to bound the queue for this executor.
//Use utility method instead of the complicated Constructor.
ExecutorService producer = Executors.newSingleThreadExecutor();
Runnable produce = new Produce(consumer);
producer.submit(produce);
}
}
class Produce implements Runnable {
private final ExecutorService consumer;
public Produce(ExecutorService consumer) {
this.consumer = consumer;
}
@Override
public void run() {
Pancake cake = Pan.cook();
Runnable consume = new Consume(cake);
consumer.submit(consume);
}
}
class Consume implements Runnable {
private final Pancake cake;
public Consume(Pancake cake){
this.cake = cake;
}
@Override
public void run() {
cake.eat();
}
}
modifier:
Pour le producteur, au lieu de while(true)
, vous pouvez faire quelque chose comme:
@Override
public void run(){
while(!Thread.currentThread().isInterrupted()){
//do stuff
}
}
de Cette façon, vous pouvez arrêter l'exécuteur en appelant .shutdownNow()
. Si vous utilisez while(true)
, il ne s'arrêtera pas.
notez également que le Producer
est encore vulnérable à RuntimeExceptions
(i.e. un RuntimeException
va arrêter le traitement)
vous réinventez la roue.
si vous avez besoin de persistance et d'autres caractéristiques de l'entreprise utiliser JMS (je suggérerais ActiveMq ).
si vous avez besoin de files d'attente rapides en mémoire, utilisez l'une des imitations de la file d'attente de java .
si vous avez besoin de supporter java 1.4 ou plus tôt, utilisez L'excellent de Doug Lea Paquet .
j'ai étendu la réponse proposée par cletus à l'exemple de code de travail.
- One
ExecutorService
(pes) accepte les tâchesProducer
. - Un
ExecutorService
(ces) accepte "151950920 des" tâches". - à la fois
Producer
etConsumer
actionsBlockingQueue
. - tâches multiples
Producer
génère des nombres différents. - N'importe laquelle des tâches
Consumer
peut consommer nombre généré parProducer
Code:
import java.util.concurrent.*;
public class ProducerConsumerWithES {
public static void main(String args[]){
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue,1));
pes.submit(new Producer(sharedQueue,2));
ces.submit(new Consumer(sharedQueue,1));
ces.submit(new Consumer(sharedQueue,2));
// shutdown should happen somewhere along with awaitTermination
/ * /q/how-to-properly-shutdown-java-executorservice-60087/"Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
sortie:
Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Consumed: 11:by thread:1
Produced:12:by thread:1
Consumed: 22:by thread:1
Consumed: 21:by thread:2
Produced:23:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Consumed: 23:by thread:2
Produced:24:by thread:2
Consumed: 13:by thread:1
Produced:14:by thread:1
Consumed: 24:by thread:2
Produced:25:by thread:2
Consumed: 14:by thread:1
Produced:15:by thread:1
Consumed: 25:by thread:2
Consumed: 15:by thread:1
Note. Si vous n'avez pas besoin de plusieurs producteurs et consommateurs, conservez un seul producteur et un seul consommateur. J'ai ajouté de nombreux producteurs et consommateurs pour mettre en valeur les capacités de blocage de queue parmi de nombreux producteurs et consommateurs.
c'est un code très simple.
import java.util.*;
// @author : rootTraveller, June 2017
class ProducerConsumer {
public static void main(String[] args) throws Exception {
Queue<Integer> queue = new LinkedList<>();
Integer buffer = new Integer(10); //Important buffer or queue size, change as per need.
Producer producerThread = new Producer(queue, buffer, "PRODUCER");
Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER");
producerThread.start();
consumerThread.start();
}
}
class Producer extends Thread {
private Queue<Integer> queue;
private int queueSize ;
public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
super(ThreadName);
this.queue = queueIn;
this.queueSize = queueSizeIn;
}
public void run() {
while(true){
synchronized (queue) {
while(queue.size() == queueSize){
System.out.println(Thread.currentThread().getName() + " FULL : waiting...\n");
try{
queue.wait(); //Important
} catch (Exception ex) {
ex.printStackTrace();
}
}
//queue empty then produce one, add and notify
int randomInt = new Random().nextInt();
System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt);
queue.add(randomInt);
queue.notifyAll(); //Important
} //synchronized ends here : NOTE
}
}
}
class Consumer extends Thread {
private Queue<Integer> queue;
private int queueSize;
public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
super (ThreadName);
this.queue = queueIn;
this.queueSize = queueSizeIn;
}
public void run() {
while(true){
synchronized (queue) {
while(queue.isEmpty()){
System.out.println(Thread.currentThread().getName() + " Empty : waiting...\n");
try {
queue.wait(); //Important
} catch (Exception ex) {
ex.printStackTrace();
}
}
//queue not empty then consume one and notify
System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove());
queue.notifyAll();
} //synchronized ends here : NOTE
}
}
}
- code Java "BlockingQueue" qui a synchronisé la méthode put and get.
- code Java "Producteur" , producteur de fil de produire des données.
- Java code "Consumer", fil de consommation pour consommer les données produites.
- code Java "ProducerConsumer_Main", fonction principale pour démarrer le fil producteur et consommateur.
BlockingQueue.java
public class BlockingQueue
{
int item;
boolean available = false;
public synchronized void put(int value)
{
while (available == true)
{
try
{
wait();
} catch (InterruptedException e) {
}
}
item = value;
available = true;
notifyAll();
}
public synchronized int get()
{
while(available == false)
{
try
{
wait();
}
catch(InterruptedException e){
}
}
available = false;
notifyAll();
return item;
}
}
consommateur.java
package com.sukanya.producer_Consumer;
public class Consumer extends Thread
{
blockingQueue queue;
private int number;
Consumer(BlockingQueue queue,int number)
{
this.queue = queue;
this.number = number;
}
public void run()
{
int value = 0;
for (int i = 0; i < 10; i++)
{
value = queue.get();
System.out.println("Consumer #" + this.number+ " got: " + value);
}
}
}
ProducerConsumer_Main.java
package com.sukanya.producer_Consumer;
public class ProducerConsumer_Main
{
public static void main(String args[])
{
BlockingQueue queue = new BlockingQueue();
Producer producer1 = new Producer(queue,1);
Consumer consumer1 = new Consumer(queue,1);
producer1.start();
consumer1.start();
}
}