Gestion des exceptions aux tâches Java ExecutorService
j'essaie d'utiliser la classe ThreadPoolExecutor
de Java pour exécuter un grand nombre de tâches lourdes avec un nombre fixe de threads. Chacune des tâches comporte de nombreux endroits où elle peut échouer en raison d'exceptions.
j'AI sous-classé ThreadPoolExecutor
et j'ai dépassé la méthode afterExecute
qui est censée fournir toutes les exceptions non récupérées rencontrées tout en exécutant une tâche. Cependant, je n'arrive pas à le faire fonctionner.
par exemple:
public class ThreadPoolErrors extends ThreadPoolExecutor {
public ThreadPoolErrors() {
super( 1, // core threads
1, // max threads
1, // timeout
TimeUnit.MINUTES, // timeout units
new LinkedBlockingQueue<Runnable>() // work queue
);
}
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if(t != null) {
System.out.println("Got an error: " + t);
} else {
System.out.println("Everything's fine--situation normal!");
}
}
public static void main( String [] args) {
ThreadPoolErrors threadPool = new ThreadPoolErrors();
threadPool.submit(
new Runnable() {
public void run() {
throw new RuntimeException("Ouch! Got an error.");
}
}
);
threadPool.shutdown();
}
}
La sortie de ce programme est "Tout va bien--situation normale!"même si le seul Runnable soumis au pool de threads fait exception. La moindre idée de ce qui se passe ici?
Merci!
11 réponses
De la docs :
Note: Lorsque les actions sont tâches (comme FutureTask) soit explicitement ou par des méthodes telles que soumettre, ces objets de capture et maintenir les exceptions en matière de calcul; donc ils ne causent pas brusque résiliation, et l'interne les exceptions ne sont pas passées à ceci méthode.
quand vous soumettez un Runnable, il sera enveloppé dans un futur.
votre afterexécute devrait être quelque chose comme ceci:
public final class ExtendedExecutor extends ThreadPoolExecutor {
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
if (future.isDone()) {
future.get();
}
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
System.out.println(t);
}
}
}
WARNING : il est à noter que cette solution bloquera le thread appelant.
si vous voulez traiter des exceptions lancées par la tâche, alors il est généralement préférable d'utiliser Callable
plutôt que Runnable
.
Callable.call()
est autorisé à lancer des exceptions vérifiées, et celles-ci se propagent de nouveau au fil appelant:
Callable task = ...
Future future = executor.submit(task);
try {
future.get();
} catch (ExecutionException ex) {
ex.getCause().printStackTrace();
}
si Callable.call()
jette une exception, ce sera enveloppé dans un ExecutionException
et jeté par Future.get()
.
Ceci est probablement beaucoup préférable au sous-classement ThreadPoolExecutor
. Il vous donne également la possibilité de re-présenter la tâche si l'exception est récupérable.
l'explication de ce comportement est droite dans le javadoc pour postexécute :
Note: Lorsque les actions sont tâches (comme FutureTask) soit explicitement ou par des méthodes telles que soumettre, ces objets de capture et maintenir les exceptions en matière de calcul; donc ils ne causent pas brusque résiliation, et l'interne les exceptions ne sont pas passées à ceci méthode.
Je l'ai contourné en enveloppant le runnable fourni soumis à l'exécuteur.
CompletableFuture.runAsync(
() -> {
try {
runnable.run();
} catch (Throwable e) {
Log.info(Concurrency.class, "runAsync", e);
}
},
executorService
);
j'utilise VerboseRunnable
classe de jcabi-log , qui avale toutes les exceptions et les logs. Très pratique, par exemple:
import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
new VerboseRunnable(
Runnable() {
public void run() {
// the code, which may throw
}
},
true // it means that all exceptions will be swallowed and logged
),
1, 1, TimeUnit.MILLISECONDS
);
une autre solution serait d'utiliser la ManagedTask et ManagedTaskListener .
Vous avez besoin d'un Callable ou Praticable qui implémente l'interface ManagedTask .
La méthode getManagedTaskListener
renvoie l'instance que vous souhaitez.
public ManagedTaskListener getManagedTaskListener() {
et vous mettre en œuvre dans ManagedTaskListener le taskDone
la méthode:
@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
if (exception != null) {
LOGGER.log(Level.SEVERE, exception.getMessage());
}
}
Plus de détails à propos de gérées tâche du cycle de vie et l'auditeur .
si vous voulez surveiller l'exécution d'une tâche, vous pouvez lancer 1 ou 2 threads (peut-être plus selon la charge) et les utiliser pour prendre des tâches d'un wrapper ExecutionCompletionService.
si votre ExecutorService
provient d'une source externe (I. E. il n'est pas possible d'utiliser la sous-classe ThreadPoolExecutor
et de remplacer afterExecute()
), vous pouvez utiliser un proxy dynamique pour obtenir le comportement désiré:
public static ExecutorService errorAware(final ExecutorService executor) {
return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[] {ExecutorService.class},
(proxy, method, args) -> {
if (method.getName().equals("submit")) {
final Object arg0 = args[0];
if (arg0 instanceof Runnable) {
args[0] = new Runnable() {
@Override
public void run() {
final Runnable task = (Runnable) arg0;
try {
task.run();
if (task instanceof Future<?>) {
final Future<?> future = (Future<?>) task;
if (future.isDone()) {
try {
future.get();
} catch (final CancellationException ce) {
// Your error-handling code here
ce.printStackTrace();
} catch (final ExecutionException ee) {
// Your error-handling code here
ee.getCause().printStackTrace();
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
} catch (final RuntimeException re) {
// Your error-handling code here
re.printStackTrace();
throw re;
} catch (final Error e) {
// Your error-handling code here
e.printStackTrace();
throw e;
}
}
};
} else if (arg0 instanceof Callable<?>) {
args[0] = new Callable<Object>() {
@Override
public Object call() throws Exception {
final Callable<?> task = (Callable<?>) arg0;
try {
return task.call();
} catch (final Exception e) {
// Your error-handling code here
e.printStackTrace();
throw e;
} catch (final Error e) {
// Your error-handling code here
e.printStackTrace();
throw e;
}
}
};
}
}
return method.invoke(executor, args);
});
}
c'est à cause de AbstractExecutorService :: submit
c'est envelopper votre runnable
dans RunnableFuture
(rien mais FutureTask
) comme ci-dessous
AbstractExecutorService.java
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
execute(ftask);
return ftask;
}
puis execute
passera à Worker
et Worker.run()
appellera ci-dessous.
ThreadPoolExecutor.java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); /////////HERE////////
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
enfin
task.run();
dans l'appel de code ci-dessus appelleraFutureTask.run()
. Voici l'exception handler code, à cause de ce que vous ne se font PAS attendre exception.
class FutureTask<V> implements RunnableFuture<V>
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) { /////////HERE////////
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
ça fonctionne
- il est dérivé de SingleThreadExecutor, mais vous pouvez l'adapter facilement
- Java 8 Code lamdas, mais facile à corriger
il va créer un exécuteur avec un seul thread, qui peut obtenir beaucoup de tâches; et va attendre que l'exécution en cours de fin d'exécution pour commencer par le prochain
en cas d'erreur ou d'exception la uncaughtExceptionHandler va l'attraper
public final class SingleThreadExecutorWithExceptions { public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { ThreadFactory factory = (Runnable runnable) -> { final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions"); newThread.setUncaughtExceptionHandler( (final Thread caugthThread,final Throwable throwable) -> { uncaughtExceptionHandler.uncaughtException(caugthThread, throwable); }); return newThread; }; return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), factory){ protected void afterExecute(Runnable runnable, Throwable throwable) { super.afterExecute(runnable, throwable); if (throwable == null && runnable instanceof Future) { try { Future future = (Future) runnable; if (future.isDone()) { future.get(); } } catch (CancellationException ce) { throwable = ce; } catch (ExecutionException ee) { throwable = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (throwable != null) { uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable); } } }); } private static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } } /** * A wrapper class that exposes only the ExecutorService methods * of an ExecutorService implementation. */ private static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future submit(Runnable task) { return e.submit(task); } public Future submit(Callable task) { return e.submit(task); } public Future submit(Runnable task, T result) { return e.submit(task, result); } public List> invokeAll(Collection> tasks) throws InterruptedException { return e.invokeAll(tasks); } public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } } private SingleThreadExecutorWithExceptions() {} }
au lieu de subdiviser ThreadPoolExecutor, je lui fournirais une instance ThreadFactory qui crée de nouveaux Threads et leur fournit un UncaughtExceptionHandler