Идея написать такой модуль родилась из PLM нашего корпоративного продукта.
При просмотре нашей проектной документации нам сказали, что наш код ни при каких обстоятельствах не должен блокировать задачу, из которой он вызывается, и вообще занимать как можно меньше его времени; это особенность конструкции системы, сторожев и т. д. Поставленная задача предполагала передачу определенных сообщений от одной задачи к другой, что-то вроде логов, расширенной диагностики.
Приемная задача создается для записи результата в файл, так как заведомо из задачи-источника о записи в файл не может быть и речи.
И хотя источник (производитель) один, а потребитель (потребитель) тоже один, и даже наличие мьютексов или семафоров не повлияло бы на исходную задачу, от них было решено полностью отказаться.
Опять же, в будущем можно было расширить задачу до нескольких других задач, и поэтому ситуация, когда одна задача ждет другую, хотя и ограниченно допустима (а исходный код по-прежнему использует семафоры для обмена своими информационными сообщениями), очень нежелательна.
Изначально планировалось сделать статический кольцевой буфер, где каждый элемент содержит бит, определяющий принадлежность его источнику или потребителю.
Алгоритм предельно прост, источник записывает свои данные в ячейку, где бит равен нулю, а после этого «публикует» изменение, записывая единицу в этот бит и переходя к следующему элементу.
Потребитель из элемента, у которого этот бит равен единице, читает сообщение, а затем сбрасывает бит в ноль.
Никаких гонок, вроде все в порядке.
Но уже первый тест трафика показал, что за один срез источник теоретически может выдать около 30-40 тысяч элементов.
В реальности, конечно, будет меньше, так как помимо выдачи этих строк он делает еще кое-что, но определить размер буфера, которого было бы достаточно, не представляется возможным.
Одной из причин этого также является нестабильная скорость записи в файл — в некоторых системах вместо жестких дисков установлены CF-карты.
И мне бы очень не хотелось терять сообщения.
Порывшись в Интернете, я наткнулся на следующее решение, которое внедрил в свою задачу: drdobbs.com/architecture-and-design/210604448 Алгоритм описан достаточно подробно; Я не буду повторять это здесь.
Два изменения, которые я сделал: 1) Не понимаю, почему исходник выпускает элементы, а не потребитель.
Выпуск элементов потребителем также не создает состояния гонки (кстати, как перевести эту фразу на русский язык?).
Это снимает часть нагрузки с потребителя и снижает использование памяти, поскольку потребляемые элементы удаляются немедленно.
2) Также тест трафика, а точнее профилировщик выявил, что malloc — относительно дорогая операция.
Поскольку максимальный размер исходных сообщений известен, было решено сгруппировать выделение памяти одной операцией сразу на 8 элементов.
Это дало более чем двукратный прирост скорости, в частности вдвое снизило нагрузку на процессор, которую мы добавляем в исходную задачу.
Тут сразу надо оговориться, что исходный код был на неплюсовом языке C, который к тому же в силу соглашения о неразглашении я не могу публиковать.
Маллок больше не имеет отношения к до-диезу, поэтому второй пункт уже неприменим.
А в свободное время я изучаю до-диез и пишу лично для себя.
Однажды мне предложили работу, но я не прошел из-за отсутствия опыта работы с этим волшебным языком и с тех пор занимаюсь этим.
Ну да ладно, ближе к делу.
Реализация неблокирующей очереди в C#
Первым шагом является описание элемента очереди.
class queItem { public object message; public queItem next; public queItem(object message = null) { this.message = message; next = null; } }И сама очередь:
class locklessQueue //thread-to-thread lockless queue { queItem first; queItem last; }Здесь первый элемент принадлежит потребителю, а следующий, последний элемент принадлежит источнику.
Ни первый, ни последний не должны быть нулевыми, поэтому конструктор создает пустой элемент в состоянии «уже использован».
public locklessQueue() { first = new queItem(); last = first; }Далее идут методы добавления в очередь и соответственно извлечения из нее.
public void produce(object message) { last.next = new queItem(message); last = last.next; } public bool consume(out object message) { if (first == last || first.next == null) { message = null; return false; } message = first.next.message; first = first.next; return true; } }Сам по себе результирующий класс мало оправдан, поскольку Dotnet 4.0 уже включает в себя класс ConcurrentQueue, который не только полностью потокобезопасен, но и, в отличие от результирующего класса, позволяет одновременно добавлять и удалять из очереди несколько потоков.
И это позволяет работать с очередью в 1,5-3 раза быстрее по сравнению с вариантом блокировки.
проверочная ссылка Для сборщика журналов класса ConcurrentQueue более чем достаточно.
Однако я расширил задачу для своего dotnet-приложения, и ConcurrentQueue меня не устроил, поскольку он безадресный.
Обмен сообщениями между потоками
Каждый поток должен иметь возможность отправлять сообщение другому потоку по своему имени.
В моем случае это обработчик TCP-сокета (клиента или сервера) и собственно обработчики потоков.
Как узнать, какой именно обработчик следует отправить, выходит за рамки этой заметки.
К сожалению, мне так и не удалось решить одну из подзадач - добавление новой темы в качестве участника обмена без блокировки.
Хотелось бы увидеть исходный код ConcurrentQueue, возможно его решение поможет найти ответ. В Шарпе правда можно использовать его для передачи инициирующих сообщений, или для сообщений от асинхронных методов, но пока я приведу решение по блокировке, почти такое же, как оно было бы в классическом C. Забегая вперед, скажу, что принятое решение имеет очевидный недостаток: для обработки очередей нужно запускать отдельный поток, это расплата за отсутствие блокировки.
Отдельный поток, очевидно, добавит нагрузку на процессор, но отсутствие блокировки ускорит обработку каждого сообщения.
Насколько улучшится/ухудшится производительность и как она будет масштабироваться, оценить сложно, возможно, я проведу подобные исследования в будущем.
Итак, для каждого потока участников вам необходимо создать две очереди; он будет отправлять сообщения одному и читать другому.
«Прокси»-контейнер для этих двух очередей:
class threadNode { public string tName; public int tid; locklessQueue outgoing = new locklessQueue(); //from Messenger to Node locklessQueue incoming = new locklessQueue(); //from node to Messenger public threadNode(string tName, int tid) { this.tid = tid; this.tName = tName; } public void enqueue(messengerItem message) //called by Node { incoming.produce(message); } public bool dequeue(out messengerItem message) //called by Node { object msg; bool result = outgoing.consume(out msg); message = msg as messengerItem; return result; } public void transmit(messengerItem message) //called by Messenger { outgoing.produce(message); } public bool retrieve(out messengerItem message) //called by Messenger { object msg; bool result = incoming.consume(out msg); message = msg as messengerItem; return result; } }Как видите, в очередь помещается объект типа MessengerItem, представленный следующим классом:
class messengerItem { public string from; public string to; public object message; public messengerItem(string from, string to, object message) { this.from = from; this.to = to; this.message = message; } }Я сделал основной класс статическим, чтобы можно было отправлять сообщение из любого места кода, написав Messenger.send(.
);
public static class Messenger { static Dictionary<int, threadNode> byTID = new myDictionary<int, threadNode>(); static Dictionary<string, threadNode> byRegName = new myDictionary<string, threadNode>(); static Mutex regMutex = new Mutex(); //only one task is allowed to register at a timeДля поиска нужного узла при передаче сообщения из потока я использую Словарь с ключом ManagedThreadId, а ключом к потоку является имя, указанное при регистрации.
Для передачи сообщения от одного узла к другому я запускаю собственный поток, оболочка которого здесь не приведена; Короче говоря, он вызывает функцию MessengerFunction, описанную ниже, в бесконечном цикле и вызывает Thread.Sleep, если возвращаемое значение ложно, чтобы передать срез.
static bool messengerFunction() { bool acted = false; messengerItem item; threadNode dst; Dictionary<string, threadNode> tmp = byRegName; foreach (threadNode node in tmp.Values) { if (tmp != byRegName) break; if (node.retrieve(out item)) if (item != null) { acted = true; if(tmp.TryGetValue(item.to,out dst)) { dst.transmit(item); sent = true; } //else discard } } return acted; }Для регистрации потока в мессенджере используется следующая функция, которая на данный момент заблокирована:
static public void register(string tName) { if (tName == null || tName == "") return; int tid = Thread.CurrentThread.ManagedThreadId; myDictionary<int, threadNode> newbyTID = new myDictionary<int, threadNode>(); myDictionary<string, threadNode> newbyRegName = new myDictionary<string, threadNode>(); threadNode newnode = new threadNode(tName, tid); newbyTID.Add(tid, newnode); newbyRegName.Add(tName, newnode); regMutex.WaitOne(); foreach (threadNode node in byTID.Values) { newbyTID.Add(node.tid, node); newbyRegName.Add(node.tName, node); } byTID = newbyTID; byRegName = newbyRegName; regMutex.ReleaseMutex(); }Кроме того, аналогичная функция используется для отмены регистрации при завершении потока; Я опущу его код. Остаются только функции отправки и получения сообщений в потоках.
static public void send(string destination, object message) { int tid = Thread.CurrentThread.ManagedThreadId; threadNode node; if (byTID.TryGetValue(tid, out node)) node.enqueue(new messengerItem(node.tName, destination, message)); } static public bool receive(out object message, out string sender) { int tid = Thread.CurrentThread.ManagedThreadId; threadNode node; if (!byTID.TryGetValue(tid, out node)) { sender = null; message = null; return false; } else { messengerItem item; bool result = node.dequeue(out item); if (!result || item == null) { sender = null; message = null; } else { message = item.message; sender = item.from; } return result; } }Последняя функция используется потоками-обработчиками в цикле для обработки сообщения или, если оно не получено, для выполнения Thread.Sleep().
Теги: #.
NET #без блокировки #без блокировки #очередь #Алгоритмы
-
Мини-Справочник И Руководство По Scrum
19 Oct, 24 -
Api Google Translate Без Ajax
19 Oct, 24 -
Пасхальные Яйца
19 Oct, 24 -
Официальное Зеркало Ubuntu
19 Oct, 24 -
Найм Людей Для Devops И Других Плохих Идей
19 Oct, 24