Zokeeper & Kafka error KeeperErrorCode=NodeExists

j'ai écrit un kafkaconsumer et producer ça fonctionne très bien jusqu'à aujourd'hui. Ce matin, quand j'ai commencé zooekeeper et kafka, ma consommation n'a pas réussi à lire les messages et en Zookeeper log j'ai lu cette erreur

INFO Got user-level KeeperException when processing sessionid:0x151c41e62e10000 type:create cxid:0x2a zxid:0x1e txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)

Pourriez-vous m'aider? Qu'est-ce qui aurait pu changer en quelques jours? Je ne comprends pas. Je vous remercie beaucoup.

19
demandé sur Mrunal Pagnis 2015-12-21 13:45:18

2 réponses

j'ai eu cette erreur dans mon Kafka 2.11 tournant sous Windows 7. Je pense que cette exception n'est pas un problème puisque c'est seulement le niveau d'information. Juste assurez-vous que le courtier est toujours en cours d'exécution. Même avec cette erreur, je pourrais quand même:

  1. Créer une liste et d'un sujet kafka-topics.bat.
  2. Consumer a topic kafka-console-consumer.bat.
  3. programmer l'envoi d'un message producer.send(new ProducerRecord<String, String>("topic", "hello")).
0
répondu jpllosa 2016-07-27 11:05:35

dans mon cas, cela semble affecter la fonctionnalité puisque je ne peux pas consommer les messages. Voir le code ci-dessous

Vertx instance = VertxConfig.getInstance();

    Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

/ / consumerConfig.mettre(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, " earliest"); // consumerConfig.mettre(ConsumerConfig.ENABL_AUTO_COMMIT_CONFIG, "false");

    Properties producerConfig = new Properties();
    producerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerConfig.put("acks", "1");

    String topic = "dstv-queue-3";
    consumer = KafkaConsumer.create(instance, consumerConfig);
    producer = KafkaProducer.create(instance, producerConfig, String.class, String.class);
    consumer.subscribe(topic);

    instance.setPeriodic(2000, worker -> {
        KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, "message");
        producer.write(record, writeHandler -> {
            RecordMetadata metadata = writeHandler.result();

            //if meta data returned..
            if (metadata != null) {
                long offset = metadata.getOffset();
                int partition = metadata.getPartition();
                System.out.println("completed write: " + (writeHandler.succeeded() ? "successful" : "failed") + " offset:" + offset + " partition: " + partition);
            }
        });
    });

    AtomicLong counter = new AtomicLong();
    consumer.handler(readHandler -> System.out.println(counter.getAndAdd(1) + ". " + readHandler.value() + " was received"));
0
répondu Adeola Ojo 2018-03-10 06:05:01