Zokeeper & Kafka error KeeperErrorCode=NodeExists
j'ai écrit un kafka
consumer
et producer
ça fonctionne très bien jusqu'à aujourd'hui.
Ce matin, quand j'ai commencé zooekeeper
et kafka
, ma consommation n'a pas réussi à lire les messages et en Zookeeper log
j'ai lu cette erreur
INFO Got user-level KeeperException when processing sessionid:0x151c41e62e10000 type:create cxid:0x2a zxid:0x1e txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)
Pourriez-vous m'aider? Qu'est-ce qui aurait pu changer en quelques jours? Je ne comprends pas. Je vous remercie beaucoup.
2 réponses
j'ai eu cette erreur dans mon Kafka 2.11 tournant sous Windows 7. Je pense que cette exception n'est pas un problème puisque c'est seulement le niveau d'information. Juste assurez-vous que le courtier est toujours en cours d'exécution. Même avec cette erreur, je pourrais quand même:
- Créer une liste et d'un sujet
kafka-topics.bat
. - Consumer a topic
kafka-console-consumer.bat
. - programmer l'envoi d'un message
producer.send(new ProducerRecord<String, String>("topic", "hello"))
.
dans mon cas, cela semble affecter la fonctionnalité puisque je ne peux pas consommer les messages. Voir le code ci-dessous
Vertx instance = VertxConfig.getInstance();
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
/ / consumerConfig.mettre(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, " earliest"); // consumerConfig.mettre(ConsumerConfig.ENABL_AUTO_COMMIT_CONFIG, "false");
Properties producerConfig = new Properties();
producerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerConfig.put("acks", "1");
String topic = "dstv-queue-3";
consumer = KafkaConsumer.create(instance, consumerConfig);
producer = KafkaProducer.create(instance, producerConfig, String.class, String.class);
consumer.subscribe(topic);
instance.setPeriodic(2000, worker -> {
KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, "message");
producer.write(record, writeHandler -> {
RecordMetadata metadata = writeHandler.result();
//if meta data returned..
if (metadata != null) {
long offset = metadata.getOffset();
int partition = metadata.getPartition();
System.out.println("completed write: " + (writeHandler.succeeded() ? "successful" : "failed") + " offset:" + offset + " partition: " + partition);
}
});
});
AtomicLong counter = new AtomicLong();
consumer.handler(readHandler -> System.out.println(counter.getAndAdd(1) + ". " + readHandler.value() + " was received"));