Thème De La Purge De Kafka
j'ai poussé un message qui était trop grand dans un sujet de message kafka sur ma machine locale, maintenant je reçois une erreur:
kafka.common.InvalidMessageSizeException: invalid message size
augmenter le fetch.size
n'est pas idéal ici, parce que je ne veux pas vraiment accepter des messages aussi gros. Est-il un moyen de purger le sujet de kafka?
13 réponses
mettre temporairement à jour le temps de rétention sur le sujet à une seconde:
kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000
puis attendre que la purge prenne effet (environ une minute). Une fois purgé, restaurer la valeur précédente retention.ms
.
Voici les étapes à suivre pour supprimer un sujet nommé MyTopic
:
- Arrêter Apache Kafka démon
- supprimer le dossier de données topic:
rm -rf /tmp/kafka-logs/MyTopic-0
- supprimer les métadonnées du sujet:
zkCli.sh
puisrmr /brokers/MyTopic
- Démarrer Apache Kafka démon
si vous manquez l'étape 3, alors Apache Kafka continuera à signaler le sujet est présent (par exemple, si vous exécutez kafka-list-topic.sh
).
Testé avec Apache Kafka 0.8.0.
pour purger la file d'attente, vous pouvez supprimer le sujet:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
puis le recréer:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test
bien que la réponse acceptée soit correcte, cette méthode a été dépréciée. La configuration du sujet doit maintenant être effectuée via kafka-configs
.
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic
les Configurations définies par cette méthode peuvent être affichées avec la commande
kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
testé en Kafka 0.8.2, pour l'exemple de démarrage rapide: Tout d'abord, ajouter une ligne au serveur.fichier de propriétés sous le dossier de configuration:
delete.topic.enable=true
alors, vous pouvez exécuter cette commande:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
mise à JOUR: Cette réponse est pertinente pour Kafka 0.6. Pour Kafka 0.8 et plus tard voir la réponse de @Patrick.
Oui, arrêter kafka et supprimer manuellement tous les fichiers du sous-répertoire correspondant (il est facile de le trouver dans le répertoire de données kafka). Après kafka redémarrer le sujet sera vide.
kafka n'ont pas de méthode directe pour la purge et le nettoyage des sujet (Files d'attente), mais peuvent le faire via la suppression de ce topic et de le recréer.
premier de assurez-vous sever.propriétés fichier a et si non Ajouter delete.topic.enable=true
ensuite, Supprimer sujet
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
puis le créer à nouveau.
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
parfois, si vous avez un cluster saturé (trop de partitions, ou en utilisant des données de sujet cryptées, ou en utilisant SSL, ou le contrôleur est sur un mauvais noeud, ou la connexion est flasque, il faudra beaucoup de temps pour purger ledit sujet.
je suis ces étapes, en particulier si vous utilisez Avro.
1: Exécuter avec des outils kafka:
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>
2: Exécuter sur le Schéma nœud de registre:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: ramenez la rétention du sujet au cadre d'origine, une fois que le sujet est vide.
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>
espérons que cela aide quelqu'un, car il n'est pas facilement annoncé.
la méthode la plus simple consiste à établir que la date des fichiers journaux individuels est antérieure à la période de conservation. Ensuite, le courtier devrait les nettoyer et les enlever pour vous en quelques secondes. Cela offre plusieurs avantages:
- Pas besoin d'abattre des courtiers, c'est un moteur d'exécution de l'opération.
- évite la possibilité d'exceptions de compensation invalides (voir ci-dessous).
D'après mon expérience avec Kafka 0.7.x, supprimer les fichiers journaux et redémarrer le courtier pourrait conduire à des exceptions de offset invalides pour certains consommateurs. Cela se produirait parce que le courtier redémarre les offsets à zéro (en l'absence de tout fichier journal existant), et un consommateur qui consommait auparavant à partir du sujet se reconnecterait pour demander un offset spécifique [une fois valide]. Si cette compensation se trouve à l'extérieur des limites des nouvelles logs de sujet, alors aucun mal et le consommateur reprend soit au début ou le fin. Mais, si l'offset tombe à l'intérieur des limites des journaux de nouveaux sujets, le courtier tente de récupérer le jeu de messages mais échoue parce que l'offset ne s'aligne pas à un message réel.
cela pourrait être atténué en compensant également les compensations des consommateurs dans zookeeper pour ce sujet. Mais si vous n'avez pas besoin d'un sujet vierge et que vous voulez simplement supprimer le contenu existant, alors simplement "toucher"quelques logs de sujet est beaucoup plus facile et plus fiable, que l'arrêt des courtiers, la suppression de sujet des logs, et le nettoyage de certains noeuds de gardien de zoo.
le Conseil de Thomas est grand mais malheureusement zkCli
dans les anciennes versions de Zookeeper (par exemple 3.3.6) ne semblent pas soutenir rmr
. Par exemple, comparez l'implémentation en ligne de commande dans modern Zookeeper avec version 3.3 .
si vous êtes confronté à une ancienne version de Zookeeper une solution est d'utiliser une bibliothèque client comme zc.zk pour Python. Pour les personnes qui ne sont pas familières avec Python, vous devez l'installer en utilisant pip ou easy_install . Puis démarrez un shell Python ( python
) et vous pouvez faire:
import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic')
ou même
zk.delete_recursive('brokers')
si vous voulez supprimer tous les sujets de Kafka.
pour nettoyer tous les messages d'un sujet particulier en utilisant votre groupe d'application (GroupName doit être le même que application Kafka nom du groupe).
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
ne pouvait pas ajouter de commentaire en raison de la taille: Je ne sais pas si c'est vrai, à part la mise à jour de la conservation.ms and retention.octets, mais j'ai remarqué sujet de nettoyage devrait être la politique de "supprimer" (par défaut), si "compact", il va tenir à des messages plus longs, c'est à dire, si il est "compact", vous devez spécifier supprimer".la rétention.ms aussi.
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
devait également surveiller les plus anciens/Plus récents décalages devraient être les mêmes pour confirmer cette réussi à est arrivé, pouvez également vérifier le du-h /tmp/kafka-journaux/test-sujet-3-100-*
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += } END {print sum}'
26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += } END {print sum}'
26599762
l'autre problème est, vous devez obtenir la config actuelle première donc vous vous rappelez de revenir en arrière après la suppression est réussie:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
une autre approche, plutôt manuelle, pour ce faire est:
dans les courtiers:
- stop kafka courtier
sudo service kafka stop
- supprimer tous les fichiers journaux de partition (doit être fait sur tous les courtiers)
sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*
dans zookeeper:
- exécuter interface de ligne de commande de zookeeper
sudo /usr/lib/zookeeper/bin/zkCli.sh
- utilisez zkCli pour supprimer les métadonnées du sujet
rmr /brokers/topic/<some_topic_name>
les courtiers de nouveau:
- service de courtier en redémarrage
sudo service kafka start