Servlet-3 Contexte asynchrone, comment faire des écritures asynchrones?
Description Du Problème
Servlet-3.0 API permet de désactiver un contexte request/response et d'y répondre plus tard.
Toutefois si j'essaie d'écrire une grande quantité de données, quelque chose comme:
AsyncContext ac = getWaitingContext() ;
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush()
il peut en fait bloquer - et il le fait dans les cas d'essai trivial - à la fois pour Tomcat 7 et Jetty 8. Les tutoriels recommandent de créer un pool de threads qui gérer une telle installation - sorcière est généralement le contre-positif à une architecture traditionnelle 10K.
cependant si j'ai 10.000 connexions ouvertes et un pool de thread de disons 10 threads, il est suffisant pour même 1% des clients qui ont des connexions à faible vitesse ou tout simplement bloqué connexion pour bloquer le pool de threads et complètement bloquer la réponse de la comète ou ralentir de manière significative.
la pratique attendue est d'obtenir une notification "prêt à écrire" ou une notification d'achèvement des e/s et de continuer à poussez les données.
Comment faire avec L'API Servlet-3.0, c'est-à-dire comment obtenir l'une ou l'autre:
- asynchrone Completion notification on I / O operation.
- Obtenir des non-blocage I/S en écriture prêt de notification.
si cela n'est pas pris en charge par L'API Servlet-3.0, y a-t-il des API spécifiques au serveur Web (comme Jetty Continuation ou Tomcat CometEvent) qui permettent de gérer de tels les événements vraiment de manière asynchrone sans truquer les e/S asynchrones à l'aide de pool de threads.
quelqu'un le sait?
et si cela n'est pas possible, Pouvez-vous le confirmer par une référence à la documentation?
démonstration du problème dans un code échantillon
j'avais joint le code ci-dessous qui émule événement-stream.
Notes:
- il utilise
ServletOutputStream
que lanceIOException
pour détecter les clients déconnectés
"1519230920 elle" envoie - j'ai créé un pool de threads pour "émuler" les opérations asynchrones.
keep-alive
des messages qui assurez-vous que les clients sont toujours là
dans un tel exemple, j'ai explicitement défini le pool de filetage de la taille 1 pour montrer le problème:
- lancer une application
- Exécuter à partir de deux bornes
curl http://localhost:8080/path/to/app
(deux fois) - Envoyer maintenant les données avec
curd -d m=message http://localhost:8080/path/to/app
- les deux clients ont reçu les données
- maintenant suspendre un des clients (Ctrl+Z) et envoyer le message Une fois de plus
curd -d m=message http://localhost:8080/path/to/app
- observez qu'un autre client non suspendu n'a rien reçu ou a cessé de recevoir les requêtes keep-alive après que le message a été transféré parce que l'autre fil est bloqué.
je veux résoudre un tel problème sans utiliser thread pool, car avec 1000-5000 ouvert connexions je peux épuiser le bassin de filetage très rapidement.
le code d'échantillon ci-dessous.
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletOutputStream;
@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet {
private long id = 0;
private String message = "";
private final ThreadPoolExecutor pool =
new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
// it is explicitly small for demonstration purpose
private final Thread timer = new Thread(new Runnable() {
public void run()
{
try {
while(true) {
Thread.sleep(1000);
sendKeepAlive();
}
}
catch(InterruptedException e) {
// exit
}
}
});
class RunJob implements Runnable {
volatile long lastUpdate = System.nanoTime();
long id = 0;
AsyncContext ac;
RunJob(AsyncContext ac)
{
this.ac = ac;
}
public void keepAlive()
{
if(System.nanoTime() - lastUpdate > 1000000000L)
pool.submit(this);
}
String formatMessage(String msg)
{
StringBuilder sb = new StringBuilder();
sb.append("id");
sb.append(id);
for(int i=0;i<100000;i++) {
sb.append("data:");
sb.append(msg);
sb.append("n");
}
sb.append("n");
return sb.toString();
}
public void run()
{
String message = null;
synchronized(HugeStreamWithThreads.this) {
if(this.id != HugeStreamWithThreads.this.id) {
this.id = HugeStreamWithThreads.this.id;
message = HugeStreamWithThreads.this.message;
}
}
if(message == null)
message = ":keep-alivenn";
else
message = formatMessage(message);
if(!sendMessage(message))
return;
boolean once_again = false;
synchronized(HugeStreamWithThreads.this) {
if(this.id != HugeStreamWithThreads.this.id)
once_again = true;
}
if(once_again)
pool.submit(this);
}
boolean sendMessage(String message)
{
try {
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(message);
out.flush();
lastUpdate = System.nanoTime();
return true;
}
catch(IOException e) {
ac.complete();
removeContext(this);
return false;
}
}
};
private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();
@Override
public void init(ServletConfig config) throws ServletException
{
super.init(config);
timer.start();
}
@Override
public void destroy()
{
for(;;){
try {
timer.interrupt();
timer.join();
break;
}
catch(InterruptedException e) {
continue;
}
}
pool.shutdown();
super.destroy();
}
protected synchronized void removeContext(RunJob ac)
{
asyncContexts.remove(ac);
}
// GET method is used to establish a stream connection
@Override
protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
// Content-Type header
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
// Access-Control-Allow-Origin header
response.setHeader("Access-Control-Allow-Origin", "*");
final AsyncContext ac = request.startAsync();
ac.setTimeout(0);
RunJob job = new RunJob(ac);
asyncContexts.add(job);
if(id!=0) {
pool.submit(job);
}
}
private synchronized void sendKeepAlive()
{
for(RunJob job : asyncContexts) {
job.keepAlive();
}
}
// POST method is used to communicate with the server
@Override
protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException
{
request.setCharacterEncoding("utf-8");
id++;
message = request.getParameter("m");
for(RunJob job : asyncContexts) {
pool.submit(job);
}
}
}
L'échantillon ci-dessus utilise des fils pour empêcher le blocage... Cependant si le nombre de clients bloquants est plus grand que la taille du pool de thread, il serait bloquer.
Comment pourrait-il être mis en œuvre sans blocage?
6 réponses
J'ai trouvé l'API Servlet 3.0
Asynchronous
difficile à implémenter correctement et la documentation utile doit être clairsemée. Après beaucoup d'essais et d'erreurs et d'essayer différentes approches, j'ai été en mesure de trouver une solution robuste que j'ai été très heureux avec. Quand je regarde mon code et que je le compare au vôtre, je remarque une différence majeure qui peut vous aider avec votre problème particulier. J'utilise un ServletResponse
pour écrire les données et non pas un ServletOutputStream
.
Voici mon classe de Servlet asynchrone go-to légèrement adaptée à votre cas some_big_data
:
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebInitParam;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import org.apache.log4j.Logger;
@javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") })
public class AsyncServlet extends HttpServlet {
private static final Logger logger = Logger.getLogger(AsyncServlet.class);
public static final int CALLBACK_TIMEOUT = 10000; // ms
/** executor service */
private ExecutorService exec;
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
int size = Integer.parseInt(getInitParameter("threadpoolsize"));
exec = Executors.newFixedThreadPool(size);
}
@Override
public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {
final AsyncContext ctx = req.startAsync();
final HttpSession session = req.getSession();
// set the timeout
ctx.setTimeout(CALLBACK_TIMEOUT);
// attach listener to respond to lifecycle events of this AsyncContext
ctx.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) throws IOException {
logger.info("onComplete called");
}
@Override
public void onTimeout(AsyncEvent event) throws IOException {
logger.info("onTimeout called");
}
@Override
public void onError(AsyncEvent event) throws IOException {
logger.info("onError called: " + event.toString());
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException {
logger.info("onStartAsync called");
}
});
enqueLongRunningTask(ctx, session);
}
/**
* if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact)
* <p/>
* if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked).
*/
private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) {
exec.execute(new Runnable() {
@Override
public void run() {
String some_big_data = getSomeBigData();
try {
ServletResponse response = ctx.getResponse();
if (response != null) {
response.getWriter().write(some_big_data);
ctx.complete();
} else {
throw new IllegalStateException(); // this is caught below
}
} catch (IllegalStateException ex) {
logger.error("Request object from context is null! (nothing to worry about.)"); // just means the context was already timeout, timeout listener already called.
} catch (Exception e) {
logger.error("ERROR IN AsyncServlet", e);
}
}
});
}
/** destroy the executor */
@Override
public void destroy() {
exec.shutdown();
}
}
au cours de mes recherches sur ce sujet, ce fil a continué à apparaître, donc figuré je le mentionne ici:
Servlet 3.1 introduit les opérations async sur ServletInputStream
et ServletOutputStream
. Voir ServletOutputStream.setWriteListener
.
un exemple se trouve à http://docs.oracle.com/javaee/7/tutorial/servlets013.htm
cela pourrait être utile
http://www.oracle.com/webfolder/technetwork/tutorials/obe/java/async-servlet/async-servlets.html
nous ne pouvons pas tout à fait causer l'écriture d'être asynchrone. Nous devons, de façon réaliste, composer avec la limite selon laquelle lorsque nous écrivons quelque chose à un client, nous nous attendons à pouvoir le faire rapidement et à pouvoir le traiter comme une erreur si nous ne le faisons pas. C'est-à-dire, si notre objectif est de diffuser les données au client aussi vite que possible et d'utiliser le statut de blocage/non-blocage du canal comme un moyen de contrôler le flux, nous n'avons pas de chance. Mais, si nous envoyons des données à un faible taux qu'un client devrait être en mesure de gérer, Nous sommes en mesure au moins de déconnecter rapidement les clients qui ne lisent pas assez rapidement.
par exemple, dans votre application, nous envoyons les keepalives à un rythme lent (toutes les quelques secondes) et nous nous attendons à ce que les clients puissent suivre tous les événements qu'ils reçoivent. Nous transmettons les données au client, et s'il ne peut pas suivre, nous pouvons le déconnecter rapidement et proprement. C'est un peu plus limité que le véritable E/S asynchrone, mais il devrait répondre à vos besoins (et d'ailleurs, le mien).
l'astuce est que toutes les méthodes pour écrire la sortie qui vient de jeter IOExceptions font en fait un peu plus que cela: dans l'implémentation, tous les appels à des choses qui peuvent être interrompues () ed sera enveloppé avec quelque chose comme ceci (pris de Jetty 9):
catch (InterruptedException x)
throw (IOException)new InterruptedIOException().initCause(x);
(je note aussi que ce ne se produit pas dans Jetty 8, où une exception interrompue est enregistrée et la boucle de blocage est immédiatement rejugé. Vraisemblablement, vous faites pour s'assurer que votre conteneur de servlet est bien comportée pour utiliser cette astuce.)
C'est-à-dire, quand un client lent bloque un thread d'écriture, on force simplement l'écriture à être lancée comme une IOException en appelant l'interruption() sur le thread. Pensez-y: le code non-bloquant consommerait une unité de temps sur l'un de nos threads de traitement à exécuter de toute façon, donc en utilisant des Écritures de blocage qui sont juste avortées (après disons une milliseconde) est vraiment identique en principe. Nous sommes encore en train de passer un peu de temps sur le fil, à peine moins efficacement.
j'ai modifié votre code de sorte que le fil de temps principal exécute une tâche pour lier l'heure dans chaque écriture juste avant que nous commencions l'écriture, et la tâche est annulée si l'écriture se termine rapidement, ce qui devrait.
une dernière remarque: dans un container servlet bien mis en œuvre, provoquant l'E/S de jeter devrait pour être sûr. Ce serait bien si nous pouvions attraper L'InterruptedIOException et réessayer l'écriture plus tard. Peut-être que nous aimerions donner lent clients un sous-ensemble des événements s'ils ne peuvent pas suivre avec le jet. Pour autant que je sache, à Jetty, ce n'est pas tout à fait sûr. Si une écriture est lancée, l'état interne de L'objet HttpResponse pourrait ne pas être suffisamment cohérent pour gérer la ré-entrée en écriture en toute sécurité plus tard. J'attends qu'il n'est pas sage d'essayer de pousser un conteneur de servlet de cette façon a moins qu'il n'y ait des docs spécifiques, Je n'ai pas offert cette garantie. Je pense que l'idée est qu'une connexion est conçue pour être arrêtée si une exception se produit.
voici le code, avec une version modifiée de RunJob::run() en utilisant une illustration grotty simple (en réalité, nous aimerions utiliser le fil de minuterie principal ici plutôt que de lancer un par-écriture qui est stupide).
public void run()
{
String message = null;
synchronized(HugeStreamWithThreads.this) {
if(this.id != HugeStreamWithThreads.this.id) {
this.id = HugeStreamWithThreads.this.id;
message = HugeStreamWithThreads.this.message;
}
}
if(message == null)
message = ":keep-alive\n\n";
else
message = formatMessage(message);
final Thread curr = Thread.currentThread();
Thread canceller = new Thread(new Runnable() {
public void run()
{
try {
Thread.sleep(2000);
curr.interrupt();
}
catch(InterruptedException e) {
// exit
}
}
});
canceller.start();
try {
if(!sendMessage(message))
return;
} finally {
canceller.interrupt();
while (true) {
try { canceller.join(); break; }
catch (InterruptedException e) { }
}
}
boolean once_again = false;
synchronized(HugeStreamWithThreads.this) {
if(this.id != HugeStreamWithThreads.this.id)
once_again = true;
}
if(once_again)
pool.submit(this);
}
est-ce que le printemps est une option pour vous? Spring-MVC 3.2 a une classe appelée DeferredResult
, qui gérera avec élégance votre scénario" 10,000 open connections/10 server pool threads".
exemple: http://blog.springsource.org/2012/05/06/spring-mvc-3-2-preview-introducing-servlet-3-async-support /
j'ai eu un rapide coup d'oeil à votre liste, donc j'ai peut-être manqué quelques points. L'avantage d'un thread pool est de partager des ressources thread entre plusieurs tâches au fil du temps. Vous pouvez peut-être résoudre votre problème en espaçant les réponses keepAlive des différentes connexions http, au lieu de les regrouper toutes en même temps.