Gestion des mauvais messages à L'aide de L'API Streams de Kafka
j'ai un flux de traitement de base qui ressemble à
master topic -> my processing in a mapper/filter -> output topics
et je me demande Quelle est la meilleure façon de gérer les "mauvais messages". Cela pourrait être des choses comme des messages que je ne peux pas désérialiser correctement, ou peut-être que la logique de traitement/filtrage échoue d'une manière inattendue (je n'ai pas de dépendances externes, donc il ne devrait pas y avoir d'erreurs transitoires de ce genre).
j'envisageais d'emballer tout mon code de traitement / filtrage dans une prise d'essai et si un une exception a été soulevée, puis l'acheminement vers un "sujet d'erreur". Puis je peux étudier le message et le modifier ou corriger mon code comme il se doit et le rejouer sur master. Si je laisse des exceptions se propager, le flux semble être bloqué et aucun autre message n'est reçu.
- cette approche est-elle considérée comme une pratique exemplaire?
- y a-t-il un moyen pratique pour Kafka streams de gérer cela? Je ne pense pas qu'il y ait un concept de DQL...
- quelles sont les solutions de rechange arrêter Kafka de brouiller un "mauvais message"?
- quelles autres méthodes de traitement des erreurs existe-t-il?
Pour l'exhaustivité, voici mon code (pseudo-ish):
class Document {
// Fields
}
class AnalysedDocument {
Document document;
String rawValue;
Exception exception;
Analysis analysis;
// All being well
AnalysedDocument(Document document, Analysis analysis) {...}
// Analysis failed
AnalysedDocument(Document document, Exception exception) {...}
// Deserialisation failed
AnalysedDocument(String rawValue, Exception exception) {...}
}
KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
.stream(Serdes.String(), Serdes.String(), "master")
.mapValues(new ValueMapper<String, AnalysedDocument>() {
@Override
public AnalysedDocument apply(String rawValue) {
Document document;
try {
// Deserialise
document = ...
} catch (Exception e) {
return new AnalysedDocument(rawValue, exception);
}
try {
// Perform analysis
Analysis analysis = ...
return new AnalysedDocument(document, analysis);
} catch (Exception e) {
return new AnalysedDocument(document, exception);
}
}
});
// Branch based on whether analysis mapping failed to produce errorStream and successStream
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Toute aide grandement appréciée.
2 réponses
à l'heure actuelle, Kafka Streams n'offre que des capacités limitées de gestion des erreurs. Des travaux sont en cours pour simplifier cette procédure. Pour l'instant, votre approche globale semble être une bonne façon d'aller.
un commentaire sur le traitement des erreurs de/serialization: le traitement manuel de ces erreurs exige que vous fassiez la de / serialization "manuellement". Cela signifie, vous devez configurer ByteArraySerde
s pour la clé et la valeur pour vous d'entrée/sortie thème de votre Flux d'application et ajouter un map()
qui ne le de/de sérialisation (ie,KStream<byte[],byte[]> -> map() -> KStream<keyType,valueType>
-- ou l'inverse si vous voulez aussi attraper les exceptions de sérialisation). Sinon, vous ne pouvez pas try-catch
deseralization exceptions.
avec votre approche actuelle, vous validez "seulement" que la chaîne donnée représente un document valide -- mais cela pourrait être le cas, que le message lui-même est corrompu et ne peut pas être converti en un String
dans l'opérateur source en premier lieu. Ainsi, vous ne couvrez pas réellement l'exception de désérialisation avec votre code. Cependant, si vous êtes sûr qu'une exception de désérialisation ne peut jamais se produire, votre approche serait également suffisante.
mise à Jour
cette question est abordée via KIP-161 et sera inclus dans la prochaine version 1.0.0. Il vous permet d'enregistrer un rappel via le paramètre default.deserialization.exception.handler
. Le handler sera invoqué à chaque fois qu'une exception se produira lors de la deserialisation et vous permettra de retourner un DeserializationResponse
(CONTINUE
-> chute de l'enregistrement d'un déménagement, ou FAIL
c'est la valeur par défaut).
mise à Jour 2
KIP-210 (fera partie de dans Kafka 1.1) il est également possible de gérer les erreurs du côté du producteur, similaire à la partie du consommateur, en enregistrant un ProductionExceptionHandler
via config default.production.exception.handler
cela peut revenir CONTINUE
.
Mise À Jour Du 23 Mars 2018: Kafka 1.0 fournit beaucoup mieux et plus facile la manipulation pour les messages d'erreur mauvais ("pilules empoisonnées") via KIP-161 de ce que j'ai décrit ci-dessous. Voir par défaut.désérialisation.exception.handler dans le Kafka 1.0 docs.
cela pourrait potentiellement être des choses comme des messages que je ne peux pas désérialiser correctement [...]
Ok, ma réponse ici se concentre sur le (de)les problèmes de sérialisation car cela pourrait être le scénario le plus délicat à gérer pour la plupart des utilisateurs.
[...] ou peut-être que la logique de traitement/filtrage échoue d'une manière inattendue (je n'ai pas de dépendances externes, donc il ne devrait pas y avoir d'erreurs transitoires de ce genre).
le même raisonnement (pour la desérialisation) peut aussi s'appliquer aux échecs dans la logique de traitement. Ici, la plupart des gens ont tendance à graviter vers l'option 2 ci-dessous (moins le deserialization part), mais YMMV.
j'envisageais d'emballer tout mon code de traitement / filtrage dans un try catch et si une exception était soulevée alors routage vers un"sujet d'erreur". Puis je peux étudier le message et le modifier ou corriger mon code comme il se doit et le rejouer sur master. Si je laisse des exceptions se propager, le flux semble être bloqué et aucun autre message n'est reçu.
- cette approche est-elle considérée comme une pratique exemplaire?
Oui, en ce moment c'est la voie à suivre. Essentiellement, les deux modèles les plus courants sont (1) sauter des messages corrompus ou (2) envoyer des enregistrements corrompus à un sujet de quarantaine aka une queue de lettre morte.
- Est-il une pratique Kafka flux façon de gérer cela? Je ne pense pas qu'il y ait un concept de DQL...
Oui, il y a un moyen de gérer cela, y compris l'utilisation d'une file d'attente de lettres mortes. Cependant, il est (au moins IMHO) pas si commode encore. Si vous avez des commentaires sur la façon dont L'API devrait vous permettre de gérer cela -- par exemple via une méthode nouvelle ou mise à jour, un paramètre de configuration ("si la sérialisation/désérialisation échoue, envoyez l'enregistrement problématique à ce sujet de quarantaine") -- veuillez nous le faire savoir. : -)
- quelles sont les autres façons d'empêcher Kafka de brouiller un "mauvais message"?
- quelles autres méthodes de traitement des erreurs existe-t-il?
Voir mes exemples ci-dessous.
FWIW, la communauté Kafka discute également de l'ajout d'un nouvel outil CLI qui vous permet de sauter des messages corrompus. Cependant, en tant qu'utilisateur de L'API Kafka Streams, je pense qu'idéalement vous voulez gérer de tels scénarios directement dans votre code, et ne faire appel aux utilitaires CLI qu'en dernier recours.
voici quelques modèles pour les flux de Kafka DSL pour traiter les enregistrements/messages corrompus alias "pilules empoisonnées". Ceci est tiré de http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
Option 1: sauter les enregistrements corrompus avec flatMap
C'est sans doute ce que la plupart des utilisateurs aimeraient faire.
- Nous utilisons
flatMap
parce qu'il vous permet de zéro, un, ou plusieurs enregistrements de sortie par enregistrement d'entrée. Dans le cas d'un enregistrement corrompu nous ne produisons rien (zero records), ignorant ainsi/sautant l'enregistrement corrompu. - avantage de cette approche par rapport aux autres: nous n'avons besoin de desérialiser manuellement un enregistrement qu'une seule fois!
- Inconvénient de cette approche:
flatMap
"marque" le flux d'entrée pour le re-partitionnement potentiel des données, c.-à-d. Si vous effectuez une opération basée sur des clés telles que des regroupements (groupBy
/groupByKey
) ou se joint par la suite, vos données seront re-partitionnées dans les coulisses. Comme cela pourrait être coûteux étape nous ne voulons pas que cela arrive inutilement. Si vous savez que les clés d'enregistrement sont toujours valides ou que vous n'avez pas besoin d'opérer sur les clés (les gardant ainsi comme des clés "brutes" enbyte[]
format), vous pouvez changer deflatMap
flatMapValues
, ce qui n'entraînera pas de re-partitionnement des données même si vous rejoignez/groupez/Agrégez le flux plus tard.
exemple de Code:
Serde<byte[]> bytesSerde = Serdes.ByteArray();
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
// Input topic, which might contain corrupted messages
KStream<byte[], byte[]> input = builder.stream(bytesSerde, bytesSerde, inputTopic);
// Note how the returned stream is of type KStream<String, Long>,
// rather than KStream<byte[], byte[]>.
KStream<String, Long> doubled = input.flatMap(
(k, v) -> {
try {
// Attempt deserialization
String key = stringSerde.deserializer().deserialize(inputTopic, k);
long value = longSerde.deserializer().deserialize(inputTopic, v);
// Ok, the record is valid (not corrupted). Let's take the
// opportunity to also process the record in some way so that
// we haven't paid the deserialization cost just for "poison pill"
// checking.
return Collections.singletonList(KeyValue.pair(key, 2 * value));
}
catch (SerializationException e) {
// log + ignore/skip the corrupted message
System.err.println("Could not deserialize record: " + e.getMessage());
}
return Collections.emptyList();
}
);
Option 2: file d'attente de lettres mortes avec branch
comparés pour l'option 1 (qui ignore les enregistrements corrompus) l'option 2 conserve les messages corrompus en les filtrant hors du flux d'entrée "principal" et en les écrivant à un sujet de quarantaine (pensez: la queue des lettres mortes). L'inconvénient est que, pour les enregistrements valides, nous devons payer le coût de désérialisation manuelle deux fois.
KStream<byte[], byte[]> input = ...;
KStream<byte[], byte[]>[] partitioned = input.branch(
(k, v) -> {
boolean isValidRecord = false;
try {
stringSerde.deserializer().deserialize(inputTopic, k);
longSerde.deserializer().deserialize(inputTopic, v);
isValidRecord = true;
}
catch (SerializationException ignored) {}
return isValidRecord;
},
(k, v) -> true
);
// partitioned[0] is the KStream<byte[], byte[]> that contains
// only valid records. partitioned[1] contains only corrupted
// records and thus acts as a "dead letter queue".
KStream<String, Long> doubled = partitioned[0].map(
(key, value) -> KeyValue.pair(
// Must deserialize a second time unfortunately.
stringSerde.deserializer().deserialize(inputTopic, key),
2 * longSerde.deserializer().deserialize(inputTopic, value)));
// Don't forget to actually write the dead letter queue back to Kafka!
partitioned[1].to(Serdes.ByteArray(), Serdes.ByteArray(), "quarantine-topic");
Option 3: sauter les enregistrements corrompus avec filter
Je n'en parle que pour être complet. Cette option ressemble à un mélange d'options 1 et 2, mais est pire qu'eux. Par rapport à l'option 1, vous devez payer le coût de désérialisation manuelle pour les enregistrements valides deux fois (mauvais!). Par rapport à l'option 2, vous perdez la possibilité de conserver des enregistrements corrompus dans une file d'attente de lettres mortes.
KStream<byte[], byte[]> validRecordsOnly = input.filter(
(k, v) -> {
boolean isValidRecord = false;
try {
bytesSerde.deserializer().deserialize(inputTopic, k);
longSerde.deserializer().deserialize(inputTopic, v);
isValidRecord = true;
}
catch (SerializationException e) {
// log + ignore/skip the corrupted message
System.err.println("Could not deserialize record: " + e.getMessage());
}
return isValidRecord;
}
);
KStream<String, Long> doubled = validRecordsOnly.map(
(key, value) -> KeyValue.pair(
// Must deserialize a second time unfortunately.
stringSerde.deserializer().deserialize(inputTopic, key),
2 * longSerde.deserializer().deserialize(inputTopic, value)));
Toute aide grandement appréciée.
j'espère pouvoir vous aider. Si oui, j'apprécierais vos commentaires sur la façon dont nous pourrions améliorer L'API Kafka Streams pour gérer les échecs/exceptions d'une manière meilleure/plus pratique que aujourd'. : -)