Comment écrire à Kafka à partir de Spark Streaming
j'utilise le Streaming D'étincelles pour traiter les données entre deux files D'attente Kafka mais je ne trouve pas de bonne façon d'écrire sur Kafka à partir de Spark. J'ai essayé ceci:
input.foreachRDD(rdd =>
rdd.foreachPartition(partition =>
partition.foreach{
case x:String=>{
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String,String](props)
val message=new ProducerRecord[String, String]("output",null,x)
producer.send(message)
}
}
)
)
et cela fonctionne comme prévu mais instancier un nouveau KafkaProducer pour chaque message est clairement irréalisable dans un contexte réel et j'essaye de travailler autour de lui.
KafkaProducer n'est pas sérialisable, évidemment.
je voudrais garder une référence à une seule instance pour chaque processus et accéder lorsque j'ai besoin d'envoyer un message. Comment puis-je le faire?
6 réponses
mon premier conseil serait d'essayer de créer une nouvelle instance dans foreachPartition et measure si cela est assez rapide pour vos besoins (instantiating heavy objects in foreachPartition est ce que la documentation officielle suggère).
une autre option est d'utiliser un pool d'objets comme illustré dans ce exemple:
j'ai cependant eu du mal à l'implémenter lors de l'utilisation de checkpointing.
une autre version qui fonctionne bien pour moi est une usine comme décrit dans le post de blog suivant, vous avez juste à vérifier si elle fournit assez de parallélisme pour vos besoins (vérifier les commentaires la section):
http://allegro.technologie/2015/08/spark-Kafka-intégration.html
Oui, malheureusement, la Spark (1.x, 2.x) ne rend pas simple comment écrire à Kafka d'une manière efficace.
je te suggère la démarche suivante:
- Utiliser (et réutiliser)
KafkaProducer
instance par processus exécuteur / JVM.
Voici la configuration de haut niveau pour cette approche:
- tout d'abord, vous devez "envelopper" Kafka's
KafkaProducer
parce que, comme vous l'avez mentionné, il n'est pas sérialisable. Emballage il vous permet de "l'envoyer" aux exécuteurs. L'idée clé ici est d'utiliser unlazy val
de sorte que vous retardez l'instanciation du producteur jusqu'à sa première utilisation, ce qui est effectivement une solution pour que vous n'ayez pas à vous soucier deKafkaProducer
n'étant pas sérialisable. - vous "expédiez" le producteur enveloppé à chaque exécuteur testamentaire en utilisant une variable de diffusion.
- dans votre logique de traitement réel, vous accédez au producteur Enveloppé par la variable broadcast, et l'utilisez pour écrire les résultats du traitement retour à la Kafka.
les extraits de code ci-dessous fonctionnent avec Spark Streaming à partir de Spark 2.0.
Étape 1: Emballage KafkaProducer
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
/* This is the key idea that allows us to work around running into
NotSerializableExceptions. */
lazy val producer = createProducer()
def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
}
object MySparkKafkaProducer {
import scala.collection.JavaConversions._
def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
val createProducerFunc = () => {
val producer = new KafkaProducer[K, V](config)
sys.addShutdownHook {
// Ensure that, on executor JVM shutdown, the Kafka producer sends
// any buffered messages to Kafka before shutting down.
producer.close()
}
producer
}
new MySparkKafkaProducer(createProducerFunc)
}
def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)
}
Étape 2: Utilisez une variable de diffusion pour donner à chaque exécuteur son propre enveloppement KafkaProducer
exemple
import org.apache.kafka.clients.producer.ProducerConfig
val ssc: StreamingContext = {
val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
new StreamingContext(sparkConf, Seconds(1))
}
ssc.checkpoint("checkpoint-directory")
val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", "broker1:9092")
p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}
Etape 3: Ecrire à partir de Spark Streaming vers Kafka, en réutilisant le même wrapped KafkaProducer
exemple (pour chaque exécuteur testamentaire)
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
kafkaProducer.value.send("my-output-topic", record)
}.toStream
metadata.foreach { metadata => metadata.get() }
}
}
j'Espère cela aide.
Il y a un flux de Kafka, Écrivain maintenu par Cloudera (en fait le spin off à partir d'une Étincelle JIRA [1]