Scala en attente d'une séquence de futurs
j'espérais que le code comme suit attendrait les deux futurs, mais ce n'est pas le cas.
object Fiddle {
val f1 = Future {
throw new Throwable("baaa") // emulating a future that bumped into an exception
}
val f2 = Future {
Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
2
}
val lf = List(f1, f2) // in the general case, this would be a dynamically sized list
val seq = Future.sequence(lf)
seq.onComplete {
_ => lf.foreach(f => println(f.isCompleted))
}
}
val a = FuturesSequence
j'ai supposé que seq.onComplete
attendrait qu'ils tous se complètent avant de se compléter, mais pas ainsi; il en résulte:
true
false
.sequence
était un peu difficile à suivre dans la source de scala.simultané.Futur, je me demande comment je mettrais en œuvre un parallèle qui attend tous les futurs originaux d'une séquence (de taille dynamique), ou ce qui pourrait le problème ici.
Edit: Une question connexe: https://worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future :)
4 réponses
une approche commune pour attendre tous les résultats (ratés ou non) est de" lever " les échecs dans une nouvelle représentation à l'intérieur du futur, de sorte que tous les futurs complets avec un certain résultat (bien qu'ils puissent compléter avec un résultat qui représente l'échec). Une façon naturelle d'obtenir cela est le levage à un Try
.
la mise en œuvre de Twitter de futures fournit une méthode liftToTry
qui rend ce trivial, mais vous pouvez faire quelque chose de similaire avec la mise en œuvre de la bibliothèque standard:
import scala.util.{ Failure, Success, Try }
val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
_.map(Success(_)).recover { case t => Failure(t) }
)
Maintenant Future.sequence(lifted)
sera terminée lorsque tous les futurs est terminé, et représentera les succès et les échecs à l'aide de Try
.
et donc, une solution générique pour attendre sur tous les futurs originaux d'une séquence de futurs peut ressembler à ce qui suit, en supposant qu'un contexte d'exécution est naturellement implicitement disponible.
import scala.util.{ Failure, Success, Try }
private def lift[T](futures: Seq[Future[T]]) =
futures.map(_.map { Success(_) }.recover { case t => Failure(t) })
def waitAll[T](futures: Seq[Future[T]]) =
Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used
waitAll(SeqOfFutures).map {
// do whatever with the completed futures
}
a Future
produit par Future.sequence
remplit l'une des conditions suivantes:
- tous les contrats à terme ont été conclus avec succès, ou
- l'un des contrats à terme a échoué
le deuxième point est ce qui se passe dans votre cas, et il est logique de compléter dès que l'un des Future
enveloppé a échoué, parce que l'emballage Future
ne peut tenir un seul Throwable
dans le cas de la défaillance. Il n'y a pas de raison d'attendre les autres futurs parce que le résultat sera le même échec.
il s'agit d'un exemple qui appuie la réponse précédente. Il y a un moyen facile de le faire en utilisant seulement L'APIs Scala standard.
dans l'exemple, je crée 3 futures. Ils se termineront respectivement à 5, 7 et 9 secondes. L'appel à Await.result
sera bloqué jusqu'à ce que tous les contrats à terme aient résolu. Une fois que les 3 contrats à terme seront terminés, a
sera défini à List(5,7,9)
et l'exécution se poursuivra.
en outre, si une exception est jeté dans l'un des futurs, Await.result
va immédiatement débloquer et jeter l'exception. Décommentez la ligne Exception(...)
pour voir cela en action.
try {
val a = Await.result(Future.sequence(Seq(
Future({
blocking {
Thread.sleep(5000)
}
System.err.println("A")
5
}),
Future({
blocking {
Thread.sleep(7000)
}
System.err.println("B")
7
//throw new Exception("Ha!")
}),
Future({
blocking {
Thread.sleep(9000)
}
System.err.println("C")
9
}))),
Duration("100 sec"))
System.err.println(a)
} catch {
case e: Exception ⇒
e.printStackTrace()
}
nous pouvons enrichir Seq[Future[T]]
avec sa propre méthode onComplete
à travers une classe implicite:
def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
f map { Success(_) } recover { case e => Failure(e) }
def lift[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Seq[Future[Try[T]]] =
fs map { lift(_) }
implicit class RichSeqFuture[+T](val fs: Seq[Future[T]]) extends AnyVal {
def onComplete[U](f: Seq[Try[T]] => U)(implicit ec: ExecutionContext) = {
Future.sequence(lift(fs)) onComplete {
case Success(s) => f(s)
case Failure(e) => throw e // will never happen, because of the Try lifting
}
}
}
alors, dans votre MWE particulier, vous pouvez faire:
val f1 = Future {
throw new Throwable("baaa") // emulating a future that bumped into an exception
}
val f2 = Future {
Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
2
}
val lf = List(f1, f2)
lf onComplete { _ map {
case Success(v) => ???
case Failure(e) => ???
}}
Cette solution a l'avantage de vous permettre d'appeler un onComplete
sur une séquence de futurs comme vous le feriez sur un seul futur.