Comment créer un sujet dans Kafka à travers Java
je veux créer un thème dans Kafka (kafka_2.8.0-0.8.1.1) par java. Cela fonctionne bien si je crée un sujet dans l'invite de commande, et si je pousse le message à travers l'api java. Mais je veux créer un sujet à travers l'api java. Après une longue recherche j'ai trouvé ci-dessous le code,
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
j'ai essayé le code ci-dessus et il montre que le sujet est créé, mais je ne suis pas en mesure de pousser le message dans le sujet. Quelque chose de mauvais dans mon code? Ou tout autre moyen de réaliser ce qui précède?
4 réponses
Je l'ai réparé.. Après une longue recherche..
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
à partir du code ci-dessus, ZkClient créera un sujet, mais cette information de sujet n'aura pas la conscience pour le kafka. Donc, ce que nous devons faire est, nous devons créer un objet pour ZkClient de la manière suivante,
première importation l'énoncé ci-dessous,
import kafka.utils.ZKStringSerializer$;
et créer l'objet pour ZkClient de la manière suivante,
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
Edit 1: (pour @ajkret commentaire)
le code ci-dessus ne fonctionnera pas pour kafka > 0.9 puisque l'api a été modifiée, Utilisez le code ci-dessous pour kafka > 0.9
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
public class KafkaTopicCreationInJava
{
public static void main(String[] args) throws Exception {
ZkClient zkClient = null;
ZkUtils zkUtils = null;
try {
String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs
zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
String topicName = "testTopic";
int noOfPartitions = 2;
int noOfReplication = 3;
Properties topicConfiguration = new Properties();
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}
}
Juste un pointeur à quiconque regarde cela avec une version mise à jour de Kafka (Au moment de la rédaction du présent, j'ai été en utilisant Kafka v0.10.0.0) .
il faut changer;
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, topicConfiguration);
à ce qui suit;
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, true, Enforced$.MODULE$);
C'est aussi une bonne idée de fermer la connexion une fois terminé;
zkClient.close();
pour ceux qui tentent d'atteindre cet objectif dans kafka v0.10.2.1 et en cours d'exécution dans les questions avec l'erreur de sérialisation ' java.io.StreamCorruptedException: invalid stream header: 3139322E
" ci-dessous est un exemple de code de travail avec les importations nécessaires.
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer;
import kafka.utils.ZkUtils;
public static void createTopic(String topicName, int numPartitions, int numReplication) {
ZkClient zkClient = null;
ZkUtils zkUtils = null;
try {
String zookeeperHosts = "199.98.916.902:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs
zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs);
//Ref: https://gist.github.com/jjkoshy/3842975
zkClient.setZkSerializer(new ZkSerializer() {
@Override
public byte[] serialize(Object o) throws ZkMarshallingError {
return ZKStringSerializer.serialize(o);
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
return ZKStringSerializer.deserialize(bytes);
}
});
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
int noOfPartitions = 2;
int noOfReplication = 3;
Properties topicConfiguration = new Properties();
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration,
RackAwareMode.Enforced$.MODULE$);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}
L'API Admin Utils devient obsolète. Il y a une nouvelle API AdminZkClient que nous pouvons utiliser pour gérer les sujets dans Kafka server.
String zookeeperHost = "127.0.0.1:2181";
Boolean isSucre = false;
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";
KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs,
connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
String topicName1 = "myTopic";
int partitions = 3;
int replication = 1;
Properties topicConfig = new Properties();
adminZkClient.createTopic(topicName1,partitions,replication,
topicConfig,RackAwareMode.Disabled$.MODULE$);
vous pouvez renvoyer ce lien pour plus de détails: https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java /