Comment écrire à Kafka à partir de Spark Streaming

j'utilise le Streaming D'étincelles pour traiter les données entre deux files D'attente Kafka mais je ne trouve pas de bonne façon d'écrire sur Kafka à partir de Spark. J'ai essayé ceci:

input.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>

                partition.foreach{
                  case x:String=>{

                    val props = new HashMap[String, Object]()
                    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
                    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")
                    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")

                    println(x)
                    val producer = new KafkaProducer[String,String](props)
                    val message=new ProducerRecord[String, String]("output",null,x)
                    producer.send(message)
                  }
                }


      )
    ) 

et cela fonctionne comme prévu mais instancier un nouveau KafkaProducer pour chaque message est clairement irréalisable dans un contexte réel et j'essaye de travailler autour de lui.

KafkaProducer n'est pas sérialisable, évidemment.

je voudrais garder une référence à une seule instance pour chaque processus et accéder lorsque j'ai besoin d'envoyer un message. Comment puis-je le faire?

28
demandé sur Chobeat 2015-07-23 17:39:29

6 réponses

mon premier conseil serait d'essayer de créer une nouvelle instance dans foreachPartition et measure si cela est assez rapide pour vos besoins (instantiating heavy objects in foreachPartition est ce que la documentation officielle suggère).

une autre option est d'utiliser un pool d'objets comme illustré dans ce exemple:

https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala

j'ai cependant eu du mal à l'implémenter lors de l'utilisation de checkpointing.

une autre version qui fonctionne bien pour moi est une usine comme décrit dans le post de blog suivant, vous avez juste à vérifier si elle fournit assez de parallélisme pour vos besoins (vérifier les commentaires la section):

http://allegro.technologie/2015/08/spark-Kafka-intégration.html

17
répondu Marius Soutier 2016-03-01 15:47:37

Oui, malheureusement, la Spark (1.x, 2.x) ne rend pas simple comment écrire à Kafka d'une manière efficace.

je te suggère la démarche suivante:

  • Utiliser (et réutiliser)KafkaProducer instance par processus exécuteur / JVM.

Voici la configuration de haut niveau pour cette approche:

  1. tout d'abord, vous devez "envelopper" Kafka's KafkaProducer parce que, comme vous l'avez mentionné, il n'est pas sérialisable. Emballage il vous permet de "l'envoyer" aux exécuteurs. L'idée clé ici est d'utiliser un lazy val de sorte que vous retardez l'instanciation du producteur jusqu'à sa première utilisation, ce qui est effectivement une solution pour que vous n'ayez pas à vous soucier de KafkaProducer n'étant pas sérialisable.
  2. vous "expédiez" le producteur enveloppé à chaque exécuteur testamentaire en utilisant une variable de diffusion.
  3. dans votre logique de traitement réel, vous accédez au producteur Enveloppé par la variable broadcast, et l'utilisez pour écrire les résultats du traitement retour à la Kafka.

les extraits de code ci-dessous fonctionnent avec Spark Streaming à partir de Spark 2.0.

Étape 1: Emballage KafkaProducer

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}

Étape 2: Utilisez une variable de diffusion pour donner à chaque exécuteur son propre enveloppement KafkaProducer exemple

import org.apache.kafka.clients.producer.ProducerConfig

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}

Etape 3: Ecrire à partir de Spark Streaming vers Kafka, en réutilisant le même wrapped KafkaProducer exemple (pour chaque exécuteur testamentaire)

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
    }.toStream
    metadata.foreach { metadata => metadata.get() }
  }
}

j'Espère cela aide.

25
répondu Michael G. Noll 2016-09-16 19:56:38

Il y a un flux de Kafka, Écrivain maintenu par Cloudera (en fait le spin off à partir d'une Étincelle JIRA [1]