Comment créer un sujet dans Kafka à partir de L'IDE en utilisant L'API

comment créer un sujet dans Kafka à partir de L'IDE en utilisant L'API parce que quand je fais cela:

bin/kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181

j'obtiens l'erreur:

bash: bin/kafka-create-topic.sh: No such file or directory

et j'ai suivi la configuration du développeur telle qu'elle est.

43
demandé sur f_puras 2013-06-05 21:59:45

11 réponses

dans Kafka 0.8.1+ -- la dernière version de Kafka à partir d'aujourd'hui -- vous pouvez programmatiquement créer un nouveau sujet via AdminCommand . La fonctionnalité CreateTopicCommand (qui fait partie de L'ancienne Kafka 0.8.0) mentionnée dans l'une des réponses précédentes à cette question a été déplacée à AdminCommand .

Scala exemple pour Kafka 0.8.1:

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
// Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
// createTopic() will only seem to work (it will return without error).  The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
// topic.
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, connectionTimeoutMs,
    ZKStringSerializer)

// Create a topic named "myTopic" with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, topicConfig)

construire des dépendances, en utilisant sbt comme exemple:

libraryDependencies ++= Seq(
  "com.101tec" % "zkclient" % "0.4",
  "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri"),
  ...
)

EDIT: ajout D'un exemple Java pour Kafka 0.9.0.0 (dernière version en date de janvier 2016).

les dépendances Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.7</version>
</dependency>

Code:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

  public static void main(String[] args) {
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;
    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka 0.9.0.0
    boolean isSecureKafkaCluster = false;
    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configurations settings here
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
    zkClient.close();
  }

}

EDIT 2: Ajout D'un exemple Java pour Kafka 0.10.2.0 (dernière version en avril 2017).

les dépendances Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.9</version>
</dependency>

Code:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

  public static void main(String[] args) {
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configurations settings here

    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka 0.9.0.0
    boolean isSecureKafkaCluster = false;

    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
    zkClient.close();
  }

}
67
répondu Michael G. Noll 2017-04-28 07:48:46

à partir de 0.11.0.0 tout ce dont vous avez besoin est:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

cet artefact contient maintenant le AdminClient ( org.apache.kafka.clients.admin ).

AdminClient peut gérer de nombreuses tâches admin Kafka, y compris la création de sujet:

Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

AdminClient admin = AdminClient.create(config);

Map<String, String> configs = new HashMap<>();
int partitions = 1;
int replication = 1;

admin.createTopics(asList(new NewTopic("topic", partitions, replication).configs(configs)));

La sortie de cette commande est un CreateTopicsResult , que vous pouvez utiliser pour obtenir un Future pour l'ensemble de l'opération ou pour chaque sujet de la création:

  • à obtenez un avenir pour l'ensemble de l'opération, utiliser CreateTopicsResult#all() .
  • pour obtenir Future s pour tous les sujets individuellement, utilisez CreateTopicsResult#values() .

par exemple:

CreateTopicsResult result = ...
KafkaFuture<Void> all = result.all();

ou:

CreateTopicsResult result = ...
for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
    try {
        entry.getValue().get();
        log.info("topic {} created", entry.getKey());
    } catch (InterruptedException | ExecutionException e) {
        if (Throwables.getRootCause(e) instanceof TopicExistsException) {
            log.info("topic {} existed", entry.getKey());
        }
    }
}

KafkaFuture est "un futur flexible qui prend en charge l'enchaînement d'appels et d'autres modèles de programmation asynchrone, "et" deviendra éventuellement une fine pointe sur le dessus de Java 8 CompletebleFuture ."

26
répondu Dmitry Minkovsky 2017-07-17 14:33:31

Pour la création d'un sujet par le biais de l'api java et Kafka de 0,8+ essayez ce qui suit,

première importation sous déclaration

import kafka.utils.ZKStringSerializer$;

créer l'objet pour ZkClient de la manière suivante,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
13
répondu Jaya Ananthram 2015-03-06 05:20:28

vous pouvez essayer avec kafka.admin.CreateTopicCommand scala classe pour créer un sujet à partir du code Java...providng les arguments nécessaires.

String [] arguments = new String[8];
arguments[0] = "--zookeeper";
arguments[1] = "10.***.***.***:2181";
arguments[2] = "--replica";
arguments[3] = "1";
arguments[4] = "--partition";
arguments[5] = "1";
arguments[6] = "--topic";
arguments[7] = "test-topic-Biks";

CreateTopicCommand.main(arguments);

NB: vous devez ajouter les dépendances de maven pour jopt-simple-4.5 & zkclient-0.1

10
répondu Biks 2016-08-08 09:50:01

si vous utilisez Kafka 0.10.0.0+, la création de sujet à partir de Java nécessite un paramètre de passage de type RackAwareMode. C'est un objet Scala case, et obtenir son instance depuis Java est délicat (preuve: Comment "obtenir" un objet Scala case depuis Java? par exemple. Mais elle ne s'applique pas à notre cas).

heureusement, rackAwareMode est un paramètre optionnel. Cependant, Java ne supporte pas les paramètres optionnels. Comment pouvons-nous résoudre ce problème? Voici une solution:

AdminUtils.createTopic(zkUtils, topic, 1, 1, 
    AdminUtils.createTopic$default(),
    AdminUtils.createTopic$default());

utilisez-le avec la réponse de miguno, et vous êtes prêt à partir.

2
répondu Dmitriusan 2017-05-23 12:10:43

de plusieurs façons votre appel ne marcherait pas.

  1. si votre cluster Kafka n'avait pas assez de nœuds pour supporter une valeur de réplication de 3.

  2. S'il y a un préfixe de chemin de chroot vous devez l'ajouter après le port de zookeeper

  3. vous êtes dans le répertoire d'installation de Kafka lorsque vous lancez (c'est le plus probable)

1
répondu Gregory Patmore 2013-12-19 23:16:51

De Kafka 0.8 Producteur Exemple l'exemple ci-dessous crée un topic nommé page_visits et aussi commencer à produire si le auto.create.topics.enable attribut est défini sur true (par défaut) dans la Kafka Courtier en config fichier

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) { 
            long runtime = new Date().getTime();  
            String ip = “192.168.2.” + rnd.nextInt(255); 
            String msg = runtime + “,www.example.com,” + ip; 
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
   }
}
1
répondu Hild 2016-08-08 09:50:16

thème création en scala. Si vous utilisez le mode seudo dans le fichier de configuration broker

auto.créer.sujet.enable= true

il activera la création automatique du sujet sur le serveur. Si cette option est définie à true, alors les tentatives de produire, de consommer ou de récupérer des métadonnées pour un sujet inexistant le créeront automatiquement avec le facteur de réplication par défaut et le nombre de partitions.

ci-dessous code.

import scala.util.Random
import java.util.Properties
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.producer.KeyedMessage
import java.util.Date

class SimpleProducer {


  def sendmessages(){
  val rnd = new Random();

  val props = new Properties();  
  props.put("metadata.broker.list", "192.1.1.1:6667"); 
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  //props.put("partitioner.class", "rtbi.dis.producers.SimplePartitioner")

  val config = new ProducerConfig(props);
  val producer = new Producer[String, String](config);
    for (event<-1 to 5000) { 
               val runtime = new Date().getTime;  
               val ip = "192.1.1.1" + rnd.nextInt(255); 
               val msg = runtime + ",www.example.com," + ip; 
               val data = new KeyedMessage[String, String]("mytopic", ip, msg); //here mytopic is a topic 
               producer.send(data);
        }
        producer.close();
  }
 }
  object SimpleProducer extends App{   
    val s= new SimpleProducer().sendmessages();
  }
0
répondu madhu 2016-02-22 17:19:12

de quelle IDE essayez-vous ?

Veuillez fournir le chemin d'accès complet , ci-dessous sont la commande de terminal qui permettra de créer une rubrique

  1. cd kafka/bin
  2. ./kafka-create-topic.sh --topic test --zookeeper localhost:2181
0
répondu Sanket 2016-08-08 09:51:04

il y a une nouvelle API AdminZkClient que nous pouvons utiliser pour gérer les sujets dans Kafka server.

String zookeeperHost = "127.0.0.1:2181";
Boolean isSucre = false;
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";
KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs,
                connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);

AdminZkClient adminZkClient = new AdminZkClient(zkClient);

String topicName1 = "myTopic";
int partitions = 3;
int replication = 1;
Properties topicConfig = new Properties();

adminZkClient.createTopic(topicName1,partitions,replication,
            topicConfig,RackAwareMode.Disabled$.MODULE$);

vous pouvez renvoyer ce lien pour plus de détails https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java /

0
répondu Mahesh Mogal 2018-07-02 18:18:20

à partir de Kafka 0.10.1 le ZKStringSerializer mentionné par Michael est privé (pour Scala). Vous pouvez utiliser les méthodes d'usine createZkClient ou createZkClientAndConnection dans ZkUtils.

Scala exemple pour Kafka 0.10.1:

import kafka.utils.ZkUtils

val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(
  "localhost:2181", sessionTimeoutMs, connectionTimeoutMs) 

alors créez simplement le sujet comme Michael a suggéré:

import kafka.admin.AdminUtils

val zkUtils = new ZkUtils(zkClient, zkConnection, false)
val numPartitions = 4
val replicationFactor = 1
val topicConfig = new Properties
val topic = "my-topic"
AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig)
-1
répondu its_a_paddo 2017-06-20 09:05:02