Comment obtenir des données à partir de l'ancien offset point à Kafka?
j'utilise zookeeper pour obtenir des données de kafka. Et ici je reçois toujours des données du dernier point de décalage. Est-il possible de spécifier le temps de décalage pour récupérer les anciennes données?
il y a une option autooffset.réinitialiser. Il accepte les plus petits ou les plus grands. Quelqu'un peut-il expliquer ce qu'est le plus petit et le plus grand. Can autooffset.réinitialiser aide à obtenir des données à partir de l'ancien point d'offset au lieu du dernier point d'offset?
7 réponses
les consommateurs appartiennent toujours à un groupe et, pour chaque partition, le gardien de Zoo garde trace de la progression de ce groupe de consommateurs dans la partition.
pour récupérer depuis le début, vous pouvez supprimer toutes les données associées à progress comme Hussain fait référence
ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
vous pouvez également spécifier le décalage de la partition que vous voulez, comme spécifié dans le fichier core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
cependant le décalage n'est pas indexé dans le temps, mais vous savez pour chaque partition est une séquence.
si votre message contient un timestamp (et attention que ce timestamp n'a rien à voir avec le moment où Kafka a reçu votre message), vous pouvez essayer de faire un indexeur qui tente de récupérer une entrée en incrémentant l'offset par N, et stocker le tuple (topic X, part 2, offset 100, timestamp) quelque part.
quand vous voulez récupérer des entrées à partir d'un moment précis, vous pouvez appliquer une recherche binaire à votre index approximatif jusqu'à ce que vous trouviez l'entrée que vous voulez et récupérez à partir de là.
de la Kafka documentation ils disent "Kafka.API.OffsetRequest.EarliestTime () trouve le début des données dans les logs et commence à diffuser à partir de là, kafka.API.OffsetRequest.LatestTime () ne diffusera que les nouveaux messages. Ne présumez pas que l'offset 0 est l'offset du début, puisque les messages vieillissent avec le temps. "
utilisez L'exemple Simpleconsumérexici: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Simpleconsommateur+exemple
question Similaire: Kafka Niveau Élevé de Consommation Récupérer Tous les Messages De Rubrique à l'Aide de l'API Java (Équivalent à --à partir de début)
Cela pourrait
Refer the doc about kafka config : http://kafka.apache.org/08/configuration.html pour votre requête sur les valeurs les plus petites et les plus grandes du paramètre offset.
BTW, en explorant kafka, je me demandais comment rejouer tous les messages pour un consommateur. Je veux dire, si un groupe de consommateurs a sondé tous les messages et qu'il veut les récupérer.
la façon dont il peut être réalisé est de supprimer des données de zookeeper. L'utilisation de la Kafka.utils.Classe ZkUtils pour supprimer un noeud sur zookeeper. Voici son usage:
ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
Pour L'Instant
Kafka FAQ donner une réponse à ce problème.
Comment puis-je obtenir avec précision des offsets de messages pour un certain timestamp en utilisant OffsetRequest?
Kafka permet d'interroger des offsets de messages par le temps et il le fait à la granularité de segment. Le paramètre timestamp est le timestamp unix et le fait de désactiver l'offset par timestamp renvoie le dernier offset possible du message qui est ajouté au plus tard à l'horodatage donné. Il y a 2 valeurs spéciales de l'horodatage - le plus récent et le plus ancien. Pour toute autre valeur de l'horodatage unix, Kafka obtiendra l'offset de départ du segment log qui est créé au plus tard à la date de l'horodatage donné. Pour cette raison, et puisque la requête offset n'est servie qu'à la granularité du segment, la requête offset fetch renvoie des résultats moins précis pour des segments de plus grande taille.
Pour des résultats plus précis, vous peut configurer la taille du segment log en fonction du temps (log.roule.ms) au lieu de taille (log.segment.octet.) Toutefois, il faut faire preuve de prudence, car cela pourrait augmenter le nombre de gestionnaires de fichiers en raison du roulement fréquent du segment de billes.
Plan Futur
Kafka ajoutera timestamp au format du message. Se référer à
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+enrichi+Message+métadonnées
Kafka Protocol Doc est une excellente source pour jouer avec la demande/réponse/Offsets / Messages: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+De+La+Kafka+Protocole vous utilisez un exemple de consommateur Simple comme où le code suivant démontre l'état:
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000)
.build();
FetchResponse fetchResponse = simpleConsumer.fetch(req);
compensation readOffset pour démarrer l'offset initial à partir de. mais vous devez vérifier l'offset max ainsi que ci-dessus fournira des offsets limités compter comme FetchSize dans le dernier param de méthode addFetch.
en utilisant le KafkaConsumer vous pouvez utiliser Seek, SeekToBeginning et SeekToEnd pour se déplacer dans le ruisseau.
aussi, si aucune partition n'est fournie, elle cherchera à obtenir le premier décalage pour toutes les partitions actuellement attribuées.
avez-vous essayé?
bin/kafka-console-consumer.sh --bootstrap-serveur localhost:9092 --rubrique test, à partir de début
il imprimerait tous les messages pour le sujet donné," test " dans cet exemple.
plus de détails à partir de ce lien https://kafka.apache.org/quickstart