Servlet-3 Contexte asynchrone, comment faire des écritures asynchrones?

Description Du Problème

Servlet-3.0 API permet de désactiver un contexte request/response et d'y répondre plus tard.

Toutefois si j'essaie d'écrire une grande quantité de données, quelque chose comme:

AsyncContext ac = getWaitingContext() ;
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush()

il peut en fait bloquer - et il le fait dans les cas d'essai trivial - à la fois pour Tomcat 7 et Jetty 8. Les tutoriels recommandent de créer un pool de threads qui gérer une telle installation - sorcière est généralement le contre-positif à une architecture traditionnelle 10K.

cependant si j'ai 10.000 connexions ouvertes et un pool de thread de disons 10 threads, il est suffisant pour même 1% des clients qui ont des connexions à faible vitesse ou tout simplement bloqué connexion pour bloquer le pool de threads et complètement bloquer la réponse de la comète ou ralentir de manière significative.

la pratique attendue est d'obtenir une notification "prêt à écrire" ou une notification d'achèvement des e/s et de continuer à poussez les données.

Comment faire avec L'API Servlet-3.0, c'est-à-dire comment obtenir l'une ou l'autre:

  • asynchrone Completion notification on I / O operation.
  • Obtenir des non-blocage I/S en écriture prêt de notification.

si cela n'est pas pris en charge par L'API Servlet-3.0, y a-t-il des API spécifiques au serveur Web (comme Jetty Continuation ou Tomcat CometEvent) qui permettent de gérer de tels les événements vraiment de manière asynchrone sans truquer les e/S asynchrones à l'aide de pool de threads.

quelqu'un le sait?

et si cela n'est pas possible, Pouvez-vous le confirmer par une référence à la documentation?

démonstration du problème dans un code échantillon

j'avais joint le code ci-dessous qui émule événement-stream.

Notes:

  • il utilise ServletOutputStream que lance IOException pour détecter les clients déconnectés
  • "1519230920 elle" envoie keep-alive des messages qui assurez-vous que les clients sont toujours là
  • j'ai créé un pool de threads pour "émuler" les opérations asynchrones.

dans un tel exemple, j'ai explicitement défini le pool de filetage de la taille 1 pour montrer le problème:

  • lancer une application
  • Exécuter à partir de deux bornes curl http://localhost:8080/path/to/app (deux fois)
  • Envoyer maintenant les données avec curd -d m=message http://localhost:8080/path/to/app
  • les deux clients ont reçu les données
  • maintenant suspendre un des clients (Ctrl+Z) et envoyer le message Une fois de plus curd -d m=message http://localhost:8080/path/to/app
  • observez qu'un autre client non suspendu n'a rien reçu ou a cessé de recevoir les requêtes keep-alive après que le message a été transféré parce que l'autre fil est bloqué.

je veux résoudre un tel problème sans utiliser thread pool, car avec 1000-5000 ouvert connexions je peux épuiser le bassin de filetage très rapidement.

le code d'échantillon ci-dessous.


import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletOutputStream;


@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet {

    private long id = 0;
    private String message = "";
    private final ThreadPoolExecutor pool = 
        new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
        // it is explicitly small for demonstration purpose

    private final Thread timer = new Thread(new Runnable() {
        public void run()
        {
            try {
                while(true) {
                    Thread.sleep(1000);
                    sendKeepAlive();
                }
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });


    class RunJob implements Runnable {
        volatile long lastUpdate = System.nanoTime();
        long id = 0;
        AsyncContext ac;
        RunJob(AsyncContext ac) 
        {
            this.ac = ac;
        }
        public void keepAlive()
        {
            if(System.nanoTime() - lastUpdate > 1000000000L)
                pool.submit(this);
        }
        String formatMessage(String msg)
        {
            StringBuilder sb = new StringBuilder();
            sb.append("id");
            sb.append(id);
            for(int i=0;i<100000;i++) {
                sb.append("data:");
                sb.append(msg);
                sb.append("n");
            }
            sb.append("n");
            return sb.toString();
        }
        public void run()
        {
            String message = null;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id) {
                    this.id = HugeStreamWithThreads.this.id;
                    message = HugeStreamWithThreads.this.message;
                }
            }
            if(message == null)
                message = ":keep-alivenn";
            else
                message = formatMessage(message);

            if(!sendMessage(message))
                return;

            boolean once_again = false;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id)
                    once_again = true;
            }
            if(once_again)
                pool.submit(this);

        }
        boolean sendMessage(String message) 
        {
            try {
                ServletOutputStream out = ac.getResponse().getOutputStream();
                out.print(message);
                out.flush();
                lastUpdate = System.nanoTime();
                return true;
            }
            catch(IOException e) {
                ac.complete();
                removeContext(this);
                return false;
            }
        }
    };

    private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();

    @Override
    public void init(ServletConfig config) throws ServletException
    {
        super.init(config);
        timer.start();
    }
    @Override
    public void destroy()
    {
        for(;;){
            try {
                timer.interrupt();
                timer.join();
                break;
            }
            catch(InterruptedException e) {
                continue;
            }
        }
        pool.shutdown();
        super.destroy();
    }


    protected synchronized void removeContext(RunJob ac)
    {
        asyncContexts.remove(ac);
    }

    // GET method is used to establish a stream connection
    @Override
    protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        // Content-Type header
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        // Access-Control-Allow-Origin header
        response.setHeader("Access-Control-Allow-Origin", "*");

        final AsyncContext ac = request.startAsync();

        ac.setTimeout(0);
        RunJob job = new RunJob(ac);
        asyncContexts.add(job);
        if(id!=0) {
            pool.submit(job);
        }
    }

    private synchronized void sendKeepAlive()
    {
        for(RunJob job : asyncContexts) {
            job.keepAlive();
        }
    }

    // POST method is used to communicate with the server
    @Override
    protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException 
    {
        request.setCharacterEncoding("utf-8");
        id++;
        message = request.getParameter("m");        
        for(RunJob job : asyncContexts) {
            pool.submit(job);
        }
    }


}

L'échantillon ci-dessus utilise des fils pour empêcher le blocage... Cependant si le nombre de clients bloquants est plus grand que la taille du pool de thread, il serait bloquer.

Comment pourrait-il être mis en œuvre sans blocage?

50
demandé sur Artyom 2012-08-23 09:27:46

6 réponses

J'ai trouvé l'API Servlet 3.0 Asynchronous difficile à implémenter correctement et la documentation utile doit être clairsemée. Après beaucoup d'essais et d'erreurs et d'essayer différentes approches, j'ai été en mesure de trouver une solution robuste que j'ai été très heureux avec. Quand je regarde mon code et que je le compare au vôtre, je remarque une différence majeure qui peut vous aider avec votre problème particulier. J'utilise un ServletResponse pour écrire les données et non pas un ServletOutputStream .

Voici mon classe de Servlet asynchrone go-to légèrement adaptée à votre cas some_big_data :

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebInitParam;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;

import org.apache.log4j.Logger;

@javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") })
public class AsyncServlet extends HttpServlet {

  private static final Logger logger = Logger.getLogger(AsyncServlet.class);

  public static final int CALLBACK_TIMEOUT = 10000; // ms

  /** executor service */
  private ExecutorService exec;

  @Override
  public void init(ServletConfig config) throws ServletException {

    super.init(config);
    int size = Integer.parseInt(getInitParameter("threadpoolsize"));
    exec = Executors.newFixedThreadPool(size);
  }

  @Override
  public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {

    final AsyncContext ctx = req.startAsync();
    final HttpSession session = req.getSession();

    // set the timeout
    ctx.setTimeout(CALLBACK_TIMEOUT);

    // attach listener to respond to lifecycle events of this AsyncContext
    ctx.addListener(new AsyncListener() {

      @Override
      public void onComplete(AsyncEvent event) throws IOException {

        logger.info("onComplete called");
      }

      @Override
      public void onTimeout(AsyncEvent event) throws IOException {

        logger.info("onTimeout called");
      }

      @Override
      public void onError(AsyncEvent event) throws IOException {

        logger.info("onError called: " + event.toString());
      }

      @Override
      public void onStartAsync(AsyncEvent event) throws IOException {

        logger.info("onStartAsync called");
      }
    });

    enqueLongRunningTask(ctx, session);
  }

  /**
   * if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact)
   * <p/>
   * if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked).
   */
  private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) {

    exec.execute(new Runnable() {

      @Override
      public void run() {

        String some_big_data = getSomeBigData();

        try {

          ServletResponse response = ctx.getResponse();
          if (response != null) {
            response.getWriter().write(some_big_data);
            ctx.complete();
          } else {
            throw new IllegalStateException(); // this is caught below
          }
        } catch (IllegalStateException ex) {
          logger.error("Request object from context is null! (nothing to worry about.)"); // just means the context was already timeout, timeout listener already called.
        } catch (Exception e) {
          logger.error("ERROR IN AsyncServlet", e);
        }
      }
    });
  }

  /** destroy the executor */
  @Override
  public void destroy() {

    exec.shutdown();
  }
}
29
répondu herrtim 2012-08-30 18:42:50

au cours de mes recherches sur ce sujet, ce fil a continué à apparaître, donc figuré je le mentionne ici:

Servlet 3.1 introduit les opérations async sur ServletInputStream et ServletOutputStream . Voir ServletOutputStream.setWriteListener .

un exemple se trouve à http://docs.oracle.com/javaee/7/tutorial/servlets013.htm

9
répondu Erich Eichinger 2015-07-01 09:00:04

nous ne pouvons pas tout à fait causer l'écriture d'être asynchrone. Nous devons, de façon réaliste, composer avec la limite selon laquelle lorsque nous écrivons quelque chose à un client, nous nous attendons à pouvoir le faire rapidement et à pouvoir le traiter comme une erreur si nous ne le faisons pas. C'est-à-dire, si notre objectif est de diffuser les données au client aussi vite que possible et d'utiliser le statut de blocage/non-blocage du canal comme un moyen de contrôler le flux, nous n'avons pas de chance. Mais, si nous envoyons des données à un faible taux qu'un client devrait être en mesure de gérer, Nous sommes en mesure au moins de déconnecter rapidement les clients qui ne lisent pas assez rapidement.

par exemple, dans votre application, nous envoyons les keepalives à un rythme lent (toutes les quelques secondes) et nous nous attendons à ce que les clients puissent suivre tous les événements qu'ils reçoivent. Nous transmettons les données au client, et s'il ne peut pas suivre, nous pouvons le déconnecter rapidement et proprement. C'est un peu plus limité que le véritable E/S asynchrone, mais il devrait répondre à vos besoins (et d'ailleurs, le mien).

l'astuce est que toutes les méthodes pour écrire la sortie qui vient de jeter IOExceptions font en fait un peu plus que cela: dans l'implémentation, tous les appels à des choses qui peuvent être interrompues () ed sera enveloppé avec quelque chose comme ceci (pris de Jetty 9):

catch (InterruptedException x)
    throw (IOException)new InterruptedIOException().initCause(x);

(je note aussi que ce ne se produit pas dans Jetty 8, où une exception interrompue est enregistrée et la boucle de blocage est immédiatement rejugé. Vraisemblablement, vous faites pour s'assurer que votre conteneur de servlet est bien comportée pour utiliser cette astuce.)

C'est-à-dire, quand un client lent bloque un thread d'écriture, on force simplement l'écriture à être lancée comme une IOException en appelant l'interruption() sur le thread. Pensez-y: le code non-bloquant consommerait une unité de temps sur l'un de nos threads de traitement à exécuter de toute façon, donc en utilisant des Écritures de blocage qui sont juste avortées (après disons une milliseconde) est vraiment identique en principe. Nous sommes encore en train de passer un peu de temps sur le fil, à peine moins efficacement.

j'ai modifié votre code de sorte que le fil de temps principal exécute une tâche pour lier l'heure dans chaque écriture juste avant que nous commencions l'écriture, et la tâche est annulée si l'écriture se termine rapidement, ce qui devrait.

une dernière remarque: dans un container servlet bien mis en œuvre, provoquant l'E/S de jeter devrait pour être sûr. Ce serait bien si nous pouvions attraper L'InterruptedIOException et réessayer l'écriture plus tard. Peut-être que nous aimerions donner lent clients un sous-ensemble des événements s'ils ne peuvent pas suivre avec le jet. Pour autant que je sache, à Jetty, ce n'est pas tout à fait sûr. Si une écriture est lancée, l'état interne de L'objet HttpResponse pourrait ne pas être suffisamment cohérent pour gérer la ré-entrée en écriture en toute sécurité plus tard. J'attends qu'il n'est pas sage d'essayer de pousser un conteneur de servlet de cette façon a moins qu'il n'y ait des docs spécifiques, Je n'ai pas offert cette garantie. Je pense que l'idée est qu'une connexion est conçue pour être arrêtée si une exception se produit.

voici le code, avec une version modifiée de RunJob::run() en utilisant une illustration grotty simple (en réalité, nous aimerions utiliser le fil de minuterie principal ici plutôt que de lancer un par-écriture qui est stupide).

public void run()
{
    String message = null;
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id) {
            this.id = HugeStreamWithThreads.this.id;
            message = HugeStreamWithThreads.this.message;
        }
    }
    if(message == null)
        message = ":keep-alive\n\n";
    else
        message = formatMessage(message);

    final Thread curr = Thread.currentThread();
    Thread canceller = new Thread(new Runnable() {
        public void run()
        {
            try {
                Thread.sleep(2000);
                curr.interrupt();
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });
    canceller.start();

    try {
        if(!sendMessage(message))
            return;
    } finally {
        canceller.interrupt();
        while (true) {
            try { canceller.join(); break; }
            catch (InterruptedException e) { }
        }
    }

    boolean once_again = false;
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id)
            once_again = true;
    }
    if(once_again)
        pool.submit(this);

}
3
répondu Nicholas Wilson 2013-04-30 23:49:32

est-ce que le printemps est une option pour vous? Spring-MVC 3.2 a une classe appelée DeferredResult , qui gérera avec élégance votre scénario" 10,000 open connections/10 server pool threads".

exemple: http://blog.springsource.org/2012/05/06/spring-mvc-3-2-preview-introducing-servlet-3-async-support /

2
répondu JJ Zabkar 2012-08-30 16:32:10

j'ai eu un rapide coup d'oeil à votre liste, donc j'ai peut-être manqué quelques points. L'avantage d'un thread pool est de partager des ressources thread entre plusieurs tâches au fil du temps. Vous pouvez peut-être résoudre votre problème en espaçant les réponses keepAlive des différentes connexions http, au lieu de les regrouper toutes en même temps.

-1
répondu user2121502 2013-02-28 22:23:45