Comment créer une source akka-stream à partir d'un flux qui génère des valeurs de façon récursive?
je dois traverser une API qui a la forme d'un arbre. Par exemple, une structure de répertoire ou des fils de discussion. Il peut être modélisé par le flux suivant:
type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])
def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString
// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
if (id == 0) (1 to 9).toList
else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
else Nil
val itemFlow: Flow[ItemId, Item, NotUsed] =
Flow.fromFunction(id => Item(randomData, nested(id)))
Comment puis-je parcourir ces données? J'ai obtenu le travail suivant:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val loop =
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val source = b.add(Flow[Int])
val merge = b.add(Merge[Int](2))
val fetch = b.add(itemFlow)
val bcast = b.add(Broadcast[Item](2))
val kids = b.add(Flow[Item].mapConcat(_.kids))
val data = b.add(Flow[Item].map(_.data))
val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)
source ~> merge ~> fetch ~> bcast ~> data
merge <~ buffer <~ kids <~ bcast
FlowShape(source.in, data.out)
}
val flow = Flow.fromGraph(loop)
Await.result(
Source.single(0).via(flow).runWith(Sink.foreach(println)),
Duration.Inf
)
system.terminate()
cependant, comme j'utilise un flux avec un tampon, le flux ne sera jamais complet.
S'achève lorsque l'amont est achevé et que les éléments tamponnés ont été drainé
j'ai lu la section cycles de graphe, liveness, and deadlocks plusieurs fois et je suis toujours en difficulté pour trouver une réponse.
Ce serait créer un live lock:
import java.util.concurrent.atomic.AtomicInteger
def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = {
// keep track of how many element flows,
val remaining = new AtomicInteger(1) // 1 = seed
// should be > max loop(x)
val bufferSize = 10000
val (ref, publisher) =
Source.actorRef[S](bufferSize, OverflowStrategy.fail)
.toMat(Sink.asPublisher(true))(Keep.both)
.run()
ref ! seed
Source.fromPublisher(publisher)
.via(flow)
.map{x =>
loop(x).foreach{ c =>
remaining.incrementAndGet()
ref ! c
}
x
}
.takeWhile(_ => remaining.decrementAndGet > 0)
}
EDIT: j'ai ajouté un repo git pour tester votre solution https://github.com/MasseGuillaume/source-unfold
3 réponses
Cause de non-achèvement
Je ne pense pas que la raison pour laquelle le cours d'eau ne se termine jamais soit due à "l'utilisation d'un écoulement avec un tampon". La cause réelle, similaire à cette question , est le fait que la fusion avec le paramètre par défaut eagerClose=False
attend à la fois le source
et le buffer
à compléter avant qu'il (fusion) complète. Mais du tampon est en attente sur fusionner pour terminer. Alors la fusion attend buffer et buffer attendent la fusion.
Fusion eagerClose
vous pouvez définir eagerClose=True
lors de la création de votre Fusion. Mais l'utilisation d'eager close peut malheureusement conduire à certains enfants ItemId
valeurs ne sont jamais questionnées.
Solution Indirecte
si vous matérialisez un nouveau flux pour chaque niveau de l'arbre alors la récursion peut être extraite à l'extérieur du cours d'eau.
vous pouvez construire une fonction de requête en utilisant le itemFlow
:
val itemQuery : Iterable[ItemId] => Future[Seq[Data]] =
(itemIds) => Source.apply(itemIds)
.via(itemFlow)
.runWith(Sink.seq[Data])
cette fonction de requête peut maintenant être enveloppée à l'intérieur d'une fonction d'aide récursive:
val recQuery : (Iterable[ItemId], Iterable[Data]) => Future[Seq[Data]] =
(itemIds, currentData) => itemQuery(itemIds) flatMap { allNewData =>
val allNewKids = allNewData.flatMap(_.kids).toSet
if(allNewKids.isEmpty)
Future.successful(currentData ++ allNewData)
else
recQuery(allNewKids, currentData ++ data)
}
le nombre de cours d'eau créés sera égal à la profondeur maximale de l'arbre.
malheureusement, parce que les contrats à terme sont impliqués, cette fonction récursive n'est pas récursive de queue et pourrait entraîner un" débordement de la pile " si l'arbre est trop profond.
j'ai résolu ce problème en écrivant mon propre GraphStage.
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.concurrent.ExecutionContext
import scala.collection.mutable
import scala.util.{Success, Failure, Try}
import scala.collection.mutable
def unfoldTree[S, E](seeds: List[S],
flow: Flow[S, E, NotUsed],
loop: E => List[S],
bufferSize: Int)(implicit ec: ExecutionContext): Source[E, NotUsed] = {
Source.fromGraph(new UnfoldSource(seeds, flow, loop, bufferSize))
}
object UnfoldSource {
implicit class MutableQueueExtensions[A](private val self: mutable.Queue[A]) extends AnyVal {
def dequeueN(n: Int): List[A] = {
val b = List.newBuilder[A]
var i = 0
while (i < n) {
val e = self.dequeue
b += e
i += 1
}
b.result()
}
}
}
class UnfoldSource[S, E](seeds: List[S],
flow: Flow[S, E, NotUsed],
loop: E => List[S],
bufferSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("UnfoldSource.out")
override val shape: SourceShape[E] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
// Nodes to expand
val frontier = mutable.Queue[S]()
frontier ++= seeds
// Nodes expanded
val buffer = mutable.Queue[E]()
// Using the flow to fetch more data
var inFlight = false
// Sink pulled but the buffer was empty
var downstreamWaiting = false
def isBufferFull() = buffer.size >= bufferSize
def fillBuffer(): Unit = {
val batchSize = Math.min(bufferSize - buffer.size, frontier.size)
val batch = frontier.dequeueN(batchSize)
inFlight = true
val toProcess =
Source(batch)
.via(flow)
.runWith(Sink.seq)(materializer)
val callback = getAsyncCallback[Try[Seq[E]]]{
case Failure(ex) => {
fail(out, ex)
}
case Success(es) => {
val got = es.size
inFlight = false
es.foreach{ e =>
buffer += e
frontier ++= loop(e)
}
if (downstreamWaiting && buffer.nonEmpty) {
val e = buffer.dequeue
downstreamWaiting = false
sendOne(e)
} else {
checkCompletion()
}
()
}
}
toProcess.onComplete(callback.invoke)
}
override def preStart(): Unit = {
checkCompletion()
}
def checkCompletion(): Unit = {
if (!inFlight && buffer.isEmpty && frontier.isEmpty) {
completeStage()
}
}
def sendOne(e: E): Unit = {
push(out, e)
checkCompletion()
}
def onPull(): Unit = {
if (buffer.nonEmpty) {
sendOne(buffer.dequeue)
} else {
downstreamWaiting = true
}
if (!isBufferFull && frontier.nonEmpty) {
fillBuffer()
}
}
setHandler(out, this)
}
}
Ah, les joies des cycles dans les ruisseaux Akka. J'ai eu un problème similaire que j'ai résolu en une profondément hacky. Éventuellement, il sera utile pour vous.
Hacky Solution:
// add a graph stage that will complete successfully if it sees no element within 5 seconds
val timedStopper = b.add(
Flow[Item]
.idleTimeout(5.seconds)
.recoverWithRetries(1, {
case _: TimeoutException => Source.empty[Item]
}))
source ~> merge ~> fetch ~> timedStopper ~> bcast ~> data
merge <~ buffer <~ kids <~ bcast
ce que cela fait est que 5 secondes après que le dernier élément passe par l'étape timedStopper
, cette étape complète le flux avec succès. Ceci est réalisé via l'utilisation de idleTimeout
, qui échoue le flux avec un TimeoutException
, puis en utilisant recoverWithRetries
pour transformer cette défaillance en une réussite. (Je n'ai mentionné que c'était hacky).
ce n'est évidemment pas approprié si vous pourriez avoir plus de 5 secondes entre les éléments, ou si vous ne pouvez pas vous permettre une longue attente entre le flux" réellement " remplir et Akka ramasser sur elle. Heureusement, ni l'un ni l'autre étaient un souci pour nous, et dans ce cas, il fonctionne en fait assez bien!
Non-hacky solution
malheureusement, les seules façons dont je peux penser à faire cela sans tricher via des temps morts sont très, très compliquées.
Fondamentalement, vous devez être en mesure de suivre deux choses:
- y a-t-il encore des éléments dans le tampon, ou en cours d'envoi au tampon
- est la source entrante ouverte
et remplir le flux si et seulement si la réponse à ces deux questions est Non. natif Akka blocs de construction ne vont probablement pas être en mesure de gérer cela. Une étape graphique personnalisée pourrait, cependant. Une option pourrait être d'en écrire une qui prend la place de Merge
et de lui donner un moyen de connaître le contenu du buffer, ou éventuellement de lui faire suivre à la fois les ID Qu'elle reçoit et les Id que la diffusion envoie au buffer. Le problème étant que les étapes graphiques personnalisées ne sont pas particulièrement agréables à écrire à le meilleur des temps, encore moins quand vous mélangez la logique à travers des étapes comme celle-ci.
mises en garde
Akka flux ne fonctionnent pas bien avec les cycles, en particulier comment ils calculent achèvement. En conséquence, cela peut ne pas être le seul problème que vous rencontrez.
par exemple, un problème que nous avons eu avec une structure très similaire était qu'une défaillance dans la source a été traitée comme le flux complétant avec succès, avec un a succédé Future
en cours de réalisation. Le problème est que par défaut, une étape qui échoue échouera ses downstream mais annuler ses upstream (qui compte comme une réussite pour ces étapes). Avec un cycle comme celui que vous avez, le résultat est une course que l'annulation se propage une branche mais un échec à l'autre. Vous devez également vérifier ce qui se passe si les erreurs de l'évier; en fonction des paramètres d'annulation pour la diffusion, il est possible le l'annulation ne se propagera pas vers le haut et la source continuera volontiers à tirer dans les éléments.
une dernière option: éviter de manipuler la logique récursive avec des flux. À un extrême, s'il y a un moyen pour vous d'écrire une seule méthode de queue-récursive qui sort tous les éléments emboîtés à la fois et mettre cela dans une étape de flux, qui résoudra vos problèmes. De l'autre, nous envisageons sérieusement D'aller à Kafka faire la queue pour notre propre système.