Pourquoi plus de code Java n'utilise-t-il pas PipedInputStream / PipedOutputStream?

J'ai découvert cet idiome récemment, et je me demande s'il me manque quelque chose. Je n'ai jamais vu utilisé. Presque tout le code Java avec lequel j'ai travaillé dans la nature favorise les données dans une chaîne ou un tampon, plutôt que quelque chose comme cet exemple (en utilisant des API HttpClient et XML par exemple):

    final LSOutput output; // XML stuff initialized elsewhere
    final LSSerializer serializer;
    final Document doc;
    // ...
    PostMethod post; // HttpClient post request
    final PipedOutputStream source = new PipedOutputStream();
    PipedInputStream sink = new PipedInputStream(source);
    // ...
    executor.execute(new Runnable() {
            public void run() {
                output.setByteStream(source);
                serializer.write(doc, output);
                try {
                    source.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }});

    post.setRequestEntity(new InputStreamRequestEntity(sink));
    int status = httpClient.executeMethod(post);

Ce code utilise une technique de style Unix-piping pour empêcher que plusieurs copies des données XML soient conservées en mémoire. Il utilise le flux de sortie HTTP Post et L'API DOM Load / Save pour sérialiser un Document XML en tant que contenu de la requête HTTP. Aussi loin que je peux dire qu'il minimise l'utilisation de la mémoire avec très peu de code supplémentaire (juste quelques lignes pour Runnable, PipedInputStream, et PipedOutputStream).

Alors, quel est le problème avec cet idiome? S'il n'y a rien de mal à cet idiome, pourquoi ne l'ai-je pas vu?

EDIT: pour clarifier, PipedInputStream et PipedOutputStream remplacent la copie standard tampon par tampon qui apparaît partout, et ils vous permettent également de traiter les données entrantes en même temps que l'écriture les données traitées. Ils n'utilisent pas de tuyaux OS.

46
demandé sur lmichelbacher 2009-01-27 19:35:43

8 réponses

À partir des Javadocs :

Typiquement, les données sont lues à partir d'un objet PipedInputStream par un thread et les données sont écrites dans le PipedOutputStream correspondant par un autre thread. Tenter d'utiliser les deux objets à partir d'un seul thread n'est pas recommandé, car cela peut bloquer le thread.

Cela peut expliquer en partie pourquoi il n'est pas plus couramment utilisé.

Je suppose qu'une autre raison est que de nombreux développeurs ne comprennent pas son but / bénéfice.

43
répondu matt b 2014-02-14 18:27:56

Dans votre exemple, vous créez deux threads pour faire le travail qui pourrait être fait par un. Et introduire des retards d'E / S dans le mélange.

Avez-vous un meilleur exemple? Ou Ai-je juste répondu à votre question.


Pour tirer certains des commentaires (au moins mon point de vue d'eux) dans la réponse principale:

  • la concurrence introduit la complexité dans une application. Au lieu de traiter avec un seul flux linéaire de données, vous devez maintenant être préoccupé par le séquençage des indépendants les flux de données. Dans certains cas, la complexité supplémentaire peut être justifiée, en particulier si vous pouvez tirer parti de plusieurs cœurs / Processeurs pour effectuer un travail gourmand en CPU.
  • Si vous êtes dans une situation où vous pouvez bénéficier d'opérations simultanées, il existe généralement un meilleur moyen de coordonner le flux de données entre les threads. Par exemple, passer des objets entre les threads à l'aide d'une file d'attente concurrente, plutôt que d'envelopper les flux canalisés dans les flux d'objets.
  • où un flux canalisé peut être une bonne solution est lorsque vous avez plusieurs threads effectuant un traitement de texte, un pipeline Unix (par exemple: grep / sort).

Dans l'exemple spécifique, le flux canalisé permet l'utilisation d'une classe D'implémentation RequestEntity existante fournie par HttpClient. Je crois qu'une meilleure solution consiste à créer une nouvelle classe d'implémentation, comme ci-dessous, car l'exemple est finalement une opération séquentielle qui ne peut pas bénéficier de la complexité et de la surcharge d'une implémentation concurrente. Pendant que je montre le RequestEntity en tant que classe anonyme, la réutilisabilité indiquerait qu'il devrait s'agir d'une classe de première classe.

post.setRequestEntity(new RequestEntity()
{
    public long getContentLength()
    {
        return 0-1;
    }

    public String getContentType()
    {
        return "text/xml";
    }

    public boolean isRepeatable()
    {
        return false;
    }

    public void writeRequest(OutputStream out) throws IOException
    {
        output.setByteStream(out);
        serializer.write(doc, output);
    }
});
7
répondu kdgregory 2009-01-27 22:06:18

Moi aussi, j'ai seulement découvert les classes PipedInputStream/PipedOutputStream récemment.

Je développe un plug-in Eclipse qui doit exécuter des commandes sur un serveur distant via SSH. J'utilise JSch et L'API Channel lit à partir d'un flux d'entrée et écrit dans un flux de sortie. Mais j'ai besoin de nourrir les commandes à travers le flux d'entrée et de lire les réponses à partir d'un flux de sortie. C'est là que PipedInput / OutputStream entre en jeu.

import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import com.jcraft.jsch.Channel;

Channel channel;
PipedInputStream channelInputStream = new PipedInputStream();
PipedOutputStream channelOutputStream = new PipedOutputStream();

channel.setInputStream(new PipedInputStream(this.channelOutputStream));
channel.setOutputStream(new PipedOutputStream(this.channelInputStream));
channel.connect();

// Write to channelInputStream
// Read from channelInputStream

channel.disconnect();
6
répondu Brian Matthews 2009-01-27 17:48:44

Aussi, revenons à l'exemple original: non, il ne minimise pas exactement l'utilisation de la mémoire non plus. Les arborescences DOM sont construites, la mise en mémoire tampon est effectuée - bien que ce soit mieux que les répliques de tableau d'octets complets, ce n'est pas beaucoup mieux. Mais la mise en mémoire tampon dans ce cas sera plus lente; et un thread supplémentaire est également créé-vous ne pouvez pas utiliser la paire PipedInput/OutputStream à partir d'un seul thread.

Parfois, PipedXxxStreams sont utiles, mais la raison pour laquelle ils ne sont pas utilisés plus est parce que très souvent ils ne sont pas la bonne solution. Ils sont ok pour la communication inter-thread, et c'est là que je les ai utilisés pour ce que cela vaut. C'est juste qu'il n'y a pas beaucoup de cas d'utilisation pour cela, étant donné que SOA pousse la plupart de ces limites à être entre les services, au lieu d'entre les threads.

4
répondu StaxMan 2009-01-27 20:50:59

J'ai essayé d'utiliser ces classes il y a un moment pour quelque chose, j'oublie les détails. Mais j'ai découvert que leur mise en œuvre est fatalement imparfaite. Je ne me souviens pas de ce que c'était mais j'ai une mémoire sournoise que c'était peut-être une condition de course qui signifiait qu'ils étaient parfois bloqués (et oui, bien sûr, je les utilisais dans des threads séparés: ils ne sont tout simplement pas utilisables dans un seul thread et n'étaient pas conçus pour l'être).

Je pourrais jeter un oeil à leur code source et voir si je peux voir ce que le problème pourrait avoir été.

2
répondu Adrian Pronk 2009-01-27 20:27:45

Voici un cas d'utilisation où les tuyaux ont du sens:

Supposons que vous ayez une lib tierce, telle qu'un mappeur xslt ou une lib crypto qui a une interface comme celle-ci: doSomething (inputStream, outputStream). Et vous ne voulez pas tamponner le résultat avant d'envoyer sur le fil. Apache et d'autres clients interdisent l'accès direct au fil outputstream. Le plus proche que vous pouvez obtenir est d'obtenir le outputstream - à un décalage, après l'écriture des en - têtes-dans un objet entité de requête. Mais puisque c'est en vertu de le capot, il ne suffit toujours pas de passer un inputstream et outputstream à la lib tierce partie. Les tuyaux sont une bonne solution à ce problème.

Incidemment, j'ai écrit une inversion de L'API du client HTTP D'Apache [PipedApacheClientOutputStream] qui fournit une interface OutputStream pour HTTP POST en utilisant le client HTTP Apache Commons 4.3.4. Ceci est un exemple où les flux canalisés pourraient avoir un sens.

2
répondu Robert Christian 2016-03-08 19:23:34

Java.io les tuyaux ont trop de commutation de contexte (par octet en lecture / écriture) et leur java.NIO counterpart vous oblige à avoir un fond NIO et une utilisation correcte des canaux et autres, c'est ma propre implémentation de pipes utilisant une file d'attente de blocage qui, pour un seul producteur/consommateur, fonctionnera rapidement et évoluera bien:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
1
répondu Guido Medina 2013-10-19 12:28:00

Alors, quel est le problème avec cet idiome? Si il n'y a rien de mal à cet idiome, pourquoi n'ai-je pas vu?

EDIT: pour clarifier, PipedInputStream et PipedOutputStream remplace le passe-partout de la mémoire tampon par le tampon de copie montre partout, et ils ont aussi vous permettent de traiter les données entrantes parallèlement à l'écriture du données traitées. Ils n'utilisent pas OS tuyau.

Vous avez dit ce qu'il fait mais vous n'avez pas dit Pourquoi vous faites ce.

Si vous croyez que cela réduira les ressources utilisées (cpu / mémoire) ou améliorera les performances, cela ne le fera pas non plus. Cependant, cela rendra votre code plus complexe.

Fondamentalement, vous avez une solution sans problème pour lequel elle résout.

0
répondu Peter Lawrey 2009-01-27 20:02:27