Введение Обычно при оптимизации программы для многоядерных компьютеров первым делом необходимо определить, можно ли разделить алгоритм на части, которые выполняются параллельно.
Если вам нужно параллельно обрабатывать отдельные элементы из большого набора данных для решения проблемы, новые возможности параллелизма в .
NET Framework 4 являются основными кандидатами: Параллельный.
ForEach и параллельный LINQ ( ПЛИНК )
Параллельный.
ForEach Класс Parallel содержит метод Для каждого , который представляет собой многопоточную версию обычного цикла foreach в C#.
Как и обычный foreach, Parallel.ForEach перебирает перечислимые данные, но использует несколько потоков.
Одна из наиболее часто используемых перегрузок Параллельный.
ForEach следующее:
Ienumerable указывает последовательность для итерации, а тело действия указывает делегата, который будет вызываться для каждого элемента.public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource> body)
Полный список перегрузок Parallel.ForEach можно найти Здесь .
ПЛИНК
Относится к Parallel.ForEach ПЛИНК — это модель программирования для параллельных операций с данными.Пользователь определяет операцию из стандартного набора операторов, включая проекции, фильтры, агрегации и т. д. Аналогично Parallel.ForEach ПЛИНК достигает параллелизма за счет разбиения входной последовательности на части и обработки элементов в разных потоках.
В статье освещаются различия между этими двумя подходами к параллелизму.
Обсуждаются случаи использования, в которых лучше использовать Parallel.ForEach вместо PLINQ и наоборот.
Выполнение самостоятельных операций
Если вам необходимо выполнить длительные вычисления над элементами последовательности и полученные результаты независимы, то предпочтительнее использовать Parallel.ForEach. PLinq, в свою очередь, окажется слишком тяжеловесным для подобных операций.Кроме того, для Параллельный.
ForEach указано максимальное количество потоков, то есть если Пул потоков мало ресурсов и доступно меньше потоков, чем указано в ParallelOptions.MaxDegreeOfParallelism , будет использоваться оптимальное количество потоков, которое можно увеличивать по мере выполнения.
Для ПЛИНК количество исполняемых потоков строго задано.
Параллельные операции, сохраняющие порядок данных
PLINQ для поддержания порядка
Если ваши преобразования требуют сохранения порядка входных данных, вы, вероятно, обнаружите, что их проще использовать.ПЛИНК , как Параллельный.
ForEach .
Например, если мы хотим преобразовать цветные видеокадры RGB в черно-белые, порядок выходных кадров, естественно, должен быть сохранен.
В этом случае лучше использовать ПЛИНК и функция Как заказано() , который в недрах PLINQ разбивает входную последовательность, выполняет преобразования, а затем упорядочивает результат в правильном порядке.
public static void GrayscaleTransformation(IEnumerable<Frame> Movie)
{
var ProcessedMovie =
Movie
.
AsParallel() .
AsOrdered() .
Select(frame => ConvertToGrayscale(frame));
foreach (var grayscaleFrame in ProcessedMovie)
{
// Movie frames will be evaluated lazily
}
}
Почему бы здесь не использовать Parallel.ForEach?
За исключением тривиальных случаев, реализация параллельных операций над последовательными данными с использованием Параллельный.
ForEach требует значительного количества кода.
В нашем случае мы можем использовать перегрузку функций.
Для каждого чтобы повторить эффект оператора AsOrdered():
public static ParallelLoopResult ForEach<TSource >(
IEnumerable<TSource> source,
Action<TSource, ParallelLoopState,Int64>body)
В перегруженной версии Для каждого параметр индекса текущего элемента был добавлен к делегату действия с данными.
Теперь мы можем записать результат в выходную коллекцию по тому же индексу, параллельно выполнять дорогостоящие вычисления и в итоге получить выходную последовательность в правильном порядке.
Следующий пример иллюстрирует один из способов поддержания порядка с помощью Параллельный.
ForEach :
public static double [] PairwiseMultiply( double[] v1, double[] v2)
{
var length = Math.Min(v1.Length, v2.Lenth);
double[] result = new double[length];
Parallel.ForEach(v1,
(element, loopstate, elementIndex) =>
result[elementIndex] = element * v2[elementIndex]);
return result;
}
Однако быстро обнаруживаются недостатки такого подхода.
Если входная последовательность представляет собой IEnumerable, а не массив, существует 4 способа реализации сохранения порядка:
- Первый вариант — вызвать IEnumerable.Count(), стоимость которого составит O(n).
Если количество элементов известно, вы можете создать выходной массив для хранения результатов по заданному индексу.
- Второй вариант — материализовать коллекцию (превратив ее, например, в массив).
Если данных много, то этот метод не очень подходит.
- Третий вариант — тщательно продумать свою коллекцию выходных данных.
Выходная коллекция может представлять собой хеш, тогда объем памяти, необходимый для хранения выходных значений, будет как минимум в 2 раза превышать входную память во избежание коллизий хеширования; если данных много, то структура данных для хэша будет непомерно большой, а также вы можете столкнуться с падением производительности из-за ложного разделения и сборщика мусора.
- Последний вариант — сохранить результаты с их исходными индексами, а затем применить собственный алгоритм сортировки к выходной коллекции.
Инфраструктура PLINQ позволяет оператору Как заказано() обрабатывать потоковые данные, другими словами, PLINQ поддерживает ленивую материализацию.
В PLINQ материализация всей последовательности — худшее решение.
Вы можете легко избежать вышеуказанных проблем и выполнять параллельные операции с данными, просто используя оператор Как заказано() .
Параллельная обработка потоковых данных
Использование PLINQ для потоковой обработки
PLINQ предлагает возможность обрабатывать запрос как запрос к потоку.Эта возможность чрезвычайно ценна по следующим причинам:
- 1. Результаты не материализуются в массиве, поэтому нет избыточности в хранении данных в памяти.
- 2. Перечислять результаты можно в одном потоке вычислений по мере поступления новых данных.
В ПЛИНК код будет выглядеть примерно так:
public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
var StockRiskPortfolio =
Stocks
.
AsParallel() .
AsOrdered() .
Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)}) .
Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));
foreach (var stockRisk in StockRiskPortfolio)
{
SomeStockComputation(stockRisk.Risk);
// StockRiskPortfolio will be a stream of results
}
}
В этом примере элементы разделены на части ( перегородки ), обрабатываются несколькими потоками, а затем переупорядочиваются; Важно понимать, что эти шаги выполняются параллельно, поскольку результаты фильтрации появляются в однопоточном потребителе в цикле.
для каждого может производить расчеты.
PLINQ оптимизирован по производительности, а не по задержке, и использует внутренние буферы; Может случиться так, что хотя частичный результат уже получен, он останется в выходном буфере до тех пор, пока выходной буфер полностью не насытится и не позволит осуществлять дальнейшую обработку.
Ситуацию можно исправить с помощью метода расширения PLINQ. WithMergeOptions , который позволяет вам установить буферизацию вывода.
Метод WithMergeOptions принимает перечисление в качестве параметра Параметры параллельного слияния вы можете указать, как запрос выдает окончательный результат, который будет использоваться одним потоком.
Доступны следующие варианты:
- ParallelMergeOptions.NotBuffered - указывает, что каждый обработанный элемент возвращается из каждого потока сразу после его обработки.
- ParallelMergeOptions.AutoBuffered — указывает, что элементы собираются в буфере, буфер периодически возвращается в поток-потребитель
- ParallelMergeOptions.FullyBuffered — указывает на то, что выходная последовательность полностью буферизована, это позволяет получить результаты быстрее, чем при использовании других вариантов, но тогда потребительскому потоку придется долго ждать, чтобы получить первый элемент для обработки.
Почему не Parallel.ForEach?
Оставим в стороне недостатки Parallel.ForEach по сохранению порядка последовательности.
Для вычислений вне порядка в потоке с использованием Parallel.ForEach код будет выглядеть следующим образом:
public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
Parallel.ForEach(Stocks,
stock => {
var risk = ComputeRisk(stock);
if(ExpensiveRiskAnalysis(risk)
{
// stream processing
lock(myLock) { SomeStockComputation(risk) };
// store results
}
}
Этот код почти идентичен примеру PLINQ, за исключением явной блокировки и менее элегантного кода.
Обратите внимание, что в этой ситуации Parallel.ForeEach предполагает сохранение результатов в потокобезопасном виде, тогда как PLINQ делает это за вас.
Для сохранения результатов у нас есть 3 способа: первый — хранить значения в потокобезопасной коллекции и требовать блокировки при каждой записи.
Второй — сохранить в потокобезопасную коллекцию; к счастью, .
NET Framework 4 предоставляет набор таких коллекций в пространстве имен System.Collections.Concurrent и вам не придется реализовывать это самостоятельно.
Третий способ — использовать Parallel.ForEach с локальный поток хранилище, о котором речь пойдет ниже.
Каждый из этих методов требует явного управления сторонними эффектами записи в коллекцию, тогда как PLINQ позволяет нам абстрагировать эти операции.
Операции над двумя коллекциями
Применение ПЛИНК для операций над двумя коллекциями
PLINQ-оператор Почтовый индекс особым образом выполняет параллельные вычисления над двумя разными коллекциями.Поскольку его можно комбинировать с другими запросами, вы можете параллельно выполнять сложные операции с каждой коллекцией перед объединением двух коллекций.
Например:
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
return
a
.
AsParallel() .
AsOrdered() .
Select(element => ExpensiveComputation(element)) .
Zip( b .
AsParallel() .
AsOrdered() .
Select(element => DifferentExpensiveComputation(element)),
(a_element, b_element) => Combine(a_element,b_element));
}
В приведенном выше примере показано, как каждый источник данных обрабатывается параллельно различными операциями, затем результаты из обоих источников объединяются с помощью оператора Zip.
Почему не Parallel.ForEach?
Аналогичную операцию можно выполнить с перегрузкой Parallel.ForEach с использованием индексов, например:
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
var numElements = Math.Min(a.Count(), b.Count());
var result = new T[numElements];
Parallel.ForEach(a,
(element, loopstate, index) =>
{
var a_element = ExpensiveComputation(element);
var b_element = DifferentExpensiveComputation(b.ElementAt(index));
result[index] = Combine(a_element, b_element);
});
return result;
}
Однако существуют потенциальные подводные камни и недостатки, описанные в использовании Parallel.ForEach при сохранении порядка данных.
Один из недостатков предполагает сканирование всей коллекции до конца и явное управление индексами.
Состояние локального потока ( Локальное состояние потока )
Применение Параллельный.
ForEach для доступа к локальному состоянию потока Хотя PLINQ предоставляет более компактные средства выполнения параллельных операций с данными, некоторые сценарии обработки лучше подходят для использования.
Параллельный.
ForEach , например операции, поддерживающие локальное состояние потока.
Сигнатура соответствующего метода Параллельный.
ForEach выглядит так:
public static ParallelLoopResult ForEach<TSource,TLocal>(
IEnumerable<TSource> source,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally)
Обратите внимание, что существует перегрузка операторов.
Совокупный , который обеспечивает доступ к локальному состоянию потока и может использоваться, если шаблон обработки данных можно описать как уменьшение размерности.
Следующий пример иллюстрирует, как исключить непростые числа из последовательности:
public static List<R> Filtering<T,R>(IEnumerable<T> source)
{
var results = new List<R>();
using (SemaphoreSlim sem = new SemaphoreSlim(1))
{
Parallel.ForEach(source, () => new List<R>(),
(element, loopstate, localStorage) =>
{
bool filter = filterFunction(element);
if (filter)
localStorage.Add(element);
return localStorage;
},
(finalStorage) =>
{
lock(myLock)
{
results.AddRange(finalStorage)
};
});
}
return results;
}
Подобную функциональность можно было бы гораздо проще реализовать с помощью PLINQ, цель примера — показать, что использование Параллельный.
ForEach и состояние локального потока могут значительно снизить затраты на синхронизацию.
Однако в других сценариях состояния локальных потоков становятся абсолютно необходимыми, следующий пример демонстрирует такой сценарий.
Представьте, что вы, как блестящий ученый-компьютерщик и математик, разработали статистическую модель анализа риска ценных бумаг; Эта модель, по вашему мнению, разобьет вдребезги все остальные модели риска.
Чтобы доказать это, вам нужны данные с информационных сайтов фондового рынка.
Но последовательность загрузки данных будет очень длинной и является узким местом для восьмиядерного компьютера.
Хотя использование Параллельный.
ForEach это простой способ параллельной загрузки данных с помощью Веб-клиент , каждый поток будет блокироваться при каждой загрузке, что можно улучшить за счет использования асинхронного ввода-вывода; доступна дополнительная информация Здесь .
Из соображений производительности вы решили использовать Параллельный.
ForEach для перебора коллекции URL-адресов и параллельной загрузки данных.
Код выглядит примерно так:
public static void UnsafeDownloadUrls ()
{
WebClient webclient = new WebClient();
Parallel.ForEach(urls,
(url,loopstate,index) =>
{
webclient.DownloadFile(url, filenames[index] + ".
dat");
Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
});
}
Удивительно, но во время выполнения мы получаем исключение: «System.NotSupportedException -> WebClient не поддерживает одновременные операции ввода-вывода».
Понимая, что несколько потоков не могут получить доступ к одному Веб-клиент в то же время вы решаете создать Веб-клиент за каждую загрузку.
public static void BAD_DownloadUrls ()
{
Parallel.ForEach(urls,
(url,loopstate,index) =>
{
WebClient webclient = new WebClient();
webclient.DownloadFile(url, filenames[index] + ".
dat");
Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
});
}
Этот код позволяет программе создавать более сотни веб-клиентов; программа выдаст исключение таймаута в WebClient. Вы понимаете, что на компьютере не установлена серверная операционная система, поэтому максимальное количество подключений ограничено.
Тогда вы можете догадаться, что использование Parallel.ForEach с локальным состоянием потока решит проблему:
public static void downloadUrlsSafe()
{
Parallel.ForEach(urls,
() => new WebClient(),
(url, loopstate, index, webclient) =>
{
webclient.DownloadFile(url, filenames[index]+".
dat");
Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
return webclient;
},
(webclient) => { });
}
}
В этой реализации каждая операция доступа к данным независима от другой.
В то же время точка доступа не является ни независимой, ни потокобезопасной.
Использование локального хранилища потоков позволяет нам быть уверенными, что количество созданных экземпляров Веб-клиент столько, сколько требуется, и каждый экземпляр Веб-клиент принадлежит потоку, который его создал.
Что не так с PLINQ?
Если мы реализуем предыдущий пример с использованием объектов ThreadLocal и PLINQ, код будет следующим:
public static void downloadUrl()
{
var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());
var res =
urls
.
AsParallel()
.
ForAll(
url =>
{
webclient.Value.DownloadFile(url, host[url] +".
dat"));
Console.WriteLine("{0}:{1}",
Thread.CurrentThread.ManagedThreadId, url);
});
}
Хотя реализация достигает тех же целей, важно понимать, что в любом сценарии использование ThreadLocal<> существенно дороже соответствующей перегрузки Параллельный.
ForEach .
Обратите внимание, что в этом сценарии стоимость создания экземпляров ThreadLocal<> незначительно по сравнению со временем, необходимым для скачивания файла из Интернета.
Выход из операций
Применение Параллельный.
ForEach выйти из операции В ситуации, когда контроль над выполнением операций необходим, важно понимать, что выход из цикла Параллельный.
ForEach позволяет добиться того же эффекта, что и проверка условия необходимости продолжения вычислений внутри тела цикла.
Одна из перегрузок Параллельный.
ForEach позволяя вам отслеживать ParallelLoopState , выглядит так:
public static ParallelLoopResult ForEach<TSource >(
IEnumerable<TSource> source,
Action<TSource, ParallelLoopState> body)
ParallelLoopState обеспечивает поддержку прерывания выполнения цикла двумя различными способами, описанными ниже.
ParallelLoopState.Stop()
Останавливаться() сообщает циклу о необходимости прекратить выполнение итераций; свойство ParallelLoopState.IsStopped позволяет каждой итерации определять, вызывается ли какая-либо другая итерация методом Останавливаться() .Метод Останавливаться() обычно полезно, если цикл выполняет поиск вне порядка и должен завершиться, как только искомый элемент будет найден.
Например, если мы хотим узнать, присутствует ли объект в коллекции, код будет примерно таким:
public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
var matchFound = false;
Parallel.ForEach(TSpace,
(curValue, loopstate) =>
{
if (curValue.Equals(match) )
{
matchFound = true;
loopstate.Stop();
}
});
return matchFound;
}
Эту функциональность также можно реализовать с помощью PLINQ. В этом примере показано, как использовать ParallelLoopState.Stop() для управления потоком выполнения.
ParallelLoopState.Break()
Перерыв() сообщает циклу, что элементы, предшествующие текущему элементу, должны быть обработаны, но последующие элементы должны прекратить итерацию.Значение нижней итерации можно получить из свойства ParallelLoopState.LowestBreakIteration .
Перерыв() обычно полезно, если вы ищете упорядоченные данные.
Другими словами, существует определенный критерий необходимости обработки данных.
Например, для последовательности, содержащей неуникальные элементы, в которой вы хотите найти индекс соответствующего объекта, код будет выглядеть так:
public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where
T: IEqualityComparer<T>
{
var loopResult = Parallel.ForEach(source,
(curValue, loopState, curIndex) =>
{
if (curValue.Equals(match))
{
loopState.Break();
}
});
var matchedIndex = loopResult.LowestBreakIteration;
return matchedIndex.HasValue ? matchedIndex : -1;
}
В данном примере цикл выполняется до тех пор, пока не будет найден объект, сигнал Break() означает, что обрабатываться должны только элементы с индексом ниже найденного объекта; если будет найден другой совпадающий экземпляр, сигнал Break() будет получен снова, это повторяется до тех пор, пока есть элементы, если объект был найден, поле LowestBreakIteration указывает на первый индекс совпадающего объекта.
Почему не PLINQ?
Хотя ПЛИНК обеспечивает поддержку выхода из выполнения запроса; различия в механизмах выхода между PLINQ и Parallel.ForEach значительны.Чтобы выйти из запроса PLINQ, запрос должен быть снабжен токеном отмены, как описано.
Здесь .
С Параллельный.
ForEach флаги выхода опрашиваются на каждой итерации.
В случае ПЛИНК Вы не можете рассчитывать на то, что отмененный запрос быстро остановится.
Заключение
Параллельный.ForEach И ПЛИНК — мощные инструменты для быстрого внедрения параллелизма в ваши приложения без необходимости глубокого погружения в механизмы их работы.
Однако, чтобы правильно выбрать инструмент для конкретной задачи, имейте в виду различия и советы, описанные в этой статье.
Полезные ссылки:
Потоковая обработка в C# RSDN: Работа с потоками на C#.Параллельное программирование
Образцы Microsoft для параллельного программирования с помощью .NET Framework
Теги: #PLINQ #Parallel.ForEach #Параллельное программирование #параллельные вычисления #параллелизм #.NET #.
NET
-
Дайджест Laravel (15–21 Июня 2020 Г.)
19 Oct, 24 -
Ssd И Hdd В Корпусе Одного Macbook Pro
19 Oct, 24 -
Канобувости, 56-Й Выпуск
19 Oct, 24 -
Интеграция Android Studio, Gradle И Ndk
19 Oct, 24