Синхронизация Базы Данных Между Монолитом И Микросервисами С Помощью Kafka. Наше Решение

В этой статье я расскажу о готовом решении для поддержания согласованности данных между растущим микросервисом и устаревшей архитектурой.

Ниже приведен код репликации двух баз данных с проверкой синхронизации, который может пригодиться для решения подобных задач.



Синхронизация базы данных между монолитом и микросервисами с помощью Kafka. Наше решение

Мы столкнулись с проблемой согласованности данных при разработке микросервиса Profile. Он отвечает за регистрацию новых пользователей, хранение данных о них и синхронизацию с монолитной базой данных.

Именно в синхронизации двух баз данных возникло несколько проблем.



Синхронизация данных

Для стабильной работы системы при изменении данных в одной базе изменения должны автоматически отражаться в другой.

Для этого мы создали в Kafka очередь таких изменений.

Мы начали со студенческого стола.

К основным столбцам с именами, фамилиями и классами добавлены дополнительные — ревизия и Foreign_revision — для работы с очередью.

Теперь при добавлении или изменении данных в postgres вызывается триггер, который записывает текущее время в поле ревизии с точностью до миллисекунды.

Вот код для добавления столбцов и триггера:

  
  
  
  
  
   

ALTER TABLE "students" ADD "revision" timestamp(6) ALTER TABLE "students" ALTER COLUMN "revision" SET DEFAULT timezone('utc', now()); ALTER TABLE "students" ADD "foreign_revision" timestamp(6) CREATE OR REPLACE FUNCTION increase_revision() RETURNS trigger AS $$ BEGIN NEW.revision := timezone('utc', now()); RETURN NEW; END $$ LANGUAGE PLPGSQL; CREATE TRIGGER update_revision BEFORE UPDATE ON students FOR EACH ROW WHEN (old.foreign_revision is not distinct FROM new.foreign_revision and row_to_json(old)::jsonb - 'revision' is distinct FROM row_to_json(new)::jsonb - 'revision') EXECUTE PROCEDURE increase_revision();

После добавления студента или изменения его личных данных мы генерируем и отправляем сообщение в Kafka. Однако если вы отправите такие сообщения до закрытия транзакции, пострадает база данных: соединения прервутся, а транзакция будет откачена из-за сетевой ошибки.

Чтобы этого не произошло, мы использовали в модели after_commit:

after_commit :push_to_exchange, on: [:create, :update]

Служба профилей подписывается на общую очередь в Kafka и либо обновляет существующую запись в таблице, либо добавляет новую.



class StudentConsumer def consume(payload, metadata) if record = Student.where(id: payload.id).

first record.update!(params(payload)) else Student.create!(params(payload)) end end def params(payload) hash = payload.to_h hash[:foreign_revision] = hash[:revision] hash.slice(*Student.column_names.map(&:to_sym)) end end

Таким образом, мы добились согласованности данных в двух разных базах данных.

Процесс состоит из четырех этапов:

  • Добавьте или обновите ученика в монолите.

  • Триггер помещает текущее время в поле редакции.

  • Отправляем сообщение Кафке.

  • Получаем сообщение и сохраняем данные студента в базе данных сервиса профилей.

Этот алгоритм работает хорошо, пока не возникнут проблемы: сеть падает, появляются массовые изменения через update_all или единичные изменения через update_column, Kafka не будет работать или будет работать медленно.

Чтобы все это не влияло на процесс синхронизации, монолит тоже подписывается на эту очередь и записывает прочитанную из Kafka ревизию в поле Foreign_revision:

class StudentConsumer def consume(payload, metadata) Student.where(id: payload.id).

update_all(foreign_revision: payload.revision) end end

Каждые пять минут в монолите запускается воркёр, который находит все строки, у которых поля ревизий не совпадают, и повторно отправляет их в Kafka:

module Profile::SyncShareable def run Student.where("foreign_revision is null or revision != foreign_revision").

where("revision < ?", Time.now - 1.minute).

order(revision: :desc).

limit(5000).

each(&:push_to_exchange) end end

Чтобы ускорить этот процесс, необходим условный индекс.

Он будет небольшим по размеру, поскольку большинство записей редакций будут иметь одинаковый:

CREATE INDEX "index_studends_on_revision" ON "students" ("revision") WHERE revision <> foreign_revision

Таким образом, актуальная информация обо всех студентах стала доступна для чтения в сервисе «Профиль».

Однако для изменения данных мы были вынуждены обратиться к монолитному API. Чтобы внести изменения непосредственно в Профиль, мы подумали о двусторонней синхронизации.



Двусторонняя синхронизация

Двустороннюю синхронизацию можно обеспечить путем зеркалирования всего предыдущего кода в службе профилей.

Но придется решить несколько проблем.



1. Создайте уникальный идентификатор

Мы не можем создать нового учащегося в профиле, если в монолите используется числовой идентификатор.

Переключение на UUID в нижнем регистре вместо числового приращения решит проблему.



2. Синхронизация занимает значительное количество времени

Проблема в том, что данные могут обновляться одновременно в двух местах.

Например, если за 48 секунд поменялось имя в монолите, а за 49 секунд фамилия в Профиле.

Теоретически это возможно с помощью исправлений, дополнений и автоматической коррекции.

Обмен сообщениями через Kafka может занять больше трех секунд, в этом случае смена имени будет потеряна из-за более новой версии данных с обновленной фамилией.

Чтобы этого не происходило при двусторонней синхронизации, можно заменить Kafka на что-нибудь более быстрое, например RabbitMQ. Но Kafka хранит журнал транзакций, и мы можем вернуться назад, проанализировать нашу синхронизацию и переработать транзакции в случае аварии.

Кроме того, он может работать с двумя разными дата-центрами.

Для нас это было важно, и мы остались с Кафкой.

Хотя для кого-то более актуальным может быть быстрый Rabbit, у которого синхронизация происходит за миллисекунды, а количество воркеров можно регулировать динамически.



3. Асинхронная синхронизация

Когда мы записываем изменения в Профиль, нет никакой гарантии, что мы прочитаем их в монолите — данные синхронизируются с задержкой.

Это необходимо учитывать, когда разные части приложения пишутся поверх разных сервисов.

В таких местах приходится отказываться от двусторонней синхронизации и переходить на синхронное взаимодействие через REST API или gRPC. Таким образом мы можем распределить нагрузку между монолитом и отдельным сервисом, улучшить кодовую базу и развернуть этот сервис независимо от монолита.

UPD: архитектура решения примерно такая.



Синхронизация базы данных между монолитом и микросервисами с помощью Kafka. Наше решение

*** Как вы решили проблему согласованности данных в микросервисной архитектуре? Какой у вас был опыт бесшовной распиловки монолита? Теги: #Администрирование баз данных #Микросервисы #kafka #код #ruby #ruby onrails #согласованность #синхронизация #синхронизация базы данных

Вместе с данным постом часто просматривают:

Автор Статьи


Зарегистрирован: 2019-12-10 15:07:06
Баллов опыта: 0
Всего постов на сайте: 0
Всего комментарий на сайте: 0
Dima Manisha

Dima Manisha

Эксперт Wmlog. Профессиональный веб-мастер, SEO-специалист, дизайнер, маркетолог и интернет-предприниматель.