Инструменты Разработчика Node.js. Очереди Заданий

При реализации поддержки веб-приложений и мобильных приложений, даже самых простых, уже стало обычным использовать такие инструменты, как: базы данных, почтовый (smtp) сервер, redis-сервер.

Спектр используемых инструментов постоянно расширяется.

Например очереди сообщений, судя по количеству установок пакета amqplib (650 тыс.

установок в неделю), используется наравне с реляционными базами данных (пакет mysql 460 тыс.

установок в неделю и pg 800 тыс.

установок в неделю).

Сегодня я хочу поговорить об очередях заданий, которые в настоящее время используются гораздо реже, хотя необходимость в них возникает практически во всех реальных проектах.

Итак, очереди заданий позволяют асинхронно выполнить какую-то задачу, по сути, выполнить функцию с заданными входными параметрами и в заданное время.

В зависимости от параметров задача может выполняться:

  • сразу после добавления заданий в очередь;
  • один раз в определенное время;
  • неоднократно по графику.

Очереди заданий позволяют передавать параметры выполняемому заданию, отслеживать и повторно выполнять задания, которые завершились сбоем, а также устанавливать ограничение на количество одновременно выполняемых заданий.

Подавляющее большинство приложений Node.js включают разработку REST-API для веб- и мобильных приложений.

Сокращение времени выполнения REST-API важно для комфортной работы пользователя с приложением.

В то же время вызов REST API может инициировать длительные и/или ресурсоемкие операции.

Например, после совершения покупки вам необходимо отправить пользователю push-сообщение в мобильное приложение или отправить запрос на совершение покупки в REST-API CRM. Эти запросы могут выполняться асинхронно.

Как это сделать правильно, если у вас нет инструмента для работы с очередями заданий? Например, вы можете отправить сообщение в очередь сообщений, запустить воркер, который будет читать эти сообщения и выполнять необходимую работу на основе этих сообщений.

Фактически, это примерно то, что делают очереди заданий.

Однако если присмотреться, очереди заданий имеют несколько фундаментальных отличий от очередей сообщений.

Во-первых, сообщения помещаются в очередь сообщений (статическую), а очереди заданий предполагают выполнение некоторой работы (вызов функции).

Во-вторых, очередь заданий подразумевает наличие какого-то процессора (воркера), который будет выполнять данную работу.

Для этого требуется дополнительный функционал.

Количество рабочих процессоров должно прозрачно масштабироваться в случае увеличения нагрузки.

С другой стороны, необходимо ограничить количество одновременно выполняемых заданий на одном рабочем процессоре, чтобы сгладить пиковые нагрузки и предотвратить отказы в обслуживании.

Это показывает, что существует потребность в инструменте, который мог бы запускать асинхронные задания, настраивая различные параметры так же легко, как мы делаем запрос через REST-API (или, что еще лучше, даже проще).

Используя очереди сообщений, относительно легко реализовать очередь заданий, которые выполняются сразу после помещения задания в очередь.

Но зачастую вам необходимо выполнить задачу один раз в установленное время или по расписанию.

Для этих задач широко используется ряд пакетов, реализующих логику cron в Linux. Чтобы не быть голословным, скажу, что пакет node-cron имеет 480 тысяч установок в неделю, node-schedule — 170 тысяч установок в неделю.

Использовать node-cron, конечно, удобнее аскетического setInterval(), но лично я столкнулся с рядом проблем при его использовании.

Если выразить это как общий недостаток, то это отсутствие контроля над количеством одновременно выполняемых заданий (это стимулирует пиковые нагрузки: увеличение нагрузки замедляет работу заданий, замедление работы заданий увеличивает количество одновременно выполняемых заданий).

а это, в свою очередь, еще больше нагружает систему), невозможность запуска node для повышения производительности -cron на нескольких ядрах (в этом случае все задания выполняются независимо на каждом ядре) и отсутствие средств для отслеживания и перезапуска неудачных заданий.

Я надеюсь, что я показал, что такой инструмент, как очередь заданий, необходим наравне с такими инструментами, как базы данных.

И такие средства появились, хотя они пока не получили широкого применения.

Перечислю самые популярные из них:

Имя пакета Количество установок в неделю Количество лайков
сигнал 29190 8753
пчелиная очередь 29022 1431
повестка дня 25459 5488
бык 56232 5909
*) По состоянию на 25 сентября 2021 года после возобновления поддержки проекта.

.

Сегодня я рассмотрю использование пакета Bull, с которым работаю сам.

Почему я выбрал именно этот пакет (хотя я не навязываю свой выбор другим).

В то время, когда я начал искать удобную реализацию очереди сообщений, проект bee-queue уже был остановлен.

Реализация kue, согласно бенчмаркам, приведенным в репозитории bee-queue, сильно отставала от других реализаций и к тому же не содержала инструментов для запуска периодически выполняемых задач.

Проект повестки дня реализует очереди, которые хранятся в базе данных mongodb. Это большой плюс для некоторых случаев, если вам нужна сверхнадежность при постановке заданий в очередь.

Однако это не единственный решающий фактор.

Естественно, я тестировал все версии библиотек до предела, генерируя большое количество задач в очереди, и так и не смог заставить повестку работать плавно.

При превышении определенного количества задач повестка останавливалась и перестали назначать задачи в работу.

Поэтому я остановился на Bull, который реализует удобный API, с достаточной скоростью и масштабируемостью, так как пакет Bull использует в качестве резервного сервера Redis. Вы также можете использовать кластер серверов Redis. При создании очереди очень важно подобрать оптимальные параметры очереди заданий.

Параметров много, и значение некоторых из них до меня дошло не сразу.

После многочисленных экспериментов я остановился на следующих параметрах:

  
  
  
  
  
  
   

const Bull = require('bull'); const redis = { host: 'localhost', port: 6379, maxRetriesPerRequest: null, connectTimeout: 180000 }; const defaultJobOptions = { removeOnComplete: true, removeOnFail: false, }; const limiter = { max: 10000, duration: 1000, bounceBack: false, }; const settings = { lockDuration: 600000, // Key expiration time for job locks. stalledInterval: 5000, // How often check for stalled jobs (use 0 for never checking).

maxStalledCount: 2, // Max amount of times a stalled job will be re-processed. guardInterval: 5000, // Poll interval for delayed jobs and added jobs. retryProcessDelay: 30000, // delay before processing next job in case of internal error. drainDelay: 5, // A timeout for when the queue is in drained state (empty waiting for jobs).

}; const bull = new Bull('my_queue', { redis, defaultJobOptions, settings, limiter }); module.exports = { bull };

В тривиальных случаях нет необходимости создавать много очередей, так как в каждой очереди можно задать имена для разных заданий, и с каждым именем связать разного процессора-работника:

const { bull } = require('.

/bull'); bull.process('push:news', 1, `${__dirname}/push-news.js`); bull.process('push:status', 2, `${__dirname}/push-status.js`); .

bull.process('some:job', function(.

args) { .

});

Я использую ту возможность, которая идет с Bull «из коробки» — для распараллеливания рабочих процессоров на нескольких ядрах.

Для этого вторым параметром указывается количество ядер, на которых будет запускаться рабочий процессор, а третьим параметром указывается имя файла, определяющего функцию обработки задания.

Если такая функция не нужна, вы можете просто передать функцию обратного вызова в качестве второго параметра.

Задача помещается в очередь вызовом метода add(), которому в качестве параметров передаются имя очереди и объект, которые в дальнейшем будут переданы обработчику задачи.

Например, в хуке ORM после создания поста с новой новостью я могу асинхронно отправить push-сообщение всем клиентам:

afterCreate(instance) { bull.add('push:news', _.pick(instance, 'id', 'title', 'message'), options); }

Обработчик событий принимает в качестве параметров объект задачи с параметрами, передаваемыми в метод add() и функцию Done(), которую необходимо вызвать для подтверждения завершения задачи или для сообщения о том, что задача завершилась с ошибкой:

const { firebase: { admin } } = require('.

/firebase'); const { makePayload } = require('.

/makePayload'); module.exports = (job, done) => { const { id, title, message } = job.data; const data = { id: String(id), type: 'news', }; const payloadRu = makePayload(title.ru, message.ru, data); const payloadEn = makePayload(title.en, message.en, data); return Promise.all([ admin.messaging().

send({ .

payloadRu, condition: "'news' in topics && 'ru' in topics" }), admin.messaging().

send({ .

payloadEn, condition: "'news' in topics && 'en' in topics" }), ]) .

then(response => done(null, response)) .

catch(done); };

Вы можете использовать инструмент Arena-Bull для просмотра статуса очереди заданий:

const Arena = require('bull-arena'); const redis = { host: 'localhost', port: 6379, maxRetriesPerRequest: null, connectTimeout: 180000 }; const arena = Arena({ queues: [ { name: 'my_gueue', hostId: 'My Queue', redis, }, ], }, { basePath: '/', disableListen: true, }); module.exports = { arena };

И напоследок небольшой лайфхак.

Как я уже говорил, Bull использует сервер Redis в качестве резервной копии.

Вероятность того, что при перезапуске сервера redis задания исчезнут, очень мала.

Но зная тот факт, что системные администраторы иногда могут просто «очистить редисковый кеш», удалив при этом все задания в частности, меня беспокоили в первую очередь периодически запускаемые задания, которые в этом случае останавливались навсегда.

В связи с этим я нашел способ возобновить такие периодические задачи:

const cron = '*/10 * * * * *'; const { bull } = require('.

/app/services/bull'); bull.getRepeatableJobs() .

then(jobs => Promise.all(_.map(jobs, (job) => { const [name, cron] = job.key.split(/:{2,}/); return bull.removeRepeatable(name, { cron }); }))) .

then(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } })); setInterval(() => bull.add('check:status', {}, { priority: 1, repeat: { cron } }), 60000);

То есть задача сначала удаляется из очереди, а потом ставится снова, и все это (увы) методом setInterval().

Собственно, без такого лайфхака я бы, возможно, и не решился бы использовать периодические задачи на быке.

УПД1. Проект пчелиной очереди снова поддерживается.

УПД2. бык не имеет способности, которая есть у коа и пчелы-очереди, — прослушивать события конкретной работы.

Тогда это очень нужно для такого случая:

const job = queue.job({.

}); job.on('success', function(result) { res.status(200).

send(result); })

УПД3. куэ проект на данный момент не поддерживается [email protected] 3 июля 2019 г.

Теги: #Разработка мобильных приложений #Разработка веб-сайтов #JavaScript #node.js #Bull #очередь заданий

Вместе с данным постом часто просматривают: