Scala en attente d'une séquence de futurs

j'espérais que le code comme suit attendrait les deux futurs, mais ce n'est pas le cas.

object Fiddle {
  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2) // in the general case, this would be a dynamically sized list

  val seq = Future.sequence(lf) 

  seq.onComplete {
    _ => lf.foreach(f => println(f.isCompleted))
  }
}

val a = FuturesSequence

j'ai supposé que seq.onComplete attendrait qu'ils tous se complètent avant de se compléter, mais pas ainsi; il en résulte:

true
false

.sequence était un peu difficile à suivre dans la source de scala.simultané.Futur, je me demande comment je mettrais en œuvre un parallèle qui attend tous les futurs originaux d'une séquence (de taille dynamique), ou ce qui pourrait le problème ici.

Edit: Une question connexe: https://worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future :)

21
demandé sur Community 2015-03-30 13:59:17

4 réponses

une approche commune pour attendre tous les résultats (ratés ou non) est de" lever " les échecs dans une nouvelle représentation à l'intérieur du futur, de sorte que tous les futurs complets avec un certain résultat (bien qu'ils puissent compléter avec un résultat qui représente l'échec). Une façon naturelle d'obtenir cela est le levage à un Try .

la mise en œuvre de Twitter de futures fournit une méthode liftToTry qui rend ce trivial, mais vous pouvez faire quelque chose de similaire avec la mise en œuvre de la bibliothèque standard:

import scala.util.{ Failure, Success, Try }

val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
  _.map(Success(_)).recover { case t => Failure(t) }
)

Maintenant Future.sequence(lifted) sera terminée lorsque tous les futurs est terminé, et représentera les succès et les échecs à l'aide de Try .

et donc, une solution générique pour attendre sur tous les futurs originaux d'une séquence de futurs peut ressembler à ce qui suit, en supposant qu'un contexte d'exécution est naturellement implicitement disponible.

  import scala.util.{ Failure, Success, Try }

  private def lift[T](futures: Seq[Future[T]]) = 
    futures.map(_.map { Success(_) }.recover { case t => Failure(t) })

  def waitAll[T](futures: Seq[Future[T]]) =
    Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used

  waitAll(SeqOfFutures).map { 
    // do whatever with the completed futures
  }
26
répondu Travis Brown 2015-03-30 20:47:00

a Future produit par Future.sequence remplit l'une des conditions suivantes:

  • tous les contrats à terme ont été conclus avec succès, ou
  • l'un des contrats à terme a échoué

le deuxième point est ce qui se passe dans votre cas, et il est logique de compléter dès que l'un des Future enveloppé a échoué, parce que l'emballage Future ne peut tenir un seul Throwable dans le cas de la défaillance. Il n'y a pas de raison d'attendre les autres futurs parce que le résultat sera le même échec.

14
répondu Ionuț G. Stan 2015-03-30 11:13:48

il s'agit d'un exemple qui appuie la réponse précédente. Il y a un moyen facile de le faire en utilisant seulement L'APIs Scala standard.

dans l'exemple, je crée 3 futures. Ils se termineront respectivement à 5, 7 et 9 secondes. L'appel à Await.result sera bloqué jusqu'à ce que tous les contrats à terme aient résolu. Une fois que les 3 contrats à terme seront terminés, a sera défini à List(5,7,9) et l'exécution se poursuivra.

en outre, si une exception est jeté dans l'un des futurs, Await.result va immédiatement débloquer et jeter l'exception. Décommentez la ligne Exception(...) pour voir cela en action.

  try {
    val a = Await.result(Future.sequence(Seq(
      Future({
        blocking {
          Thread.sleep(5000)
        }
        System.err.println("A")
        5
      }),
      Future({
        blocking {
          Thread.sleep(7000)
        }
        System.err.println("B")
        7
        //throw new Exception("Ha!")
      }),
      Future({
        blocking {
          Thread.sleep(9000)
        }
        System.err.println("C")
        9
      }))),
      Duration("100 sec"))

    System.err.println(a)
  } catch {
    case e: Exception ⇒
      e.printStackTrace()
  }
2
répondu Jason Smith 2016-07-31 18:05:41

nous pouvons enrichir Seq[Future[T]] avec sa propre méthode onComplete à travers une classe implicite:

  def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
    f map { Success(_) } recover { case e => Failure(e) }

  def lift[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Seq[Future[Try[T]]] =
    fs map { lift(_) }

  implicit class RichSeqFuture[+T](val fs: Seq[Future[T]]) extends AnyVal {
    def onComplete[U](f: Seq[Try[T]] => U)(implicit ec: ExecutionContext) = {
      Future.sequence(lift(fs)) onComplete {
        case Success(s) => f(s)
        case Failure(e) => throw e // will never happen, because of the Try lifting
      }
    }
  }

alors, dans votre MWE particulier, vous pouvez faire:

  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2)

  lf onComplete { _ map {
    case Success(v) => ???
    case Failure(e) => ???
  }}

Cette solution a l'avantage de vous permettre d'appeler un onComplete sur une séquence de futurs comme vous le feriez sur un seul futur.

1
répondu Bruno 2017-10-22 02:53:59