Gestion des exceptions aux tâches Java ExecutorService

j'essaie d'utiliser la classe ThreadPoolExecutor de Java pour exécuter un grand nombre de tâches lourdes avec un nombre fixe de threads. Chacune des tâches comporte de nombreux endroits où elle peut échouer en raison d'exceptions.

j'AI sous-classé ThreadPoolExecutor et j'ai dépassé la méthode afterExecute qui est censée fournir toutes les exceptions non récupérées rencontrées tout en exécutant une tâche. Cependant, je n'arrive pas à le faire fonctionner.

par exemple:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

La sortie de ce programme est "Tout va bien--situation normale!"même si le seul Runnable soumis au pool de threads fait exception. La moindre idée de ce qui se passe ici?

Merci!

180
demandé sur Andrii Abramov 2010-02-12 01:09:49

11 réponses

De la docs :

Note: Lorsque les actions sont tâches (comme FutureTask) soit explicitement ou par des méthodes telles que soumettre, ces objets de capture et maintenir les exceptions en matière de calcul; donc ils ne causent pas brusque résiliation, et l'interne les exceptions ne sont pas passées à ceci méthode.

quand vous soumettez un Runnable, il sera enveloppé dans un futur.

votre afterexécute devrait être quelque chose comme ceci:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}
130
répondu nos 2018-08-16 05:37:20

WARNING : il est à noter que cette solution bloquera le thread appelant.


si vous voulez traiter des exceptions lancées par la tâche, alors il est généralement préférable d'utiliser Callable plutôt que Runnable .

Callable.call() est autorisé à lancer des exceptions vérifiées, et celles-ci se propagent de nouveau au fil appelant:

Callable task = ...
Future future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

si Callable.call() jette une exception, ce sera enveloppé dans un ExecutionException et jeté par Future.get() .

Ceci est probablement beaucoup préférable au sous-classement ThreadPoolExecutor . Il vous donne également la possibilité de re-présenter la tâche si l'exception est récupérable.

210
répondu skaffman 2016-08-17 07:47:43

l'explication de ce comportement est droite dans le javadoc pour postexécute :

Note: Lorsque les actions sont tâches (comme FutureTask) soit explicitement ou par des méthodes telles que soumettre, ces objets de capture et maintenir les exceptions en matière de calcul; donc ils ne causent pas brusque résiliation, et l'interne les exceptions ne sont pas passées à ceci méthode.

16
répondu Drew Wills 2016-06-28 16:11:21

Je l'ai contourné en enveloppant le runnable fourni soumis à l'exécuteur.

CompletableFuture.runAsync(

        () -> {
                try {
                        runnable.run();
                } catch (Throwable e) {
                        Log.info(Concurrency.class, "runAsync", e);
                }
        },

        executorService
);
9
répondu momo 2014-12-14 11:14:20

j'utilise VerboseRunnable classe de jcabi-log , qui avale toutes les exceptions et les logs. Très pratique, par exemple:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);
6
répondu yegor256 2014-05-02 11:14:47

une autre solution serait d'utiliser la ManagedTask et ManagedTaskListener .

Vous avez besoin d'un Callable ou Praticable qui implémente l'interface ManagedTask .

La méthode getManagedTaskListener renvoie l'instance que vous souhaitez.

public ManagedTaskListener getManagedTaskListener() {

et vous mettre en œuvre dans ManagedTaskListener le taskDone la méthode:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

Plus de détails à propos de gérées tâche du cycle de vie et l'auditeur .

3
répondu CSchulz 2015-11-11 08:17:55

si vous voulez surveiller l'exécution d'une tâche, vous pouvez lancer 1 ou 2 threads (peut-être plus selon la charge) et les utiliser pour prendre des tâches d'un wrapper ExecutionCompletionService.

1
répondu Cristian Botiza 2016-07-31 13:23:17

si votre ExecutorService provient d'une source externe (I. E. il n'est pas possible d'utiliser la sous-classe ThreadPoolExecutor et de remplacer afterExecute() ), vous pouvez utiliser un proxy dynamique pour obtenir le comportement désiré:

public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}
0
répondu Bass 2015-10-20 14:51:16

c'est à cause de AbstractExecutorService :: submit c'est envelopper votre runnable dans RunnableFuture (rien mais FutureTask ) comme ci-dessous

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

puis execute passera à Worker et Worker.run() appellera ci-dessous.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

enfin task.run(); dans l'appel de code ci-dessus appellera FutureTask.run() . Voici l'exception handler code, à cause de ce que vous ne se font PAS attendre exception.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
0
répondu Kanagavelu Sugumar 2016-05-12 08:32:03

ça fonctionne

  • il est dérivé de SingleThreadExecutor, mais vous pouvez l'adapter facilement
  • Java 8 Code lamdas, mais facile à corriger

il va créer un exécuteur avec un seul thread, qui peut obtenir beaucoup de tâches; et va attendre que l'exécution en cours de fin d'exécution pour commencer par le prochain

en cas d'erreur ou d'exception la uncaughtExceptionHandler va l'attraper

public final class SingleThreadExecutorWithExceptions {

    public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

        ThreadFactory factory = (Runnable runnable)  -> {
            final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions");
            newThread.setUncaughtExceptionHandler( (final Thread caugthThread,final Throwable throwable) -> {
                uncaughtExceptionHandler.uncaughtException(caugthThread, throwable);
            });
            return newThread;
        };
        return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue(),
                        factory){


                    protected void afterExecute(Runnable runnable, Throwable throwable) {
                        super.afterExecute(runnable, throwable);
                        if (throwable == null && runnable instanceof Future) {
                            try {
                                Future future = (Future) runnable;
                                if (future.isDone()) {
                                    future.get();
                                }
                            } catch (CancellationException ce) {
                                throwable = ce;
                            } catch (ExecutionException ee) {
                                throwable = ee.getCause();
                            } catch (InterruptedException ie) {
                                Thread.currentThread().interrupt(); // ignore/reset
                            }
                        }
                        if (throwable != null) {
                            uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable);
                        }
                    }
                });
    }



    private static class FinalizableDelegatedExecutorService
            extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

    /**
     * A wrapper class that exposes only the ExecutorService methods
     * of an ExecutorService implementation.
     */
    private static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future submit(Runnable task) {
            return e.submit(task);
        }
        public  Future submit(Callable task) {
            return e.submit(task);
        }
        public  Future submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public  List> invokeAll(Collection> tasks)
                throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public  List> invokeAll(Collection> tasks,
                                             long timeout, TimeUnit unit)
                throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public  T invokeAny(Collection> tasks)
                throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public  T invokeAny(Collection> tasks,
                               long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }



    private SingleThreadExecutorWithExceptions() {}
}
0
répondu obesga_tirant 2017-05-16 16:41:36

au lieu de subdiviser ThreadPoolExecutor, je lui fournirais une instance ThreadFactory qui crée de nouveaux Threads et leur fournit un UncaughtExceptionHandler

-5
répondu Kevin 2010-02-11 22:19:17