Thème De La Purge De Kafka

j'ai poussé un message qui était trop grand dans un sujet de message kafka sur ma machine locale, maintenant je reçois une erreur:

kafka.common.InvalidMessageSizeException: invalid message size

augmenter le fetch.size n'est pas idéal ici, parce que je ne veux pas vraiment accepter des messages aussi gros. Est-il un moyen de purger le sujet de kafka?

108
demandé sur cricket_007 2013-04-29 21:10:57

13 réponses

mettre temporairement à jour le temps de rétention sur le sujet à une seconde:

kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000

puis attendre que la purge prenne effet (environ une minute). Une fois purgé, restaurer la valeur précédente retention.ms .

283
répondu steven appleyard 2015-04-16 13:44:58

Voici les étapes à suivre pour supprimer un sujet nommé MyTopic :

  1. Arrêter Apache Kafka démon
  2. supprimer le dossier de données topic: rm -rf /tmp/kafka-logs/MyTopic-0
  3. supprimer les métadonnées du sujet: zkCli.sh puis rmr /brokers/MyTopic
  4. Démarrer Apache Kafka démon

si vous manquez l'étape 3, alors Apache Kafka continuera à signaler le sujet est présent (par exemple, si vous exécutez kafka-list-topic.sh ).

Testé avec Apache Kafka 0.8.0.

41
répondu Thomas Bratt 2014-02-19 13:32:42

pour purger la file d'attente, vous pouvez supprimer le sujet:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

puis le recréer:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test
38
répondu rjaiswal 2016-08-29 14:14:36

bien que la réponse acceptée soit correcte, cette méthode a été dépréciée. La configuration du sujet doit maintenant être effectuée via kafka-configs .

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

les Configurations définies par cette méthode peuvent être affichées avec la commande

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
32
répondu Shane Perry 2016-04-21 17:56:09

testé en Kafka 0.8.2, pour l'exemple de démarrage rapide: Tout d'abord, ajouter une ligne au serveur.fichier de propriétés sous le dossier de configuration:

delete.topic.enable=true

alors, vous pouvez exécuter cette commande:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
23
répondu Patrick 2015-06-14 20:02:28

mise à JOUR: Cette réponse est pertinente pour Kafka 0.6. Pour Kafka 0.8 et plus tard voir la réponse de @Patrick.

Oui, arrêter kafka et supprimer manuellement tous les fichiers du sous-répertoire correspondant (il est facile de le trouver dans le répertoire de données kafka). Après kafka redémarrer le sujet sera vide.

3
répondu Wildfire 2018-10-01 10:25:18

kafka n'ont pas de méthode directe pour la purge et le nettoyage des sujet (Files d'attente), mais peuvent le faire via la suppression de ce topic et de le recréer.

premier de assurez-vous sever.propriétés fichier a et si non Ajouter delete.topic.enable=true

ensuite, Supprimer sujet bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

puis le créer à nouveau.

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
2
répondu Manish Jaiswal 2017-10-09 10:55:18

parfois, si vous avez un cluster saturé (trop de partitions, ou en utilisant des données de sujet cryptées, ou en utilisant SSL, ou le contrôleur est sur un mauvais noeud, ou la connexion est flasque, il faudra beaucoup de temps pour purger ledit sujet.

je suis ces étapes, en particulier si vous utilisez Avro.

1: Exécuter avec des outils kafka:

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2: Exécuter sur le Schéma nœud de registre:

kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning

3: ramenez la rétention du sujet au cadre d'origine, une fois que le sujet est vide.

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

espérons que cela aide quelqu'un, car il n'est pas facilement annoncé.

2
répondu Ben Coughlan 2018-02-15 15:30:18

la méthode la plus simple consiste à établir que la date des fichiers journaux individuels est antérieure à la période de conservation. Ensuite, le courtier devrait les nettoyer et les enlever pour vous en quelques secondes. Cela offre plusieurs avantages:

  1. Pas besoin d'abattre des courtiers, c'est un moteur d'exécution de l'opération.
  2. évite la possibilité d'exceptions de compensation invalides (voir ci-dessous).

D'après mon expérience avec Kafka 0.7.x, supprimer les fichiers journaux et redémarrer le courtier pourrait conduire à des exceptions de offset invalides pour certains consommateurs. Cela se produirait parce que le courtier redémarre les offsets à zéro (en l'absence de tout fichier journal existant), et un consommateur qui consommait auparavant à partir du sujet se reconnecterait pour demander un offset spécifique [une fois valide]. Si cette compensation se trouve à l'extérieur des limites des nouvelles logs de sujet, alors aucun mal et le consommateur reprend soit au début ou le fin. Mais, si l'offset tombe à l'intérieur des limites des journaux de nouveaux sujets, le courtier tente de récupérer le jeu de messages mais échoue parce que l'offset ne s'aligne pas à un message réel.

cela pourrait être atténué en compensant également les compensations des consommateurs dans zookeeper pour ce sujet. Mais si vous n'avez pas besoin d'un sujet vierge et que vous voulez simplement supprimer le contenu existant, alors simplement "toucher"quelques logs de sujet est beaucoup plus facile et plus fiable, que l'arrêt des courtiers, la suppression de sujet des logs, et le nettoyage de certains noeuds de gardien de zoo.

1
répondu Andrew Carter 2014-06-06 20:09:22

le Conseil de Thomas est grand mais malheureusement zkCli dans les anciennes versions de Zookeeper (par exemple 3.3.6) ne semblent pas soutenir rmr . Par exemple, comparez l'implémentation en ligne de commande dans modern Zookeeper avec version 3.3 .

si vous êtes confronté à une ancienne version de Zookeeper une solution est d'utiliser une bibliothèque client comme zc.zk pour Python. Pour les personnes qui ne sont pas familières avec Python, vous devez l'installer en utilisant pip ou easy_install . Puis démarrez un shell Python ( python ) et vous pouvez faire:

import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic') 

ou même

zk.delete_recursive('brokers')

si vous voulez supprimer tous les sujets de Kafka.

1
répondu Mark Butler 2015-10-15 00:32:51

pour nettoyer tous les messages d'un sujet particulier en utilisant votre groupe d'application (GroupName doit être le même que application Kafka nom du groupe).

./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group

1
répondu user4713340 2017-11-30 20:24:45

ne pouvait pas ajouter de commentaire en raison de la taille: Je ne sais pas si c'est vrai, à part la mise à jour de la conservation.ms and retention.octets, mais j'ai remarqué sujet de nettoyage devrait être la politique de "supprimer" (par défaut), si "compact", il va tenir à des messages plus longs, c'est à dire, si il est "compact", vous devez spécifier supprimer".la rétention.ms aussi.

./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

devait également surveiller les plus anciens/Plus récents décalages devraient être les mêmes pour confirmer cette réussi à est arrivé, pouvez également vérifier le du-h /tmp/kafka-journaux/test-sujet-3-100-*

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += } END {print sum}' 26599762

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += } END {print sum}' 26599762

l'autre problème est, vous devez obtenir la config actuelle première donc vous vous rappelez de revenir en arrière après la suppression est réussie: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

0
répondu kisna 2016-06-14 00:02:30

une autre approche, plutôt manuelle, pour ce faire est:

dans les courtiers:

  1. stop kafka courtier

    sudo service kafka stop
  2. supprimer tous les fichiers journaux de partition (doit être fait sur tous les courtiers)

    sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*

dans zookeeper:

  1. exécuter interface de ligne de commande de zookeeper

    sudo /usr/lib/zookeeper/bin/zkCli.sh
  2. utilisez zkCli pour supprimer les métadonnées du sujet

    rmr /brokers/topic/<some_topic_name>

les courtiers de nouveau:

  1. service de courtier en redémarrage

    sudo service kafka start
0
répondu Danny Mor 2018-10-02 15:18:22