Comment coder / décoder des messages Kafka en utilisant Avro binaire encoder?

j'essaie D'utiliser Avro pour les messages lus/écrits à Kafka. Quelqu'un a-t-il un exemple d'utilisation de l'encodeur binaire Avro pour encoder/décoder des données qui seront placées dans une file d'attente de messages?

j'ai plus besoin de la partie Avro que de la partie Kafka. Ou, peut-être devrais-je envisager une autre solution? En gros, j'essaie de trouver une solution plus efficace pour JSON en ce qui concerne l'espace. Avro vient d'être mentionné car il peut être plus compact que JSON.

24
demandé sur Jacek Laskowski 2011-11-28 19:40:49

5 réponses

C'est un exemple de base. Je n'ai pas essayé avec plusieurs partitions/sujets.

//Code du producteur de l'échantillon

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;


public class ProducerTest {

    void producer(Schema schema) throws IOException {

        Properties props = new Properties();
        props.put("metadata.broker.list", "0:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, byte[]> producer = new Producer<String, byte[]>(config);
        GenericRecord payload1 = new GenericData.Record(schema);
        //Step2 : Put data in that genericrecord object
        payload1.put("desc", "'testdata'");
        //payload1.put("name", "अasa");
        payload1.put("name", "dbevent1");
        payload1.put("id", 111);
        System.out.println("Original Message : "+ payload1);
        //Step3 : Serialize the object to a bytearray
        DatumWriter<GenericRecord>writer = new SpecificDatumWriter<GenericRecord>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(payload1, encoder);
        encoder.flush();
        out.close();

        byte[] serializedBytes = out.toByteArray();
        System.out.println("Sending message in bytes : " + serializedBytes);
        //String serializedHex = Hex.encodeHexString(serializedBytes);
        //System.out.println("Serialized Hex String : " + serializedHex);
        KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("page_views", serializedBytes);
        producer.send(message);
        producer.close();

    }


    public static void main(String[] args) throws IOException, DecoderException {
        ProducerTest test = new ProducerTest();
        Schema schema = new Schema.Parser().parse(new File("src/test_schema.avsc"));
        test.producer(schema);
    }
}

//exemple de code de consommateur

Partie 1 : Code de groupe de consommateurs: comme vous pouvez avoir plus de plusieurs consommateurs pour plusieurs partitions/ sujets.

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created by  on 9/1/15.
 */
public class ConsumerGroupExample {
   private final ConsumerConnector consumer;
   private final String topic;
   private ExecutorService executor;

   public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic){
      consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
              createConsumerConfig(a_zookeeper, a_groupId));
      this.topic = a_topic;
   }

   private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId){
       Properties props = new Properties();
       props.put("zookeeper.connect", a_zookeeper);
       props.put("group.id", a_groupId);
       props.put("zookeeper.session.timeout.ms", "400");
       props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "1000");

       return new ConsumerConfig(props);
   }

    public void shutdown(){
         if (consumer!=null) consumer.shutdown();
        if (executor!=null) executor.shutdown();
        System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
        try{
          if(!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)){

          }
        }catch(InterruptedException e){
            System.out.println("Interrupted");
        }

    }


    public void run(int a_numThreads){
        //Make a map of topic as key and no. of threads for that topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        //Create message streams for each topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        //initialize thread pool
        executor = Executors.newFixedThreadPool(a_numThreads);
        //start consuming from thread
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }
    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {

        }
        example.shutdown();
    }


}

Partie 2 : Individu consommateur qui consomme réellement les messages.

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.codec.binary.Hex;

import java.io.File;
import java.io.IOException;

public class ConsumerTest implements Runnable{

    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run(){
        ConsumerIterator<byte[], byte[]>it = m_stream.iterator();
        while(it.hasNext())
        {
            try {
                //System.out.println("Encoded Message received : " + message_received);
                //byte[] input = Hex.decodeHex(it.next().message().toString().toCharArray());
                //System.out.println("Deserializied Byte array : " + input);
                byte[] received_message = it.next().message();
                System.out.println(received_message);
                Schema schema = null;
                schema = new Schema.Parser().parse(new File("src/test_schema.avsc"));
                DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null);
                GenericRecord payload2 = null;
                payload2 = reader.read(null, decoder);
                System.out.println("Message received : " + payload2);
            }catch (Exception e) {
                e.printStackTrace();
                System.out.println(e);
            }
        }

    }


}

Tester l'AVRO schéma :

{
    "namespace": "xyz.test",
     "type": "record",
     "name": "payload",
     "fields":[
         {
            "name": "name", "type": "string"
         },
         {
            "name": "id",  "type": ["int", "null"]
         },
         {
            "name": "desc", "type": ["string", "null"]
         }
     ]
}

les choses importantes à noter sont :

  1. vous aurez besoin du kafka standard et des bocaux avro pour sortir ce code de la boîte.

  2. est un accessoire très important.mettre ("serializer.la classe", "kafka.sérialiseur.DefaultEncoder"); Dont use stringEncoder as that won t fonctionne si vous envoyez un tableau d'octets comme message.

  3. vous pouvez convertir le octet[] en chaîne de hexagones et envoyer cela et sur le consommateur reconvertir chaîne de hexagones en octet [] et puis à l'original message.

  4. Exécuter la gardienne d'animaux et le courtier comme indiqué ici :- http://kafka.apache.org/documentation.html#quickstart et créer un topic appelé "page_views" ou ce que vous voulez.

  5. lancez le ProducerTest.java et ensuite le ConsumerGroupExample.java et de voir l'avro données produites et consommées.

13
répondu ramu 2016-03-08 18:28:44

je me suis finalement souvenu de demander à la liste de diffusion Kafka et j'ai obtenu la réponse suivante, qui a fonctionné parfaitement.

Oui, vous pouvez envoyer des messages comme des tableaux d'octets. Si vous regardez le constructeur de la classe de Message, vous verrez -

def ce(octets: Array[Byte])

maintenant, en regardant L'API Producer send () -

def send (producerData: ProducerData[K, V]*)

vous pouvez définir V comme étant un Message de type et K pour ce que vous voulez de votre clé. Si vous ne vous souciez pas de partitionner en utilisant une clé, puis réglez cela à Message type de bien.

Merci, Neha

11
répondu blockcipher 2011-12-01 21:03:07

si vous voulez obtenir un tableau d'octets à partir D'un message Avro (la partie kafka est déjà répondue), utilisez l'encodeur binaire:

    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); 
    ByteArrayOutputStream os = new ByteArrayOutputStream(); 
    try {
        Encoder e = EncoderFactory.get().binaryEncoder(os, null); 
        writer.write(record, e); 
        e.flush(); 
        byte[] byteData = os.toByteArray(); 
    } finally {
        os.close(); 
    }
7
répondu Will Sargent 2014-07-22 01:48:31

Réponse Mise À Jour.

Kafka a un serializer/deserializer Avro avec des coordonnées Maven (format SBT):

  "io.confluent" % "kafka-avro-serializer" % "3.0.0"

vous passez une instance de KafkaAvroSerializer dans le constructeur de KafkaProducer.

alors vous pouvez créer des instances Avro GenericRecord, et les utiliser comme valeurs à l'intérieur des instances Kafka ProducerRecord que vous pouvez envoyer avec KafkaProducer.

du côté des consommateurs de Kafka, vous utilisez KafkaAvroDeserializer et KafkaConsumer.

3
répondu clay 2016-06-09 03:55:47

au lieu D'Avro, vous pourriez aussi simplement envisager de compresser des données, soit avec gzip (bonne compression, cpu plus élevé) ou LZF ou Snappy (compression beaucoup plus rapide, un peu plus lente).

ou bien il y a aussi Smile binary JSON, supporté en Java par Jackson (avec cette extension): il est compact format binaire, et beaucoup plus facile à utiliser que Avro:

ObjectMapper mapper = new ObjectMapper(new SmileFactory());
byte[] serialized = mapper.writeValueAsBytes(pojo);
// or back
SomeType pojo = mapper.readValue(serialized, SomeType.class);

essentiellement le même code qu'avec JSON, sauf pour passer un format différent usine. Du point de vue de la taille des données, si Smile ou Avro est plus compact dépend des détails du cas d'utilisation; mais les deux sont plus compact que JSON.

avantage il y a que cela fonctionne rapidement avec JSON et Smile, avec le même code, en utilisant juste POJOs. Comparé à Avro qui nécessite soit la génération de code, soit beaucoup de code manuel pour emballer et déballer GenericRecord S.

2
répondu StaxMan 2012-04-25 20:45:12