Евро-Сериализация В Kafka

Здесь я опишу пример сериализации данных через Avro и переноса в Kafka. У Avro есть сериализатор данных для Kafka; в своей работе он использует реестр схем и поддерживает управление версиями на отдельном развернутом сервере.

Будет только сериализатор, а при необходимости версионирование можно будет реализовать, например, в базе данных.

Проект на Github

Евро-сериализация в Kafka

Вот как могут выглядеть сериализованные данные, подготовленные Avro. Существует заголовок, описывающий данные, а затем сами данные.

Получается компактно и быстро, нет повторяющихся названий полей, формат данных двоичный.

Данные проверяются при добавлении согласно схеме данных.

Пример диаграммы:

  
  
  
  
  
  
  
   

{"namespace": "avro", "type": "record", "name": "Person", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": ["int", "null"]} ] }

Используя Spring Shell, первой командой добавляю персон в список, проверяя по схеме Avro:

@ShellComponent public class Commands { private List<GenericRecord> records = new ArrayList<>(); @ShellMethod("add user to list for send") public void add(String name, int age) { GenericRecord record = new GenericData.Record(SchemaRepository.instance().

getSchemaObject()); record.put("name", name); record.put("age", age); records.add(record); }

GenericRecord — запись Avro, которая формируется на основе схемы

public class SchemaRepository { private static final String SCHEMA = "{\"namespace\": \"avro\",\n" + "\"type\": \"record\",\n" + "\"name\": \"Person\",\n" + "\"fields\": [\n" + " {\"name\": \"name\", \"type\": \"string\"},\n" + " {\"name\": \"age\", \"type\": [\"int\", \"null\"]}\n" + "]\n" + "}\n"; private static final Schema SCHEMA_OBJECT = new Schema.Parser().

parse(SCHEMA); private static SchemaRepository INSTANCE = new SchemaRepository(); public static SchemaRepository instance() { return INSTANCE; } public Schema getSchemaObject() { return SCHEMA_OBJECT; } }

Добавляем людей в консоль оболочки и отправляем тему в Kafka:

Евро-сериализация в Kafka



@ShellComponent public class Commands { private List<GenericRecord> records = new ArrayList<>(); final private KafkaTemplate template; public Commands(KafkaTemplate template) { this.template = template; } @ShellMethod("send list users to Kafka") public void send() { template.setDefaultTopic("test"); template.sendDefault("1", records); template.flush(); records.clear(); }

Вот сериализатор Avro для Kafka:

public class AvroGenericRecordSerializer implements Serializer<List<GenericRecord>> { private Schema schema = null; @Override public void configure(Map<String, ?> map, boolean b) { schema = (Schema) map.get("SCHEMA"); } @Override public byte[] serialize(String arg0, List<GenericRecord> records) { byte[] retVal = null; ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter); try { dataFileWriter.create(schema, outputStream); for (GenericRecord record : records) { dataFileWriter.append(record); } dataFileWriter.flush(); dataFileWriter.close(); retVal = outputStream.toByteArray(); } catch (IOException e) { e.printStackTrace(); } return retVal; } @Override public void close() { } }

Конфигурация производителя Kafka:

@Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().

get(0)); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordSerializer"); props.put("SCHEMA", SchemaRepository.instance().

getSchemaObject()); return props; }

Здесь указан класс сериализации — «com.example.model.AvroGenericRecordSerializer» а новый параметр «SCHEMA» — это объект схемы, он необходим в AvroGenericRecordSerializer при подготовке двоичных данных.

На принимающей стороне в консоли видим полученные данные:

Евро-сериализация в Kafka

Десериализатор Авро

public class AvroGenericRecordDeserializer implements Deserializer { private Schema schema = null; @Override public void configure(Map configs, boolean isKey) { schema = (Schema) configs.get("SCHEMA"); } @Override public Object deserialize(String s, byte[] bytes) { DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema); SeekableByteArrayInput arrayInput = new SeekableByteArrayInput(bytes); List<GenericRecord> records = new ArrayList<>(); DataFileReader<GenericRecord> dataFileReader = null; try { dataFileReader = new DataFileReader<>(arrayInput, datumReader); while (dataFileReader.hasNext()) { GenericRecord record = dataFileReader.next(); records.add(record); } } catch (IOException e) { e.printStackTrace(); } return records; } }

И аналогичный потребитель Kafka:

@Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().

get(0)); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordDeserializer"); props.put("SCHEMA", SchemaRepository.instance().

getSchemaObject()); return props; }

Кафка используется из Docker вурстмайстер/кафка-докер , вы можете использовать любой другой Проект на Github avro.apache Теги: #kafka #java #сериализация #Avro #синхронизация #дистрибутив

Вместе с данным постом часто просматривают:

Автор Статьи


Зарегистрирован: 2019-12-10 15:07:06
Баллов опыта: 0
Всего постов на сайте: 0
Всего комментарий на сайте: 0
Dima Manisha

Dima Manisha

Эксперт Wmlog. Профессиональный веб-мастер, SEO-специалист, дизайнер, маркетолог и интернет-предприниматель.