Я думаю, что практически в каждом реальном проекте используется та или иная форма реализации очереди поставщик-потребитель( очередь производитель-потребитель ).
Идея проблемы довольно проста.
Приложению необходимо отделить производство некоторых данных от их обработки.
Возьмем, к примеру, пул потоков CLR: мы добавляем элемент для обработки, вызывая ThreadPool.QueueUserWorkItem , а пул потоков сам определяет, какое количество рабочих потоков наиболее оптимально, и вызывает методы для обработки элементов с необходимой степенью параллелизма.
Но использование стандартного пула потоков не всегда возможно и/или целесообразно.
Хотя вы можете указать минимальное и максимальное количество потоков, эта конфигурация является глобальной и будет влиять на все приложение, а не на отдельные его части.
Есть много других способов решить проблему потребителей-поставщиков.
Это может быть прямое решение, в котором логика приложения смешана с аспектами многопоточности, организации очередей и синхронизации.
Это может быть обертка БлокированиеСбор с ручным контролем количества рабочих потоков или задач.
Или это может быть решение, основанное на полностью готовом решении, таком как Блок действий из TPL DataFlow. Сегодня мы рассмотрим внутреннюю структуру класса.
Блок действий , мы обсудим проектные решения, которые были приняты его авторами, и выясним, зачем нам все это знать, чтобы обойти некоторые проблемы при его использовании.
Готовый? Ну тогда поехали! В моем текущем проекте есть ряд случаев, когда нам нужно решить проблему поставщик-потребитель.
Один из них выглядит так: у нас есть собственный парсер и интерпретатор языка, очень похожего на TypeScript. Не вдаваясь в подробности, можно сказать, что нам необходимо разобрать набор файлов и получить так называемое «транзитивное замыкание» всех зависимостей.
После чего их нужно преобразовать в исполняемое представление и выполнить.
Логика разбора выглядит примерно так:
- Разбор файла.
- Анализируем его содержимое и ищем зависимости (путем анализа всех «import*from», «require» и подобных конструкций).
- Вычисляем зависимости (т.е.
находим набор файлов, который необходим текущему файлу для нормальной работы).
- Добавляем полученные файлы зависимостей в список для парсинга.
Вот как будет выглядеть слегка упрощенная реализация на основе TPL Dataflow и класса Блок действий :
Давайте посмотрим, что здесь происходит. Для простоты вся основная логика заключена в методе Основной .private static Task<ParsedFile> ParseFileAsync(string path) { Console.WriteLine($"Parsing '{path}'.
{{0}}", $"Thread Id - {Thread.CurrentThread.ManagedThreadId}"); Thread.Sleep(10); return Task.FromResult( new ParsedFile() { FileName = path, Dependencies = GetFileDependencies(path), }); } static void Main(string[] args) { long numberOfProcessedFiles = 0; ActionBlock<string> actionBlock = null; Func<string, Task> processFile = async path => { Interlocked.Increment(ref numberOfProcessedFiles); ParsedFile parsedFile = await ParseFileAsync(path); foreach (var dependency in parsedFile.Dependencies) { Console.WriteLine($"Sending '{dependency}' to the queue. {{0}}", $"Thread Id - {Thread.CurrentThread.ManagedThreadId}"); await actionBlock.SendAsync(dependency); } if (actionBlock.InputCount == 0) { // This is a marker that this is a last file and there // is nothing to process actionBlock.Complete(); } }; actionBlock = new ActionBlock<string>(processFile); actionBlock.SendAsync("FooBar.ts").
GetAwaiter().
GetResult(); Console.WriteLine("Waiting for an action block to finish."); actionBlock.Completion.GetAwaiter().
GetResult(); Console.WriteLine($"Done. Processed {numberOfProcessedFiles}"); Console.ReadLine(); }
Переменная числообработанных файлов используется для проверки правильности логики и содержит общее количество обработанных файлов.
Основная работа выполняется в делегате файл процесса который затем передается конструктору Блок действий .
Этот делегат играет роль «потребителя» и «производителя»: он принимает путь к файлу через аргумент. путь , анализирует файл, находит его зависимости и отправляет новые файлы в очередь, вызывая метод actionBlock.SendAsync .
Затем проверяется количество элементов в очереди обработки, и если новых элементов нет, то вся операция завершается вызовом действиеБлок.
Завершить() (*).
Тогда метод Основной создает экземпляр ДействиеБлок , начинает обработку первого файла и ждет окончания всего процесса.
Метод ParseFileAsync эмулирует процесс анализа файла и вычисляет зависимости, используя следующую примитивную логику: файл «foo.ts» зависит от «fo.ts», который зависит от «f.ts».
Те.
каждый файл зависит от файла с более коротким именем.
Это нереалистичная логика, но она позволяет показать основную идею вычисления транзитивного закрытия файлов.
Сорт Блок действий управляет параллелизмом за вас.
Однако нужно учитывать, что по умолчанию «степень параллелизма» равна 1 и чтобы это изменить, нужно передать экземпляр класса Варианты исполненияDataflowBlockOptions в конструкторе ДействиеБлок .
Если имущество MaxDegreeOfParallelism будет больше 1, то Блок действий будет вызывать делегат обратного вызова из разных потоков (фактически из разных задач) для параллельной обработки элементов очереди.
Post против SendAsync: что и когда использовать
Любой, кто хоть раз пытался самостоятельно решить задачу поставщик-потребитель, сталкивался с проблемой: что делать, когда поток входных данных превышает возможности обработки потребителей? Как ограничить поток входных данных? Просто сохранить все входные элементы в памяти? Выбросить исключение? Вернуться ЛОЖЬ в методе добавления элемента? Использовать кольцевой буфер и удалять старые элементы? Или заблокировать выполнение этого метода, пока не останется место в очереди? Для решения этой проблемы авторы Блок действий Мы решили использовать следующий общепринятый подход:- Клиент может установить размер очереди при создании объекта Блок действий .
- Если очередь заполнена, то метод Почта возвращает ЛОЖЬ и метод расширения ОтправитьАсинк возвращает задачу, которая завершится, когда в очереди появится свободное место.
Это означает, что если новые элементы добавляются быстрее, чем они обрабатываются, приложение рано или поздно выйдет из строя.
Исключение OutOfMemoryException .
Но давайте попробуем исправить эту ситуацию.
И давайте зададим очереди очень маленький размер, например, 1 элемент. actionBlock = new ActionBlock<string>(processFile,
new ExecutionDataflowBlockOptions() {BoundedCapacity = 1});
Теперь, если мы запустим этот код, мы получим.
тупик!
Дедлок
Давайте подумаем о проблеме клиент-поставщик с точки зрения дизайна.Мы пишем собственную очередь, которая принимает метод обратного вызова для обработки элементов.
Нам нужно решить, должен ли он поддерживать ограничение мощности или нет. Если нам нужна «ограниченная» очередь, то мы, вероятно, получим дизайн, очень похожий на дизайн классов.
Блок действий : Мы добавим синхронный метод для добавления элементов, которые будут возвращать ЛОЖЬ , если очередь заполнена, и асинхронный метод, возвращающий задачу.
В случае полной очереди клиент нашего класса будет иметь возможность решать, что ему делать: самому обрабатывать «переполнение», вызывая синхронный вариант добавления элементов, или ждать освобождения места в очереди, используя асинхронный вариант. .
Затем вам нужно будет решить, когда вызывать метод обратного вызова.
В итоге можно придумать следующую логику: если очередь не пуста, то берётся первый элемент, вызывается метод обратного вызова, обработка завершается, после чего элемент удаляется из очереди.
(Фактическая реализация будет значительно сложнее, чем кажется, просто потому, что она должна учитывать всевозможные расы).
Очередь может принять решение удалить элемент перед вызовом метода обратного вызова, но, как мы вскоре увидим, это не повлияет на возможность возникновения взаимоблокировки.
Мы придумали простой и элегантный дизайн, но он легко может привести к проблемам.
Предположим, что очередь заполнена и сейчас вызывается обратный вызов для обработки одного из элементов.
Но что, если вместо того, чтобы быстро «вернуть» очередь управления, обработчик попытается добавить еще один элемент, вызвав Ждите ОтправитьАсинк :
Очередь заполнена и не может принимать новые элементы, поскольку метод обратного вызова еще не завершился.
Но этот метод также зависает в ожидании завершения.
Ждите ОтправитьАсинк и не может двигаться дальше, пока в очереди не останется место.
Классический тупик! Хорошо, мы зашли в тупик, потому что Блок действий удаляет элемент из очереди *после* завершения метода обратного вызова.
Но давайте рассмотрим альтернативный сценарий: что произойдет, если Блок действий удалит элемент *до* вызова метода обратного вызова? На самом деле ничего не изменится.
Тупик по-прежнему возможен.
Представим, что размер очереди равен единице, а степень параллелизма равна двум.
- Поток T1 добавляет элемент в очередь.
Блок действий берет элемент из очереди (уменьшая количество элементов в очереди до 0) и вызывает метод обратного вызова.
- Поток T2 добавляет элемент в очередь.
Блок действий берет элемент из очереди (уменьшая количество элементов в очереди до 0) и вызывает метод обратного вызова.
- Поток T1 добавляет элемент в очередь.
ActionBlock не может вызвать обработчик для нового элемента, поскольку уровень параллелизма равен 2, а у нас уже есть два обработчика.
Очередь заполнена.
- Во время обработки первый обработчик пытается добавить новый элемент в очередь, но застревает на вызове ожидайте SendAsync потому что очередь заполнена.
- Во время обработки второй обработчик пытается добавить новый элемент в очередь, но застревает на вызове ожидайте SendAsync потому что очередь заполнена.
Получается, что удаление элемента из очереди перед обработкой не поможет. Более того, это только усугубит проблему, так как вероятность тупика будет значительно снижена (при степени параллелизма, равной N, всем N методам обратного вызова необходимо попытаться добавить новые элементы в очередь одновременно).
Другой недостаток менее очевиден.
Блок действий все еще не универсальное решение.
Этот класс реализует интерфейс ITargetSource и может использоваться для обработки элементов в сложных сценариях потока данных.
Например, у нас может быть БуферБлок с несколькими «целевыми» блоками для параллельной обработки элементов.
В текущей реализации балансировка обработчиков реализована тривиально.
Как только получатель (в нашем случае Блок действий ) заполнен, он перестает принимать на вход новые элементы.
И это дает возможность другим блокам в цепочке обрабатывать элемент вместо него.
Если элемент удаляется только после того, как он был обработан, ActionBlock станет более жадным и будет принимать больше элементов, чем он может обработать в данный момент. В этом случае размер (ограниченная емкость) каждого блока будет равен «BoundedCapacy» + «MaxDegreeOfParallelism».
Как решить проблему тупика?
Боюсь, что нет. Если вам одновременно нужно ограничить количество элементов в очереди и метод обратного вызова может добавлять новые элементы, то из Блок действий придется отказаться.
Альтернативой может быть решение, основанное на БлокированиеСбор и «ручное» управление количеством рабочих потоков, например, с помощью пула задач или Parallel.Invoke.
Степень параллелизма
В отличие от примитивов из TPL, все блоки из TPL Dataflow по умолчанию являются однопоточными.Те.
Блок действий , ТрансформаторБлок и другие, вызывайте метод обратного вызова по одному.
Авторы TPL Dataflow считали, что простота важнее возможного прироста производительности.
Думать о графах потоков данных в целом довольно сложно, а параллельная обработка данных всеми блоками еще больше усложнит этот процесс.
Чтобы изменить степень параллелизма, блок нужно передать Варианты исполненияDataflowBlockOptions и установите свойство MaxDegreeOfParallelism значение больше 1. Кстати, если задать этому свойству значение -1, то все входящие элементы будут обрабатываться новой задачей и параллелизм будет ограничиваться только возможностями используемого планировщика задач (объект Диспетчер задач ), который также может передаваться через Варианты исполненияDataflowBlockOptions .
Заключение
Разработка компонентов, которые просты в использовании, является сложной задачей.Разработка простых в использовании компонентов, решающих проблемы параллелизма, вдвойне сложна.
Чтобы правильно использовать подобные компоненты, вам необходимо знать, как они реализованы и какие ограничения имели в виду их разработчики.
Сорт Блок действий Это отличная вещь, которая значительно упрощает реализацию шаблона поставщик-потребитель.
Но даже в этом случае вам следует знать о некоторых аспектах TPL Dataflow, таких как степень параллелизма и поведение блоков в случае переполнения.
— (*) Этот пример не является потокобезопасным, и в полной реализации не следует использовать actionBlock.InputCount .
Ты видишь проблему? (**) Метод Почта возвращает ЛОЖЬ в одном из двух случаев: очередь заполнена или уже завершена (метод называется Полный ).
Этот аспект может затруднить использование этого метода, поскольку невозможно различить эти два случая.
Метод ОтправитьАсинк , с другой стороны, ведет себя немного иначе: метод возвращает объект Задача , который будет находиться в незавершенном состоянии, пока очередь заполнена, а если очередь уже заполнена и не способна принять новые элементы, то задача.
Результат будет равен ЛОЖЬ .
Теги: #Поток данных TPL #Параллелизм #.
NET #Параллельное программирование
-
Сеансы – Всегда Ли Они Необходимы?
19 Oct, 24 -
Я Хочу. У Меня Есть. Я Люблю
19 Oct, 24 -
Миллион Смартфонов За Шесть Месяцев
19 Oct, 24