Здесь я опишу пример сериализации данных через Avro и переноса в Kafka. У Avro есть сериализатор данных для Kafka; в своей работе он использует реестр схем и поддерживает управление версиями на отдельном развернутом сервере.
Будет только сериализатор, а при необходимости версионирование можно будет реализовать, например, в базе данных.
Проект на Github
Вот как могут выглядеть сериализованные данные, подготовленные Avro. Существует заголовок, описывающий данные, а затем сами данные.
Получается компактно и быстро, нет повторяющихся названий полей, формат данных двоичный.
Данные проверяются при добавлении согласно схеме данных.
Пример диаграммы:
Используя Spring Shell, первой командой добавляю персон в список, проверяя по схеме Avro:{"namespace": "avro", "type": "record", "name": "Person", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": ["int", "null"]} ] }
@ShellComponent
public class Commands {
private List<GenericRecord> records = new ArrayList<>();
@ShellMethod("add user to list for send")
public void add(String name, int age) {
GenericRecord record = new GenericData.Record(SchemaRepository.instance().
getSchemaObject());
record.put("name", name);
record.put("age", age);
records.add(record);
}
GenericRecord — запись Avro, которая формируется на основе схемы
public class SchemaRepository {
private static final String SCHEMA = "{\"namespace\": \"avro\",\n" +
"\"type\": \"record\",\n" +
"\"name\": \"Person\",\n" +
"\"fields\": [\n" +
" {\"name\": \"name\", \"type\": \"string\"},\n" +
" {\"name\": \"age\", \"type\": [\"int\", \"null\"]}\n" +
"]\n" +
"}\n";
private static final Schema SCHEMA_OBJECT = new Schema.Parser().
parse(SCHEMA);
private static SchemaRepository INSTANCE = new SchemaRepository();
public static SchemaRepository instance() {
return INSTANCE;
}
public Schema getSchemaObject() {
return SCHEMA_OBJECT;
}
}
Добавляем людей в консоль оболочки и отправляем тему в Kafka:
@ShellComponent
public class Commands {
private List<GenericRecord> records = new ArrayList<>();
final private KafkaTemplate template;
public Commands(KafkaTemplate template) {
this.template = template;
}
@ShellMethod("send list users to Kafka")
public void send() {
template.setDefaultTopic("test");
template.sendDefault("1", records);
template.flush();
records.clear();
}
Вот сериализатор Avro для Kafka:
public class AvroGenericRecordSerializer implements Serializer<List<GenericRecord>> {
private Schema schema = null;
@Override public void configure(Map<String, ?> map, boolean b) {
schema = (Schema) map.get("SCHEMA");
}
@Override public byte[] serialize(String arg0, List<GenericRecord> records) {
byte[] retVal = null;
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
DataFileWriter dataFileWriter = new DataFileWriter<>(datumWriter);
try {
dataFileWriter.create(schema, outputStream);
for (GenericRecord record : records) {
dataFileWriter.append(record);
}
dataFileWriter.flush();
dataFileWriter.close();
retVal = outputStream.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return retVal;
}
@Override public void close() {
}
}
Конфигурация производителя Kafka:
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().
get(0));
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordSerializer");
props.put("SCHEMA", SchemaRepository.instance().
getSchemaObject());
return props;
}
Здесь указан класс сериализации — «com.example.model.AvroGenericRecordSerializer» а новый параметр «SCHEMA» — это объект схемы, он необходим в AvroGenericRecordSerializer при подготовке двоичных данных.
На принимающей стороне в консоли видим полученные данные:
Десериализатор Авро public class AvroGenericRecordDeserializer implements Deserializer {
private Schema schema = null;
@Override
public void configure(Map configs, boolean isKey) {
schema = (Schema) configs.get("SCHEMA");
}
@Override
public Object deserialize(String s, byte[] bytes) {
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
SeekableByteArrayInput arrayInput = new SeekableByteArrayInput(bytes);
List<GenericRecord> records = new ArrayList<>();
DataFileReader<GenericRecord> dataFileReader = null;
try {
dataFileReader = new DataFileReader<>(arrayInput, datumReader);
while (dataFileReader.hasNext()) {
GenericRecord record = dataFileReader.next();
records.add(record);
}
} catch (IOException e) {
e.printStackTrace();
}
return records;
}
}
И аналогичный потребитель Kafka: @Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers().
get(0)); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.example.model.AvroGenericRecordDeserializer"); props.put("SCHEMA", SchemaRepository.instance().
getSchemaObject());
return props;
}
Кафка используется из Docker вурстмайстер/кафка-докер , вы можете использовать любой другой Проект на Github avro.apache
Теги: #kafka #java #сериализация #Avro #синхронизация #дистрибутив
-
Так Много Удобства У Вас Под Рукой Онлайн
19 Oct, 24