В этом тексте описаны общесистемные подходы, используемые при работе с брокером сообщений Kafka, и общие архитектурные подходы, используемые при работе с системами, имеющими несовместимые модели транзакций.
Преамбула или как это было «раньше»
Когда деревья были большими, а сервер оставили на улице, так как занести его в помещение не удалось из-за того, что с одной стороны не смогли разобрать его на более мелкие части, а с другой С другой стороны, в здании не было технологических проемов достаточных размеров, считалось, что требуемая производительность системы достигается за счет количества денег, затраченных на одну приобретенную единицу оборудования.Конечно, это не так – и тогда, и тогда было не так.
Хотя бы потому, что если в такой железке все операции зависят от одного ресурса, то конкуренция за этот ресурс остановит сервер любой мощности, как бы по команде «поезд, стоп, раз, два».
Не от хорошей жизни мне пришлось приобретать такие железки.
Но оставим лирику.
Мне кажется, сейчас есть один существенный момент, который изменился в понимании того, как решать подобные проблемы.
А именно, пришло четкое осознание того, что в большой системе в каждый момент времени частное состояние отдельных ее частей не обязательно должно быть целостным.
Но общее состояние системы в нужный момент должно быть полным.
Это общие слова, и их часто трудно сразу понять.
Поэтому более подробно.
Для связи компонентов внутри системы изначально использовались общие базы данных, которые использовались всеми компонентами и обменивались данными через общие области внутри этой базы данных.
По сути, они строили очереди с помощью таблиц.
Если нагрузка была небольшой, такой подход был вполне приемлем.
Но по мере роста нагрузки возникли проблемы, связанные с двумя, на мой взгляд, самыми существенными вещами: с одной стороны, стоимость оборудования под БД все время росла, а с другой, с ним было сложно работать.
несколько клиентов (средний уровень) с общими таблицами в этой базе, чтобы не мешать друг другу.
Последний пункт был связан в основном с тем, что из-за изоляции транзакций невозможно было определить, захвачена ли запись кем-то другим или нет (да, можно было пропустить заблокированные записи, но были и другие трудности с этим).
Следующим шагом в разработке стала реализация полноценных очередей.
Как правило, речь обычно идет о JMS. Иногда они были встроены в базу данных, иногда реализовывались независимым решением.
Общим для них было то, что они позволяли вносить изменения в базу данных и отправлять или обрабатывать сообщения в очереди с помощью одной транзакции.
То есть нам гарантировалось, что в любой момент после окончания транзакции мы либо получим изменения внутреннего состояния системы (БД) и обработаем/отправим сообщение (очередь), либо потеряем изменение и сообщение останется необработанным/ не отправлено.
То есть невозможно было, например, уловить момент времени, когда полученное сообщение перешло в состояние «обработано» и связанные с ним изменения в базе данных были потеряны.
Проблема была решена с помощью распределенных транзакций (XA/JTA) и протоколов двухфазной фиксации транзакций.
Все было круто, если не читать мелкий шрифт в конце.
На самом деле так называемая «задача двух генералов» строго не была решена.
Например, можно было получить ситуацию, когда сообщение было отправлено (и даже ответ на него уже был получен), но изменения в локальной базе данных еще не были внесены.
Так или иначе, эта проблема была решена, но решение в любом случае осталось дорогим и/или неполным.
Текст до текущего момента не удалось прочитать.
Сейчас.
.
они даже не пытаются получить гарантию того, что состояние определено в каждый момент времени.
Существует два основных сценария, которые необходимо поддерживать.
Здесь и ниже конкретно обсуждается брокер сообщений Kafka. Сразу стоит отметить, что есть третий вариант, наиболее приоритетный с точки зрения Кафки, — использование его как источника событий или реализация принципа CQRS. При таком подходе типичная транзакция затрагивает только темы Kafka и выглядит следующим образом:
- Сделка открыта.
- Тема, в которую в данный момент входят, начинает опрашиваться на наличие входящих сообщений.
- Как только они появляются, они отбираются и обрабатываются.
- Исходящие сообщения могут появиться в результате обработки.
Они прописаны в исходящую тему, но пока не видны потребителям.
- Отправляются смещения, которые указывают, что следующее сообщение, которое будет обработано, находится после обработанного.
Они тоже записаны, но пока не видны.
И их нельзя прочитать в текущей группе потребителей.
- Когда транзакция фиксируется, создается запись, указывающая, что сообщения и смещения, записанные до этого момента, становятся видимыми для всех потребителей.
Если фиксация не происходит, они остаются в очереди как бессмысленные записи, которые пропускаются и не участвуют в обработке.
Получение сообщения от Кафки
При получении сообщения от Kafka в рамках шага 3 (обработка) может возникнуть необходимость отразить изменения в СУБД.
В этой схеме взаимодействия в момент обработки сообщения в СУБД открывается транзакция и вносятся изменения, которые фиксируются в независимой транзакции, несовместимой с Kafka, то есть до завершения шага 6. Соответственно, у нас есть две проблемы:
- Изменения в базе данных отражаются до того момента, как сообщение можно считать обработанным.
- Изменения в базе данных могут произойти, но мы не успеем зафиксировать изменения в Kafka (вылеты приложения, закрытие приложения оркестратором контейнера, аппаратные сбои, сбой соединения и т. д. и т. п.
).
То есть надо всегда иметь в виду, что изменения в базе данных немного опережают состояние в очереди.
Чтобы сделать это «чуть-чуть» еще меньше, нужно сделать фиксацию изменений в базе данных максимально близкой по времени к фиксации изменений в очереди.
Решение второй проблемы — использование ключей идемпотентности.
Вкратце: на основе данных из обрабатываемого сообщения мы получаем или однозначно вычисляем значение, которое сохраняем вместе с изменениями, которые мы вносим на стороне базы данных.
Таким образом, если мы видим, что такое значение ключа идемпотентности уже связано с изменяемым объектом в базе данных, то это изменение уже было сделано.
Это указывает на то, что это не первый раз обработка сообщения, но что в предыдущий раз его обработка по каким-либо причинам была прервана.
То есть вы можете пропустить его в обычном режиме и перейти к следующему сообщению.
Для активно изменяющихся объектов в базе данных может накапливаться большое количество ключей идемпотентности.
Поэтому имеет смысл подумать о том, как их хранить и вычистить старые.
Возможные варианты: в составе полей объекта или в отдельной таблице, в текстовом поле или в структурированном поле, например, jsonb. Отдельная таблица, хотя и универсальная для всех информационных объектов, потребует организации дополнительных связей с ней и более сложного обслуживания, так как со временем она станет очень большой.
Поэтому имеет смысл хранить ключи как часть полей самого информационного объекта.
Для универсализации имеет смысл определить стандартное имя и тип поля.
Поскольку имеет смысл очищать старые ключи идемпотентности при добавлении нового ключа, с ключом должна быть связана какая-то дата, которую можно использовать для принятия решения о том, что ключ больше не действителен.
В случае с ULID у нас есть такая дата, но у нас нет гарантии, что во всех входящих сообщениях такой ULID будет доступен и его можно будет использовать.
Поэтому мы пока говорим о нескольких значениях.
Например, вы также можете сохранить информацию об ответе, который вы хотите предоставить вызывающему абоненту при повторных вызовах.
Для удобства хранения и работы с такими структурами (список пар ключевых дат), на мой вкус и цвет, удобнее использовать jsonb.
Отправка сообщения Кафке
Бывает и обратная ситуация, когда в СУБД открывается транзакция, в ходе которой необходимо отправить сообщение в Kafka. Используемые в предыдущем случае ключи идемпотентности мало помогают, если вложенная транзакция находится в Kafka, поскольку мы не можем искать в очереди старое сообщение и парсить его ключ.Очередь постоянно меняется, сообщения из нее могут быть принудительно вытеснены.
Кроме того, действие, вызвавшее транзакцию в СУБД, может быть инициировано пользователем и в конечном итоге не повторится.
Если отправить сообщение без учета того, как оно будет обработано при получении, включая возможные дубликаты, то возникнут проблемы.
Либо принимающая сторона должна учитывать возможность дубликатов.
Я знаю две правильные схемы реализации такого типа взаимодействия.
В обоих случаях внутри базы данных используется своего рода очередь, записи из которой становятся сообщениями в Kafka. Общая схема выглядит так:
В одном случае эта очередь организуется разработчиком, то есть создается соответствующая таблица.
Другой использует встроенную в СУБД очередь — так называемый журнал упреждающей записи или WAL. Принципиальные различия между этими подходами:
Стол | ВАЛ |
---|---|
Организовано разработчиком и является частным решением.
| Управляется СУБД и является решением класса CDC. |
Может содержать любую удобную структуру, включая данные из нескольких таблиц.
| Структурно это отражение каждой таблицы базы данных в отдельности.
|
Вам необходимо позаботиться о порядке записей в таблице.
| Сама СУБД ведет журнал |
Может содержать LOB и даже состоять из поля LOB. | Могут возникнуть проблемы с обработкой LOB |
Вам необходимо позаботиться об очистке таблицы от старых записей | Сама СУБД ведет журнал |
Работа разделена на два действия: первое заполняет очередь внутри базы данных, второе отвечает за чтение и очистку очереди.
Очередь заполняется при выполнении какой-либо операции в приложении или при обработке действия пользователя: открывается транзакция, выполняется логика обработки, записывается сообщение в таблицу очереди, фиксируется транзакция СУБД.
Сообщение может быть записано как в приложении, так и внутри СУБД с помощью триггера.
Запись может содержать все необходимые поля, а может быть LOB-полем, например, с json внутри.
Первый важный момент — записи внутри таблицы очереди должны быть упорядочены по времени появления.
В СУБД это делается довольно легко с помощью последовательности, записи текущей даты и времени или ключа в формате ULID, гарантирующего уникальность и включающего дату и время.
Второй важный момент — генерация идентификатора, время создания кортежа и запись в таблицу очереди должны происходить как можно позже (подробнее об этом позже).
В конце концов:
- Запись включает в себя 1) монотонно увеличивающийся идентификатор, 2) время, 3) поля для передачи в Kafka или одно большое поле со всеми необходимыми значениями.
- Транзакция внутри СУБД гарантирует нам, что изменения, вносимые в таблицы базы данных и таблицу очереди сообщений для отправки в Kafka, будут выполняться одновременно и целостно с точки зрения внешних пользователей СУБД.
Важно что:
- Максимальный идентификатор или временная метка, обработанные на предыдущем шаге, сохраняются в Kafka. То есть обновление этого значения будет выполняться в той же транзакции Kafka, что и запись сообщений в исходящую тему.
- Записи для обработки выбираются не только больше определенного идентификатора, но и меньше определенного времени, которое отводится для завершения всех транзакций в базе данных, начатых в пределах обрабатываемого временного интервала.
Если это условие не выполнено, то потенциально длинная транзакция может начаться в течение интервала времени, обрабатываемого приложением опросчика, и закончиться, когда обработка интервала будет завершена.
То есть запись останется невидимой для обработчика и останется необработанной.
Чтобы сузить этот интервал, генерация идентификатора (метки времени) и логирование должны происходить как можно позже, как уже писалось выше.
Идеальный вариант здесь — использовать ULID, поскольку он уже сочетает в себе и идентификатор, и временную метку.
На этом же этапе вы можете удалить старые записи из журнала внутри базы данных.
Например, те, которые меньше указанного выше идентификатора (за вычетом интервала хранения, если требуется).
Отправка в Kafka нескольких связанных сообщений, сгенерированных в разных транзакциях СУБД, очевидно, должна осуществляться аналогичным образом: сообщения накапливаются в промежуточных таблицах, а затем, когда они вообще готовы к отправке (формирование последнего сообщения в серии), они передаются в таблицу очередей, опрашиваемую приложением опросчика.
ом PS Кстати, определение последнего сообщения в серии — еще одна интересная задача как при работе с обычными СУБД, так и при работе с Kafka. Теги: #Микросервисы #Системный анализ и проектирование #kafka #rdbms #очередь #транзакция
-
Последние Курсы По Сетевым Технологиям
19 Oct, 24 -
Что Значит Быть Agile?
19 Oct, 24 -
Представители Оперы В Университетах
19 Oct, 24 -
Телефон Apple Больше Не Слухи
19 Oct, 24