Comment puis-je mettre à jour une variable de diffusion dans spark streaming?
j'ai, je crois, relativement utilisation de spark streaming:
j'ai un flux d'objets que je voudrais filtre sur la base de quelques données de référence
au départ, j'ai pensé que ce serait une chose très simple à réaliser en utilisant un Variable De Diffusion:
public void startSparkEngine {
Broadcast<ReferenceData> refdataBroadcast
= sparkContext.broadcast(getRefData());
final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
final ReferenceData refData = refdataBroadcast.getValue();
return obj.getField().equals(refData.getField());
}
filteredStream.foreachRDD(rdd -> {
rdd.foreach(obj -> {
// Final processing of filtered objects
});
return null;
});
}
Cependant, quoique rarement, mes données de référence va changer périodiquement
j'avais l'impression que Je pourrais modifier et diffusion ma variable sur le pilote et elle serait propagée à chacun des ouvriers, cependant la Broadcast
l'objet n'est pas Serializable
et doit être final
.
quelles solutions de rechange ai-je? Les trois solutions que je peux penser sont:
déplacer la recherche des données de référence dans un
forEachPartition
ouforEachRdd
de sorte qu'elle repose entièrement sur les ouvriers. Cependant, les données de référence vivent dans une API REST, donc je devrais aussi d'une manière ou d'une autre, stockez un temporisateur / compteur pour empêcher l'accès à la télécommande pour chaque élément du flux.redémarrez le contexte Spark chaque fois que refdata change, avec une nouvelle Variable Broadcast.
Convertissez les données de référence en RDD, puis
join
les cours d'eau de telle sorte que je suis maintenant en streamingPair<MyObject, RefData>
, bien que cela enverra les données de référence avec chaque objet.
4 réponses
prolongeant la réponse de @Rohan Aletty. Voici un exemple de code D'un radiodiffuseur qui rafraîchit la variable de diffusion basée sur un certain ttl
public class BroadcastWrapper {
private Broadcast<ReferenceData> broadcastVar;
private Date lastUpdatedAt = Calendar.getInstance().getTime();
private static BroadcastWrapper obj = new BroadcastWrapper();
private BroadcastWrapper(){}
public static BroadcastWrapper getInstance() {
return obj;
}
public JavaSparkContext getSparkContext(SparkContext sc) {
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
return jsc;
}
public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
Date currentDate = Calendar.getInstance().getTime();
long diff = currentDate.getTime()-lastUpdatedAt.getTime();
if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms
if (var != null)
var.unpersist();
lastUpdatedAt = new Date(System.currentTimeMillis());
//Your logic to refresh
ReferenceData data = getRefData();
var = getSparkContext(sparkContext).broadcast(data);
}
return var;
}
}
votre code ressemblerait à :
public void startSparkEngine() {
final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context());
stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
});
filteredStream.foreachRDD(rdd -> {
rdd.foreach(obj -> {
// Final processing of filtered objects
});
return null;
});
}
Cela a fonctionné pour moi sur multi-cluster. Espérons que cela aide
presque tous ceux qui traitent des applications de streaming ont besoin d'un moyen de tisser (filtre, recherche, etc.) les données de référence (à partir de DB, fichiers, etc.) dans les données de streaming. Nous avons une solution partielle de l'ensemble de deux pièces
données de référence de recherche à utiliser dans les opérations de streaming
- créer CacheLookup objet désiré avec cache TTL
- wrap que dans la Diffusion
- utilisez CacheLookup dans le cadre de la diffusion en continu logique
Pour la plupart partie, cela fonctionne bien, sauf pour le suivant
mettre à Jour les données de référence
il n'y a pas de moyen définitif d'y parvenir malgré les suggestions dans ces fils, I. e: désactivez la variable de diffusion précédente et créez-en une nouvelle. Plusieurs inconnues comme quoi être prévu entre ces opérations.
C'est un besoin commun, il aurait aidé si il existe un moyen de envoyer l'information à la variable de diffusion informer mise à jour. Avec cela, il est possible d'invalider les caches locales dans "CacheLookup"
La deuxième partie du problème est toujours pas résolu. Je serais intéressé de savoir si il existe une approche viable pour ce
pas sûr si vous avez déjà essayé cela, mais je pense qu'une mise à jour vers une variable de diffusion peut être réalisée sans fermer le SparkContext
. Grâce à l'utilisation de l' unpersist()
méthode, des copies de la variable de diffusion sont supprimées sur chaque exécuteur et devraient être la variable devrait être rediffusée afin d'être accessible à nouveau. Pour votre cas d'utilisation, lorsque vous voulez mettre à jour votre émission, vous pouvez:
attendez que vos exécuteurs les séries de données
désactiver la variable de diffusion
mise à Jour de la diffusion de la variable
rediffusion pour envoyer les nouvelles données de référence aux exécuteurs
je m'inspire assez largement de ce post mais la personne qui a fait la dernière réponse a prétendu avoir obtenu de fonctionner localement. Il est important de noter que vous voulez probablement définir le blocage à true
sur l'unpersist de sorte que vous puissiez être sûr que les exécuteurs sont débarrassés des anciennes données (de sorte que les valeurs périmées ne seront pas relues sur la prochaine itération).
a récemment été confronté à ce problème. Pensé qu'il pourrait être utile pour les utilisateurs scala..
Scala façon de faire BroadCastWrapper
c'est comme l'exemple ci-dessous.
import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag
/* wrapper lets us update brodcast variables within DStreams' foreachRDD
without running into serialization issues */
case class BroadcastWrapper[T: ClassTag](
@transient private val ssc: StreamingContext,
@transient private val _v: T) {
@transient private var v = ssc.sparkContext.broadcast(_v)
def update(newValue: T, blocking: Boolean = false): Unit = {
v.unpersist(blocking)
v = ssc.sparkContext.broadcast(newValue)
}
def value: T = v.value
private def writeObject(out: ObjectOutputStream): Unit = {
out.writeObject(v)
}
private def readObject(in: ObjectInputStream): Unit = {
v = in.readObject().asInstanceOf[Broadcast[T]]
}
}
chaque fois que vous avez besoin d'appeler la fonction de mise à jour pour obtenir la nouvelle variable de diffusion.