Comment créer une source akka-stream à partir d'un flux qui génère des valeurs de façon récursive?

je dois traverser une API qui a la forme d'un arbre. Par exemple, une structure de répertoire ou des fils de discussion. Il peut être modélisé par le flux suivant:

type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])

def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString 

// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
  if (id == 0) (1 to 9).toList
  else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
  else Nil

val itemFlow: Flow[ItemId, Item, NotUsed] = 
  Flow.fromFunction(id => Item(randomData, nested(id)))

Comment puis-je parcourir ces données? J'ai obtenu le travail suivant:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._

import scala.concurrent.Await
import scala.concurrent.duration.Duration

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val loop = 
  GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val source = b.add(Flow[Int])
    val merge  = b.add(Merge[Int](2))
    val fetch  = b.add(itemFlow) 
    val bcast  = b.add(Broadcast[Item](2))

    val kids   = b.add(Flow[Item].mapConcat(_.kids))
    val data   = b.add(Flow[Item].map(_.data))

    val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)

    source ~> merge ~> fetch           ~> bcast ~> data
              merge <~ buffer <~ kids  <~ bcast

    FlowShape(source.in, data.out)
  }

val flow = Flow.fromGraph(loop)


Await.result(
  Source.single(0).via(flow).runWith(Sink.foreach(println)),
  Duration.Inf
)

system.terminate()

cependant, comme j'utilise un flux avec un tampon, le flux ne sera jamais complet.

S'achève lorsque l'amont est achevé et que les éléments tamponnés ont été drainé

le Flux de.tampon

j'ai lu la section cycles de graphe, liveness, and deadlocks plusieurs fois et je suis toujours en difficulté pour trouver une réponse.

Ce serait créer un live lock:

import java.util.concurrent.atomic.AtomicInteger

def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = {
  // keep track of how many element flows, 
  val remaining = new AtomicInteger(1) // 1 = seed

  // should be > max loop(x)
  val bufferSize = 10000

  val (ref, publisher) =
    Source.actorRef[S](bufferSize, OverflowStrategy.fail)
      .toMat(Sink.asPublisher(true))(Keep.both)
      .run()

  ref ! seed

  Source.fromPublisher(publisher)
    .via(flow)
    .map{x =>
      loop(x).foreach{ c =>
        remaining.incrementAndGet()
        ref ! c
      }
      x
    }
    .takeWhile(_ => remaining.decrementAndGet > 0)
}

EDIT: j'ai ajouté un repo git pour tester votre solution https://github.com/MasseGuillaume/source-unfold

23
demandé sur Guillaume Massé 2018-07-29 13:24:27

3 réponses

Cause de non-achèvement

Je ne pense pas que la raison pour laquelle le cours d'eau ne se termine jamais soit due à "l'utilisation d'un écoulement avec un tampon". La cause réelle, similaire à cette question , est le fait que la fusion avec le paramètre par défaut eagerClose=False attend à la fois le source et le buffer à compléter avant qu'il (fusion) complète. Mais du tampon est en attente sur fusionner pour terminer. Alors la fusion attend buffer et buffer attendent la fusion.

Fusion eagerClose

vous pouvez définir eagerClose=True lors de la création de votre Fusion. Mais l'utilisation d'eager close peut malheureusement conduire à certains enfants ItemId valeurs ne sont jamais questionnées.

Solution Indirecte

si vous matérialisez un nouveau flux pour chaque niveau de l'arbre alors la récursion peut être extraite à l'extérieur du cours d'eau.

vous pouvez construire une fonction de requête en utilisant le itemFlow :

val itemQuery : Iterable[ItemId] => Future[Seq[Data]] = 
  (itemIds) => Source.apply(itemIds)
                     .via(itemFlow)
                     .runWith(Sink.seq[Data])

cette fonction de requête peut maintenant être enveloppée à l'intérieur d'une fonction d'aide récursive:

val recQuery : (Iterable[ItemId], Iterable[Data]) => Future[Seq[Data]] = 
  (itemIds, currentData) => itemQuery(itemIds) flatMap { allNewData =>
      val allNewKids = allNewData.flatMap(_.kids).toSet

      if(allNewKids.isEmpty)
        Future.successful(currentData ++ allNewData)
      else
        recQuery(allNewKids, currentData ++ data)
  }

le nombre de cours d'eau créés sera égal à la profondeur maximale de l'arbre.

malheureusement, parce que les contrats à terme sont impliqués, cette fonction récursive n'est pas récursive de queue et pourrait entraîner un" débordement de la pile " si l'arbre est trop profond.

5
répondu Ramon J Romero y Vigil 2018-08-02 12:54:49

j'ai résolu ce problème en écrivant mon propre GraphStage.

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

import scala.concurrent.ExecutionContext

import scala.collection.mutable
import scala.util.{Success, Failure, Try}

import scala.collection.mutable

def unfoldTree[S, E](seeds: List[S], 
                     flow: Flow[S, E, NotUsed],
                     loop: E => List[S],
                     bufferSize: Int)(implicit ec: ExecutionContext): Source[E, NotUsed] = {
  Source.fromGraph(new UnfoldSource(seeds, flow, loop, bufferSize))
}

object UnfoldSource {
  implicit class MutableQueueExtensions[A](private val self: mutable.Queue[A]) extends AnyVal {
    def dequeueN(n: Int): List[A] = {
      val b = List.newBuilder[A]
      var i = 0
      while (i < n) {
        val e = self.dequeue
        b += e
        i += 1
      }
      b.result()
    }
  }
}

class UnfoldSource[S, E](seeds: List[S],
                         flow: Flow[S, E, NotUsed],
                         loop: E => List[S],
                         bufferSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[E]] {

  val out: Outlet[E] = Outlet("UnfoldSource.out")
  override val shape: SourceShape[E] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {  
    // Nodes to expand
    val frontier = mutable.Queue[S]()
    frontier ++= seeds

    // Nodes expanded
    val buffer = mutable.Queue[E]()

    // Using the flow to fetch more data
    var inFlight = false

    // Sink pulled but the buffer was empty
    var downstreamWaiting = false

    def isBufferFull() = buffer.size >= bufferSize

    def fillBuffer(): Unit = {
      val batchSize = Math.min(bufferSize - buffer.size, frontier.size)
      val batch = frontier.dequeueN(batchSize)
      inFlight = true

      val toProcess =
        Source(batch)
          .via(flow)
          .runWith(Sink.seq)(materializer)

      val callback = getAsyncCallback[Try[Seq[E]]]{
        case Failure(ex) => {
          fail(out, ex)
        }
        case Success(es) => {
          val got = es.size
          inFlight = false
          es.foreach{ e =>
            buffer += e
            frontier ++= loop(e)
          }
          if (downstreamWaiting && buffer.nonEmpty) {
            val e = buffer.dequeue
            downstreamWaiting = false
            sendOne(e)
          } else {
            checkCompletion()
          }
          ()
        }
      }

      toProcess.onComplete(callback.invoke)
    }
    override def preStart(): Unit = {
      checkCompletion()
    }

    def checkCompletion(): Unit = {
      if (!inFlight && buffer.isEmpty && frontier.isEmpty) {
        completeStage()
      }
    } 

    def sendOne(e: E): Unit = {
      push(out, e)
      checkCompletion()
    }

    def onPull(): Unit = {
      if (buffer.nonEmpty) {
        sendOne(buffer.dequeue)
      } else {
        downstreamWaiting = true
      }

      if (!isBufferFull && frontier.nonEmpty) {
        fillBuffer()
      }
    }

    setHandler(out, this)
  }
}
3
répondu Guillaume Massé 2018-08-05 21:02:25

Ah, les joies des cycles dans les ruisseaux Akka. J'ai eu un problème similaire que j'ai résolu en une profondément hacky. Éventuellement, il sera utile pour vous.

Hacky Solution:

  // add a graph stage that will complete successfully if it sees no element within 5 seconds
  val timedStopper = b.add(
    Flow[Item]
      .idleTimeout(5.seconds)
      .recoverWithRetries(1, {
        case _: TimeoutException => Source.empty[Item]
      }))

  source ~> merge ~> fetch ~> timedStopper ~> bcast ~> data
  merge <~ buffer <~ kids <~ bcast

ce que cela fait est que 5 secondes après que le dernier élément passe par l'étape timedStopper , cette étape complète le flux avec succès. Ceci est réalisé via l'utilisation de idleTimeout , qui échoue le flux avec un TimeoutException , puis en utilisant recoverWithRetries pour transformer cette défaillance en une réussite. (Je n'ai mentionné que c'était hacky).

ce n'est évidemment pas approprié si vous pourriez avoir plus de 5 secondes entre les éléments, ou si vous ne pouvez pas vous permettre une longue attente entre le flux" réellement " remplir et Akka ramasser sur elle. Heureusement, ni l'un ni l'autre étaient un souci pour nous, et dans ce cas, il fonctionne en fait assez bien!

Non-hacky solution

malheureusement, les seules façons dont je peux penser à faire cela sans tricher via des temps morts sont très, très compliquées.

Fondamentalement, vous devez être en mesure de suivre deux choses:

  • y a-t-il encore des éléments dans le tampon, ou en cours d'envoi au tampon
  • est la source entrante ouverte

et remplir le flux si et seulement si la réponse à ces deux questions est Non. natif Akka blocs de construction ne vont probablement pas être en mesure de gérer cela. Une étape graphique personnalisée pourrait, cependant. Une option pourrait être d'en écrire une qui prend la place de Merge et de lui donner un moyen de connaître le contenu du buffer, ou éventuellement de lui faire suivre à la fois les ID Qu'elle reçoit et les Id que la diffusion envoie au buffer. Le problème étant que les étapes graphiques personnalisées ne sont pas particulièrement agréables à écrire à le meilleur des temps, encore moins quand vous mélangez la logique à travers des étapes comme celle-ci.

mises en garde

Akka flux ne fonctionnent pas bien avec les cycles, en particulier comment ils calculent achèvement. En conséquence, cela peut ne pas être le seul problème que vous rencontrez.

par exemple, un problème que nous avons eu avec une structure très similaire était qu'une défaillance dans la source a été traitée comme le flux complétant avec succès, avec un a succédé Future en cours de réalisation. Le problème est que par défaut, une étape qui échoue échouera ses downstream mais annuler ses upstream (qui compte comme une réussite pour ces étapes). Avec un cycle comme celui que vous avez, le résultat est une course que l'annulation se propage une branche mais un échec à l'autre. Vous devez également vérifier ce qui se passe si les erreurs de l'évier; en fonction des paramètres d'annulation pour la diffusion, il est possible le l'annulation ne se propagera pas vers le haut et la source continuera volontiers à tirer dans les éléments.

une dernière option: éviter de manipuler la logique récursive avec des flux. À un extrême, s'il y a un moyen pour vous d'écrire une seule méthode de queue-récursive qui sort tous les éléments emboîtés à la fois et mettre cela dans une étape de flux, qui résoudra vos problèmes. De l'autre, nous envisageons sérieusement D'aller à Kafka faire la queue pour notre propre système.

-1
répondu Astrid 2018-08-01 19:44:00