Анатомия Асинхронных Фреймворков На C++ И Других Языках.

Привет! В этой статье я расскажу о проектировании асинхронных двигателей с сопрограммами и без них.

Для начала давайте сосредоточимся не на конкретном движке, а на том, почему сопрограммы появились во всех популярных языках программирования и чем они так хороши.

Это может быть интересно не только разработчикам C++, но и всем, кто разрабатывает сетевые приложения или интересуется архитектурой современных фреймворков.

Пройдемся по разным архитектурам построения серверов — от самых простых синхронных до более интересных, посмотрим на типовую архитектуру движка сопрограмм, а затем окунемся в дебри C++ и посмотрим на самое худшее на примере нашего фреймворка userver. .



Написание синхронного сервера

Представьте, что у вашего сервиса очень маленькая нагрузка — 100 rps, и вам поставлена задача написать простой сервер, понятный каждому второму школьнику.

В итоге у вас получится что-то вроде следующего:

  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
   

void naive_accept() { for (;;) { auto new_socket = accept(listener); std::thread thrd([socket = std::move(new_socket)] { auto data = socket.receive(); process(data); socket.send(data); }); thrd.detach(); } }

Сервер принимает новые соединения в бесконечном цикле, используя функцию принятия.

Как только у нас появляется новое сокет-соединение, мы переносим его в отдельный поток выполнения и работаем с ним в этом потоке.

Мы читаем данные из сокета, обрабатываем их и отправляем ответ обратно через сокет. Все очень просто.

Как выглядит такой сервер для операционной системы (ОС)?

Анатомия асинхронных фреймворков на C++ и других языках.
</p><p>

Звоним принять.

Accept — это системный вызов, то есть функция, которая перейдет в операционную систему, и операционная система выполнит необходимые действия, в этом случае она вернет новое соединение.

Но нового соединения может и не быть, если пользователи нашего серверного приложения еще не сделали к нему запрос.

В этом случае ОС приостановит работу приложения, переключит контекст на другой, и ядро процессора будет работать с другой программой.

В какой-то момент появится новое соединение, операционная система это заметит и вернет управление нашему приложению.

Программа продолжит работать как ни в чем не бывало.

Перейдем к следующей строке кода.

Там создается std∷thread, куда мы передаем сокет. За вызовом std∷thread также стоит системный вызов, который во многих операционных системах является довольно тяжелым.

Мы получаем новый поток выполнения.

После этого все продолжается в бесконечном цикле: мы снова вызываем Accept, заходим в операционную систему, делаем системный вызов, и ОС приостанавливает наш поток.



Что делает новый поток?

Что происходит с потоком, получившим новое соединение и обрабатывающим его? Он также выполняет системный вызов приема.

То есть мы заходим в систему и говорим: «Ой, операционная система, дай нам данные!» — и ОС отвечает: «Ой, на сокете нет данных, пусть поток спит, пока данные не появятся».



Анатомия асинхронных фреймворков на C++ и других языках.
</p><p>

При появлении данных ОС переключает выполнение обратно на наше приложение, оно обрабатывает данные и отправляет их через ОС через сокет пользователю.

После этого поток уничтожается – свое дело он сделал.



Плюсы и минусы наивного подхода

Плюсы описанного подхода очевидны — получается очень простой сервер.

Легко писать, легко читать, легко понимать.

Но есть и недостатки, например, сервер очень неэффективен, потому что мы делаем много тяжелых операций, которые нельзя было бы сделать.

В таблице показано, сколько времени занимает каждая операция:

Анатомия асинхронных фреймворков на C++ и других языках.
</p><p>

Самые «дешевые» операции находятся наверху.

Например, перемещение данных из регистра в регистр занимает менее одного такта.

А самые «дорогие» операции находятся внизу.

И среди этих операций есть системный вызов, занимающий 1000–1500 тактов.

А в самом-самом низу есть системный вызов, который приводит к переключению контекста.

Такое переключение с возвратом обратно в приложение занимает от 10 тысяч циклов до миллиона.



Анатомия асинхронных фреймворков на C++ и других языках.
</p><p>

Если наше приложение в основном занимается получением данных и их отправкой куда-то, то есть это приложение, привязанное к вводу-выводу, количество переключений контекста может быть очень большим.

Если избавиться от них, приложение будет работать в 10, 20, 30, а то и 100 раз быстрее.



Анатомия асинхронных фреймворков на C++ и других языках.
</p><p>

Еще одним недостатком этой архитектуры является то, что мы создаем новый поток для каждого запроса пользователя.

Для некоторых приложений это может быть неприемлемо.

Например, приложения Python обычно являются однопоточными.

И создать в них новый поток – задача весьма уникальная.

Поэтому такая архитектура не подходит для Python. Создание потока — дорогостоящая и сложная операция.

Если бы мы повторно использовали потоки, мы бы получили дополнительный прирост производительности, а если бы мы работали в одном потоке, наша архитектура подошла бы для Python.

Анатомия асинхронных фреймворков на C++ и других языках.
</p><p>



Написание асинхронного сервера

Давайте представим, что нагрузка возросла настолько, что самый простой сервер нам уже не подходит. Нам нужно что-то более производительное — асинхронный сервер! В чем его разница? В синхронном сервере мы говорили: «Операционная система, дайте нам новый сокет», и пока не появился новый сокет, операционная система приостанавливала поток.

Оно возобновилось, когда событие произошло — когда мы получили новое соединение.

А в асинхронном сервере мы запрашиваем у операционной системы события, которые уже произошли.

Мы можем сразу с ними работать — выполнять любую функцию или обратный вызов, связанный с этим событием.

Берем то, что уже готово, тем самым избавляясь от переключений контекста.

В псевдокоде это выглядит примерно так:

void async_accept() { accept(listener, [](socket_t socket) { async_accept(); socket.receive( [socket](std::vector<unsigned char> data) { process(data); socket.send(data, kNoCallback); }); }); }

Есть функция async_accept, и в ней самой первой строкой мы называем асинхронное принятие нового соединения — Accept. В этот момент мы говорим ОС: «ОС, начни отслеживать событие получения новых сокетов на этом слушателе сокетов».

Когда происходит событие, ОС сообщает об этом фреймворку, внутренние механизмы фреймворка вызовут обратный вызов `[](socket_tocket)` и передадут ему новый сокет. Асинхронный прием обрабатывает сразу, то есть мы не ждем появления сокета.

Мы сказали операционной системе следить за новыми событиями, и вызов сработал сразу — мы вышли из функции async_accept. Но когда сокет появляется, ОС сообщает об этом нашему движку, и движок вызывает обратный вызов.

Внутри обратного вызова снова вызывается async_accept. Мы как бы говорим операционной системе: «Продолжайте следить за новыми соединениями».

А еще мы говорим: «За новой розеткой тоже следите.

Когда там появятся данные, вызовите этот обратный вызов `[socket](std::vector data)` и передайте в него вектор с данными.

А когда данные появятся, ОС передаст их обратному вызову.

Там эти данные обрабатывается и отправляется наружу через Socket.send. Как это взаимодействие выглядит на диаграмме?

Анатомия асинхронных фреймворков на C++ и других языках.
</p><p>

В самом начале мы вызываем Accept и говорим системе: «Следите за новыми событиями».

И через какое-то время движок заходит в ОС и спрашивает: «Ну и что? Какие события произошли? А операционная система, например, может ответить: «Есть новое соединение».

Двигатель отвечает: «Чудесно! Выполняем обратный вызов» — и обратный вызов сначала просит ОС отслеживать новые соединения.

Дальше работа продолжается, мы вызываем получение для получения новых данных.

Мы говорим операционной системе: «Отслеживать новые события».

Всё, движку больше делать пока нечего.

Через некоторое время движок снова спрашивает: «Операционная система, произошли ли какие-то новые событияЭ» Есть ли что-нибудь, над чем я могу работатьЭ» А ОС может ответить: «Да, здесь сразу несколько.

Во-первых, запрошенные вами данные на сокете поступили, во-вторых, появилось новое соединение».

В этом случае асинхронный движок выполнит три обратных вызова: тот, который обрабатывает данные; тот, который будет отправлять данные; и обратный вызов, связанный с обработкой нового соединения.

Это уже второе подключение.

То есть мы обрабатываем два соединения и принимаем в рамках одного потока.

Это отличная архитектура, подходящая для Python. В чем проблема такого подхода, чего нам не хватает? Все просто — не хватает возможности запускать пользовательские задачи параллельно с выполняемым в данный момент кодом.



Запуск пользовательских задач



socket.receive([socket](std::vector<unsigned char> data) { auto task = Async(process1, data); process(data); task.wait(); socket.send(data, kNoCallback); });

Пользователи могут захотеть запустить обработку в отдельной задаче параллельно с основной задачей, а затем дождаться завершения обоих процессов.

Проблема в том, что операционная система ничего не знает о наших подзадачах.

Они находятся внутри движка, а это значит, что нам нужно научить операционную систему, что у нас есть какие-то свои подзадачи.

В этом случае вам придется общаться с ОС посредством системных вызовов, а это не очень эффективно.

Другой вариант — создать очередь задач, готовых к выполнению.

В этом случае асинхронный двигатель будет выглядеть так:

Анатомия асинхронных фреймворков на C++ и других языках.
</p><p>

В движке имеется пул потоков, в котором хранится очередь готовых к выполнению задач.

Пул потоков получает задачи от движка, который собирает события из операционной системы и добавляет в очередь задач готовые к выполнению обратные вызовы.

Пул потоков сам извлекает задачи из очереди и рассчитывает их.

Во время расчета задача может сказать операционной системе: «Начни отслеживать еще какие-то события».

Задача может порождать другие задачи, готовые к выполнению.

И в этом случае в очередь будут помещены и новые задачи.

Пул потоков может состоять из одного потока или нескольких, в зависимости от того, как вы настроили движок или как он был написан.

Но есть нюанс.

Эта схема хорошо работает в идеальном мире, где в ОС есть полный набор асинхронных методов для всех наших желаний.

К сожалению, мы живем не в самом идеальном мире, и в операционных системах не всегда есть асинхронные возможности необходимых системных вызовов.

Например, не каждая операционная система может просматривать содержимое каталога и асинхронно читать оттуда файл.

Поэтому во многих случаях создается отдельный пул потоков для блокировки задач, которые не могут выполняться асинхронно в ОС.

Он нужен для того, чтобы потоки из основного пула потоков не зависали на блокировке системных вызовов, чтобы основной пул потоков занимался только задачами, связанными с CPU, то есть максимально эффективно нагружал процессор.



Анатомия асинхронных фреймворков на C++ и других языках.
</p><p>

Из очереди блокирующих задач готовые к выполнению обратные вызовы могут перейти в очередь задач, привязанных к ЦП, а задачи из пула, привязанных к ЦП, могут быть добавлены в очередь задач блокировки.

То есть все очереди могут общаться друг с другом и обмениваться задачами.



А как насчет синхронизации?

И всё бы ничего, но двух очередей недостаточно для сценариев с одновременным доступом к одним и тем же данным из разных потоков.

Для правильной работы приложения необходимо защитить общие данные с помощью примитивов синхронизации, таких как мьютексы.

Опять же существует риск блокировки потока.



Анатомия асинхронных фреймворков на C++ и других языках.
</p><p>

Например, если вызвать метод lock на уже заблокированном стандартном мьютексе, произойдет системный вызов, управление перейдет в ОС, и система поймет: мьютекс заблокирован, потоку делать нечего, он должен приостановиться, переключить контекст и работать с другим приложением.

Но когда мьютекс разблокирован, поток необходимо будет разбудить и сообщить, что мьютекс успешно заблокирован.

Минус в том, что поток на какое-то время зависает и ничего не делает, а это не очень хорошо для асинхронного движка.

Приложение простаивает. Чтобы этого не произошло, вы можете написать свои собственные мьютексы с обратными вызовами.



mutex.lock([data = std::move(data), socket = std::move(socket)]() { process2(shared_resource, data); socket.send(data, kNoCallback); });

Например, мы можем создать мьютекс с функцией блокировки, в которую мы передаем обратный вызов, который должен быть выполнен при успешном получении мьютекса.

Под капотом этот замок может выглядеть примерно так:

template <class Functor> void lock(Functor f) { auto lock = this->try_lock(); if (lock) { f(); } else { wait_for_unlock(std::move(f)); } }

Когда вызывается lock(), мы пытаемся получить мьютекс.

Если он успешно перехвачен, обратный вызов выполняется немедленно.

Если обратный вызов не выполняется, его необходимо поместить в очередь задач, где он будет ждать освобождения мьютекса.

Так в схему асинхронного двигателя добавляется еще один блок — приостановленные задачи, которые чего-то ждут.

Анатомия асинхронных фреймворков на C++ и других языках.
</p><p>

Задачи могут отправляться туда из пула потоков и возвращаться в очередь готовых к выполнению задач, когда их можно будет разблокировать.

В блок также могут быть включены задачи из других пулов потоков, например, из потока блокирующих операций.

Главное, чтобы в нужный момент задача вернулась обратно в очередь задач нужного пула потоков, готовых к выполнению.



Плюсы и минусы асинхронного сервера без сопрограмм

Вот так у нас получился асинхронный сервер без сопрограмм.

Его преимущества очевидны – все очень эффективно.

Мы можем обрабатывать несколько соединений в одном потоке, нет лишних переключений контекста, а ресурс ЦП потребляется минимально.

Проблема в том, что со временем код становится нечитаемым.



void async_accept() { accept(listener, [](socket_t socket) { async_accept(); auto something = Async(process1, {42}); auto& socket_ref = *socket; socket_ref.receive( [socket = std::move(socket), something = std::move(something)](std::vector<unsigned char> data) mutable { auto task = Async(process1, data); process(data); task.wait(); auto& socket_ref = *socket; socket_ref.send(data, [data, socket = std::move(socket), something = std::move(something)]() mutable { mutex.lock([data = std::move(data), socket = std::move(socket), something = std::move(something)]() mutable { process2(shared_resource, data); socket->send(data, kNoCallback); }); }); }); }); }

Когда пример небольшой, простой и мы не передаем много данных между обратными вызовами, все читается более-менее нормально.

Но когда объем данных увеличивается и между обратными вызовами необходимо передавать несколько переменных, все становится намного сложнее.

Вы должны внимательно следить за временем жизни переменных, правильно и эффективно передавать их в обратные вызовы и не забывать их перемещать.

Во многих местах нельзя передавать переменные по ссылке, потому что создается висячая ссылка — код тоже становится опасным.

Написание кода становится сложнее, как и его читабельность, работать с ним можно, но неприятно (конечно, если вы не из тех, кто это любит).

Здесь нам на помощь приходят сопрограммы.

Давайте рассмотрим третий тип архитектуры — асинхронный движок с сопрограммами.



Асинхронный сервер с сопрограммами



coro_future coro_accept_stackles() { for (;;) { auto new_socket = co_await accept(listener); auto task = Async([socket = std::move(new_socket)]() -> coro_future { auto data = co_await socket.receive(); process(data); co_await socket.send(data); co_return; }); task.Detach(); } }

Как работает эта архитектура? В самом начале мы принимаем сокет. При этом рядом пишем ключевое слово co_await для сопрограмм.

Сокет приняли, создаем новую задачу.

Внутри задачи мы получаем данные из сокета, обрабатываем их и отправляем результат наружу.

При этом все крутится в бесконечном цикле и код становится очень похож на самый первый синхронный сервер, который мы написали так, чтобы было понятно даже школьникам — почти буква в букву.

Разница лишь в том, что добавлены ключевые слова для работы со стековыми сопрограммами: co_await, co_return и т.д. Но можно ли сделать код еще более похожим на простейший синхронный сервер? Может! Для этого нужно использовать не стековые, а стековые сопрограммы.

Таким образом, код будет точно таким же, как у синхронного сервера.

Забегая вперед, скажу, что эффективность останется на асинхронном уровне.

Асинхронный сервер с сопрограммами и синхронный:



void coro_accept_stackfull() { for (;;) { auto new_socket = accept(listener); auto task = Async(/*.

*/ { auto data = socket.receive(); process(data); socket.send(data); }); task.Detach(); } }



void naive_accept() { for (;;) { auto new_socket = accept(listener); std::thread thrd(/*.

*/ { auto data = socket.receive(); process(data); socket.send(data); }); thrd.detach(); } }

Однако при переходе на стековые сопрограммы появляются небольшие накладные расходы: увеличивается потребление оперативной памяти, отмены происходят немного медленнее.

Здесь каждый выбирает сам, что ему ближе — код, более похожий на синхронный и более простой, или наименьшее использование оперативной памяти.



Структура движка сопрограммы

Давайте посмотрим, как выглядит движок сопрограммы под капотом.



Анатомия асинхронных фреймворков на C++ и других языках.
</p><p>

Схема получается такая же, как у асинхронного движка без сопрограмм.

Все дело в том, что сопрограммы — это почти обратный вызов.



coro_future coro_accept_stackles() { for (;;) { auto new_socket = co_await accept(listener); auto task = Async([socket = std::move(new_socket)]() -> coro_future { auto data = co_await socket.receive(); process(data); co_await socket.send(data); co_return; }); task.Detach(); } }

Когда мы пишем co_await, компилятор просматривает все, что находится рядом в коде, и из этого хитрым способом делает обратный вызов, который передает в функцию принятия.

Когда компилятор видит co_await внутри отдельной задачи, он собирает близлежащие переменные, делает из них обратный вызов и подсовывает его в прием.

Это происходит в случае сопрограмм без стека.

В случае сопрограмм, заполненных стеком, мы как бы превращаем весь наш стек в своего рода обратный вызов.

Все, что находится в стеке, превращается в отдельную задачу, которая переносится в очередь, откуда ее можно запустить и передать асинхронным методам.

С сопрограммами и асинхронностью у нас все работает очень эффективно, как и было в случае с обычным асинхронным сервером с обратным вызовом, а код остается простым и читабельным.

Но есть и недостатки.

Под капотом такого асинхронного сервера с сопрограммами происходит очень многое.

Причем это такая же жесткость, как если бы мы писали просто асинхронный сервер.

А если вы опытный разработчик, то эта жесть может вам даже понравиться :-) Так что если вы разрабатываете эту жесть и получаете от нее удовольствие, то минус может обернуться для вас плюсом.



С++ хардкор

А теперь, как я и обещал, обратимся к конкретному примеру.

Посмотрим, как с этой жестью под капотом работает группа общих компонентов в Яндексе.

У нас есть собственный асинхронный фреймворк под названием userver. Я покажу вам, как мы реализовали в нем неблокирующий асинхронный мьютекс с помощью сопрограмм и асинхронного программирования.



Анатомия асинхронных фреймворков на C++ и других языках.
</p><p>

Архитектура пользовательского сервера аналогична большинству асинхронных фреймворков.

Правда, у нас также есть асинхронные драйверы для различных баз данных (PostgreSQL, MongoDB, Redis), сокетов, DNS, http-протоколов и не-http-протоколов.

В общем, почти для всего у нас есть свои асинхронные драйверы и примитивы асинхронной синхронизации.

Давайте создадим наш собственный мьютекс.

Асинхронный, эффективный, подходит для асинхронного движка сопрограммы:

struct Mutex { void lock(); void unlock(); private: std::atomic<Coroutine*> owner_{nullptr}; };

Userver использует сопрограммы со стеком, поэтому дополнительные ключевые слова не нужны, и мы можем создать мьютекс, интерфейс которого такой же, как у std∷mutex. То есть вы можете использовать std::unique_lock с нашим мьютексом.

Как ни удивительно, у мьютекса есть методы блокировки и разблокировки, и чтобы это работало, нам нужна атомарная переменная «owner_» внутри класса мьютекса.

В этой переменной мы будем хранить указатель на сопрограмму, которая в данный момент владеет мьютексом.

Если мьютексом никто не владеет, эта переменная сохраняет значение nullptr. Метод блокировки будет выглядеть так:

void Mutex::lock() { Coroutine* current = GetCurrentCoro(); Coroutine* expected = nullptr; if (owner_.compare_exchange_strong(expected, current)) return; expected = nullptr; impl::MutexWaitStrategy wait_manager(lock_waiters_, current); while (!owner_.compare_exchange_strong(expected, current)) { assert(expected != current && "Mutex is locked twice from the same task"); current->Sleep(wait_manager); expected = nullptr; } }

В самом начале мы получаем указатель на текущую сопрограмму.

Это сопрограмма, которая в данный момент выполняет метод lock().

Ожидается, что мьютекс в данный момент никем не заблокирован, и мы атомарно пытаемся это проверить.

Говорим: «Если мьютексом в данный момент никто не владеет (то есть атомарная переменная содержит nullptr), прописываем туда нашу сопрограмму в качестве владельца.

И скажи мне, удалось ли тебе это сделать или нет».

Если все получилось и мьютекс захвачен, больше ничего не нужно, выходим из функции блокировки.

Если нам не повезло и мьютексом уже владеет другая сопрограмма, спускаемся вниз, устанавливаем служебную переменную wait_manager, о которой я расскажу чуть позже, и пытаемся повторить процедуру с захватом мьютекса.

Если на этот раз мьютекс разблокирован, мы выходим из функции блокировки и уничтожаем локальные переменные.

Если мьютекс все же захвачен, нужно дождаться, пока он разблокируется, а сопрограмму следует приостановить и переместить в очередь приостановленных задач, не готовых к выполнению.

Как это сделать?

class Coroutine { public: // .

void Sleep(WaitStrategy& strategy); // .

};

Внутри сопрограммы есть метод Sleep. Как ни странно, это приостанавливает выполнение сопрограммы.

Он прекращает выполнение до тех пор, пока не будет вызван в методе Wakeup. Sleep передается WaitStrategy, базовому классу, имеющему только две виртуальные функции.



class Coroutine { public: // .

void Sleep(WaitStrategy& strategy); // .

}; class WaitStrategy { public: virtual void AfterAsleep() = 0; virtual void BeforeAwake() = 0; protected: ~WaitStrategy() = default; };

1. AfterAsleep вызывается сразу после остановки сопрограммы.

Этот метод обычно запускает механизмы, способные пробудить кортуину.

2. BeforeAwake вызывается перед пробуждением сопрограммы.

В этом методе останавливаются все механизмы пробуждения сопрограммы.



struct Mutex { void lock(); void unlock(); private: std::atomic<Coroutine*> owner_{nullptr}; WaitList lock_waiters_; };

Мы также добавляем переменную WaitList в класс мьютекса.

Это очередь сопрограмм, ожидающих разблокировки этого мьютекса.

Если бы мы работали со стандартным мьютексом ОС, такая очередь хранилась бы где-то внутри системы.

Но так как мы используем свои примитивы синхронизации, то обращаться к операционной системе нет необходимости.

Мы можем хранить всю информацию при себе, например, хранить информацию о том, кто ожидает мьютекс, в самом мьютексе.

Что происходит в это время в функции блокировки? Мы создаем переменную класса MutexWaitStrategy, которая наследуется от WaitStrategy. Передаем созданную переменную методу Sleep. И это место — мое любимое во всех движках сопрограмм, потому что здесь мы можем использовать стек вместо кучи.



void Mutex::lock() { Coroutine* current = GetCurrentCoro(); Coroutine* expected = nullptr; if (owner_.compare_exchange_strong(expected, current)) return; expected = nullptr; impl::MutexWaitStrategy wait_manager(lock_waiters_, current); while (!owner_.compare_exchange_strong(expected, current)) { assert(expected != current && "Mutex is locked twice from the same task"); current->Sleep(wait_manager); expected = nullptr; } }

Например, где-то глубоко внутри движка необходимо создать одно- или двусвязный список и поместить в этот двусвязный список сопрограмму, которая теперь должна приостановиться.

Вы можете использовать стандартный контейнер, но в этом случае внутри движка будут происходить медленные динамические выделения при вставке новых элементов.



void Mutex::lock() { Coroutine* current = GetCurrentCoro(); Coroutine* expected = nullptr; if (owner_.compare_exchange_strong(expected, current)) return; expected = nullptr; impl::ListNode<Coroutine*> node{current}; while (!owner_.compare_exchange_strong(expected, current)) { assert(expected != current && "Mutex is locked twice from the same task"); current->Sleep(wait_manager); expected = nullptr; } }

Или можно использовать навязчивые списки и поместить узел этого листа в стек сопрограммы, которую мы собираемся приостановить.

Когда сопрограмма приостановлена, этот стек все еще можно использовать, вы даже можете изменить сопрограмму, потому что она приостановлена, но не уничтожена.

Благодаря этому мы можем создать все, что нужно движку для работы, прямо в стеке и полностью избежать динамического распределения внутри движка.

Давайте двигаться дальше.

Мы создаем вспомогательную переменную MutexWaitStrategy и в конструкторе этой переменной захватываем блокировку, защищающую WaitList, то есть список сопрограмм, ожидающих мьютекса.



class MutexWaitStrategy final : public WaitStrategy { public: MutexWaitStrategy(WaitList& waiters, Coroutine* current) : WaitStrategy(), waiters_(waiters), current_(current), lock_(waiters) {} void AfterAsleep() override { waiters_.Append(lock_, current_); lock_.unlock(); } void BeforeAwake() override { lock_.lock(); waiters_.Remove(lock_, current_); } private: WaitList& waiters_; Coroutine* const current_; WaitList::Lock lock_; };

Эта блокировка также поможет не пропустить уведомление о том, что мьютекс разблокирован.



void Mutex::lock() { Coroutine* current = GetCurrentCoro(); Coroutine* expected = nullptr; if (owner_.compare_exchange_strong(expected, current)) return; expected = nullptr; impl::MutexWaitStrategy wait_manager(lock_waiters_, current); while (!owner_.compare_exchange_strong(expected, current)) { assert(expected != current && "Mutex is locked twice from the same task"); current->Sleep(wait_manager); expected = nullptr; } }

Создаем Wait_manager, делаем Compare_exchange_strong. Не повезло — мьютекс все еще захвачен, переходим к методу Sleep. Внутри Sleep сопрограмма будет остановлена, затем будет выполнен метод AfterAsleep из MutexWaitStrategy. В этом методе мы добавим нашу сопрограмму в список ожидания мьютекса.



class MutexWaitStrategy final : public WaitStrategy { public: MutexWaitStrategy(WaitList& waiters, Coroutine* current) : WaitStrategy(), waiters_(waiters), current_(current), lock_(waiters) {} void AfterAsleep() override { waiters_.Append(lock_, current_); lock_.unlock(); } void BeforeAwake() override { lock_.lock(); waiters_.Remove(lock_, current_); } private: WaitList& waiters_; Coroutine* const current_; WaitList::Lock lock_; };

Разблокируем блокировку, после чего сопрограмма удаляется из потока и ждет, пока мьютекс не разблокируется.

В это время движок подхватывает другую задачу, готовую к выполнению, и начинает ее выполнять, поэтому поток не простаивает. Затем сопрограмма, которая ранее получила мьютекс, вызывает разблокировку:

void Mutex::unlock() { [[maybe_unused]] const auto old_owner = owner_.exchange(nullptr); assert(old_owner == GetCurrentCoro()); WaitList::Lock lock(lock_waiters_); lock_waiters_.WakeupOne(lock); }

В анлоке написано, что мьютексом больше никто не владеет, владелец опять принимает значение nullptr. Проводится проверка — убеждаемся, что мьютекс разблокирован той же сопрограммой, которая его заблокировала.

После этого для WaitList устанавливается блокировка, и мы вызываем метод «пробуждения той сопрограммы, которая ждала дольше всех» в WaitList.

class MutexWaitStrategy final : public WaitStrategy { public: MutexWaitStrategy(WaitList& waiters, Coroutine* current) : WaitStrategy(), waiters_(waiters), current_(current), lock_(waiters) {} void AfterAsleep() override { waiters_.Append(lock_, current_); lock_.unlock(); } void BeforeAwake() override { lock_.lock(); waiters_.Remove(lock_, current_); } private: WaitList& waiters_; Coroutine* const current_; WaitList::Lock lock_; };

WaitList пробуждает сопрограмму и вызывает метод MutexWaitStrategy::BeforeAwa. Теги: #python #Go #C++ #async/await #python3 #coroutine #c++17 #events #async #asynchronous #c++20 #coroutines #программирование, управляемое событиями

Вместе с данным постом часто просматривают: