Некоторое время назад мы столкнулись с проблемой очистки кортежей в спейсах.
тарантул .
Чистку нужно было начинать не тогда, когда у тарантула уже заканчивалась память, а заранее и с определенной периодичностью.
Для этой задачи в tarantool есть модуль, написанный на Lua, под названием истечение срока действия .
Попользовавшись этим модулем недолго, мы поняли, что он нам не подходит: из-за постоянной чистки больших объемов данных Lua зависал в GC. Поэтому мы задумались о разработке собственного модуля с ограниченным сроком действия, надеясь, что код, написанный на родном языке программирования, решит наши проблемы наилучшим образом.
Хорошим примером для нас стал модуль tarantool под названием кэширование памяти .
Используемый в нем подход основан на том, что в пространстве создается отдельное поле, которое указывает время жизни кортежа, другими словами, ttl. Модуль в фоновом режиме сканирует пространство, сравнивает TTL с текущим временем и решает, удалять кортеж или нет. Код модуля memcached прост и элегантен, но слишком универсален.
Во-первых, он не учитывает тип индекса, который сканируется и удаляется.
Во-вторых, на каждом проходе сканируются все кортежи, количество которых может быть довольно большим.
И если в модуле expirationd первая проблема была решена (индекс дерева был выделен в отдельный класс), то второй по-прежнему не удостоился никакого внимания.
Это и предопределило выбор в пользу написания собственного кода.
Описание
Документация по tarantool имеет очень хорошую руководство о том, как писать свои хранимые процедуры на C. Прежде всего предлагаю вам с ним ознакомиться, чтобы понять те вставки с командами и кодом, которые появятся ниже.Также стоит обратить внимание ссылка к объектам, которые доступны при написании собственного модуля, а именно коробка , волокно , индекс И спасибо .
Начнем издалека и посмотрим, как выглядит модуль с ограниченным сроком действия снаружи:
Для простоты запускаем tarantool в каталоге, где находится наша библиотека libcapped-expirationd.so. Из библиотеки экспортированы две функции: start и kill. Первый шаг — сделать эти функции доступными из Lua с помощью box.schema.func.create и box.schema.user.grant. Затем создайте пространство, кортежи которого будут содержать только три поля: первое — это уникальный идентификатор, второе — адрес электронной почты и третье — время жизни кортежа.fiber = require('fiber') net_box = require('net.box') box.cfg{listen = 3300} box.schema.func.create('libcapped-expirationd.start', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start') box.schema.func.create('libcapped-expirationd.kill', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill') box.schema.space.create('tester') box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}}) capped_connection = net_box:new(3300)
Мы строим индекс дерева поверх первого поля и называем его первичным.
Далее мы получаем объект подключения к нашей родной библиотеке.
После подготовительных работ запустите функцию запуска: capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Этот пример будет работать во время сканирования точно так же, как и модуль expirationd, написанный на Lua. Первым аргументом функции start является уникальное имя задачи.
Второй — идентификатор пространства.
Третий — уникальный индекс, по которому будут удаляться кортежи.
Четвертый — индекс, по которому будут проходиться кортежи.
Пятый — номер поля кортежа со временем жизни (нумерация начинается с 1, а не с 0!).
Шестой и седьмой — настройки сканирования.
1024 — максимальное количество кортежей, которые можно просмотреть за одну транзакцию.
3600 — время полного сканирования в секундах.
Обратите внимание, что в примере для сканирования и удаления используется один и тот же индекс.
Если это индекс дерева, то обход осуществляется от меньшего ключа к большему.
Если есть какой-то другой, например, хеш-индекс, то обход осуществляется, как правило, в случайном порядке.
Все пространственные кортежи сканируются за одно сканирование.
Вставим в пространство несколько кортежей со временем жизни 60 секунд: box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Проверим, что вставка прошла успешно: tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
.
Давайте повторим выбор через 60+ секунд (считая с начала вставки первого кортежа) и увидим, что модуль с ограниченным сроком действия уже обработан: tarantool> box.space.tester.index.primary:select()
---
- []
.
Остановим задачу: capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Давайте рассмотрим второй пример, где для сканирования используется отдельный индекс: fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Здесь все то же самое, что и в первом примере, за некоторыми исключениями.
Мы строим индекс дерева поверх третьего поля и называем его exp. Этот индекс не обязательно должен быть уникальным, в отличие от индекса, называемого первичным.
Обход будет осуществляться по индексу exp, а удаление — по первичному.
Мы помним, что раньше и то и другое делалось только с использованием первичного индекса.
После подготовительной работы запускаем функцию start с новыми аргументами: capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Давайте снова вставим в пространство несколько кортежей со временем жизни 60 секунд: box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Через 30 секунд по аналогии добавим еще несколько кортежей: box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Проверим, что вставка прошла успешно: tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
.
Давайте повторим выбор через 60+ секунд (считая с начала вставки первого кортежа) и увидим, что модуль с ограниченным сроком действия уже обработан: tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
.
В пространстве еще осталось несколько кортежей, которым осталось жить еще около 30 секунд. Причем сканирование останавливалось при переходе от кортежа с ID 2 и временем жизни 1576421257 к кортежу с ID 3 и временем жизни 1576421287. Кортежи со временем жизни 1576421287 и более не сканировались из-за упорядочивания ключи индекса опыта.
Это та экономия, которой мы хотели добиться в самом начале.
Остановим задачу: capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Выполнение
Лучший способ рассказать обо всех особенностях проекта — его первоисточник.код ! В рамках публикации мы остановимся только на самых важных моментах, а именно на алгоритмах обхода пространства.
Аргументы, которые мы передаем функции start, хранятся в структуре expirationd_task: struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
Атрибут name — это имя задачи.
Атрибут space_id — это идентификатор пространства.
Атрибут rm_index_id — идентификатор уникального индекса, по которому будут удаляться кортежи.
Атрибут it_index_id — это идентификатор индекса, по которому будут проходиться кортежи.
Атрибут it_index_type — это тип индекса, по которому будут проходиться кортежи.
Атрибут filed_no — это номер поля кортежа со временем жизни.
Атрибут scan_size — это максимальное количество кортежей, сканируемых за одну транзакцию.
Атрибут scan_time — это полное время сканирования в секундах.
Мы не будем рассматривать разбор аргументов.
Это кропотливая, но простая работа, в которой вам поможет библиотека.
Трудности могут возникнуть только с индексами, которые передаются из Lua как сложная структура данных с типом mp_map, а не с использованием простых типов mp_bool, mp_double, mp_int, mp_uint и mp_array. Но нет необходимости анализировать весь индекс.
Вам просто нужно проверить его уникальность, вычислить тип и извлечь идентификатор.
Перечислим прототипы всех функций, которые используются для парсинга: bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Теперь перейдем к самому главному — логике обхода пробела и удаления кортежей.
Каждый блок кортежей размером не более scan_size сканируется и модифицируется в рамках одной транзакции.
В случае успеха эта транзакция фиксируется; в случае возникновения ошибки происходит откат. Последний аргумент функции expirationd_iterate — это указатель на итератор, с которого начинается или продолжается сканирование.
Этот итератор увеличивается внутренне до тех пор, пока не произойдет ошибка, не закончится место или не будет возможности заранее остановить процесс.
Функция expirationd_expired проверяет время жизни кортежа, expirationd_delete удаляет кортеж, expirationd_breakable проверяет, нужно ли нам двигаться дальше.
Код функции Expirationd_iterate: static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Код функции expirationd_expired: static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Код функции Expirationd_delete: static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Код функции expiration_breakable: static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
Приложение
Вы можете просмотреть исходный код на здесь ! Теги: #программирование #открытый исходный код #Хранение данных #tarantool #C++ #Lua #capi #capped #non-lua #expirationd-
Кнопка, Которая Вам Нравится В 10 Раз Больше
19 Oct, 24 -
Банальности О Тесте Ab
19 Oct, 24 -
Ибп Для Медицинских Учреждений
19 Oct, 24 -
Капитал 2.0, Или То, Чего Не Заметил Маркс.
19 Oct, 24 -
Убийственные Процессы
19 Oct, 24