Comment créer un sujet dans Kafka à travers Java

je veux créer un thème dans Kafka (kafka_2.8.0-0.8.1.1) par java. Cela fonctionne bien si je crée un sujet dans l'invite de commande, et si je pousse le message à travers l'api java. Mais je veux créer un sujet à travers l'api java. Après une longue recherche j'ai trouvé ci-dessous le code,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

j'ai essayé le code ci-dessus et il montre que le sujet est créé, mais je ne suis pas en mesure de pousser le message dans le sujet. Quelque chose de mauvais dans mon code? Ou tout autre moyen de réaliser ce qui précède?

30
demandé sur Jaya Ananthram 2014-11-20 13:12:53

4 réponses

Je l'ai réparé.. Après une longue recherche..

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

à partir du code ci-dessus, ZkClient créera un sujet, mais cette information de sujet n'aura pas la conscience pour le kafka. Donc, ce que nous devons faire est, nous devons créer un objet pour ZkClient de la manière suivante,

première importation l'énoncé ci-dessous,

import kafka.utils.ZKStringSerializer$;

et créer l'objet pour ZkClient de la manière suivante,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

Edit 1: (pour @ajkret commentaire)

le code ci-dessus ne fonctionnera pas pour kafka > 0.9 puisque l'api a été modifiée, Utilisez le code ci-dessous pour kafka > 0.9


import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

public class KafkaTopicCreationInJava
{
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            String topicName = "testTopic";
            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}
37
répondu Jaya Ananthram 2016-05-13 05:22:46

Juste un pointeur à quiconque regarde cela avec une version mise à jour de Kafka (Au moment de la rédaction du présent, j'ai été en utilisant Kafka v0.10.0.0) .

il faut changer;

AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, topicConfiguration);

à ce qui suit;

AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, true, Enforced$.MODULE$);

C'est aussi une bonne idée de fermer la connexion une fois terminé;

zkClient.close();
7
répondu Richard G 2017-07-26 15:58:12

pour ceux qui tentent d'atteindre cet objectif dans kafka v0.10.2.1 et en cours d'exécution dans les questions avec l'erreur de sérialisation ' java.io.StreamCorruptedException: invalid stream header: 3139322E " ci-dessous est un exemple de code de travail avec les importations nécessaires.

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer;
import kafka.utils.ZkUtils;

public static void createTopic(String topicName, int numPartitions, int numReplication) {
        ZkClient zkClient = null;
        ZkUtils zkUtils = null;
        try {
            String zookeeperHosts = "199.98.916.902:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs);
            //Ref: https://gist.github.com/jjkoshy/3842975
            zkClient.setZkSerializer(new ZkSerializer() {
                @Override
                public byte[] serialize(Object o) throws ZkMarshallingError {
                    return ZKStringSerializer.serialize(o);
                }

                @Override
                public Object deserialize(byte[] bytes) throws ZkMarshallingError {
                    return ZKStringSerializer.deserialize(bytes);
                }
            });

            zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

            int noOfPartitions = 2;
            int noOfReplication = 3;
            Properties topicConfiguration = new Properties();

            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration,
                    RackAwareMode.Enforced$.MODULE$);

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
5
répondu Saurabh Mishra 2017-09-04 10:38:17

L'API Admin Utils devient obsolète. Il y a une nouvelle API AdminZkClient que nous pouvons utiliser pour gérer les sujets dans Kafka server.

String zookeeperHost = "127.0.0.1:2181";
Boolean isSucre = false;
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";
KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs,
                connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);

AdminZkClient adminZkClient = new AdminZkClient(zkClient);

String topicName1 = "myTopic";
int partitions = 3;
int replication = 1;
Properties topicConfig = new Properties();

adminZkClient.createTopic(topicName1,partitions,replication,
            topicConfig,RackAwareMode.Disabled$.MODULE$);

vous pouvez renvoyer ce lien pour plus de détails: https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java /

3
répondu Mahesh Mogal 2018-07-02 18:13:17