Асинхронный Python: различные формы конкурентности

  • 2233
  • 1
  • 31 мая, 2022
  • читать 20 мин

Оглавление

Это перевод статьи Абу Ашраф Маснуна «Async Python: The Different Forms of Concurrency».

С появлением Python 3 довольно много шума об «асинхронности» и «параллелизме», можно полагать, что Python недавно представил эти возможности/концепции. Но это не так. Мы много раз использовали эти операции. Кроме того, новички могут подумать, что asyncio является единственным или лучшим способом воссоздать и использовать асинхронные/параллельные операции. В этой статье мы рассмотрим различные способы достижения параллелизма, их преимущества и недостатки.

Определение терминов:

Прежде чем мы углубимся в технические аспекты, важно иметь некоторое базовое понимание терминов, часто используемых в этом контексте.

Синхронный и асинхронный:

В ​синхронных операциях задачи выполняются друг за другом. В асинхронных — задачи могут запускаться и завершаться независимо друг от друга. Одна асинхронная задача может запускаться и продолжать выполняться, пока выполнение переходит к новой задаче. Асинхронные задачи ​не блокируют (не заставляют ждать завершения выполнения задачи) операции и обычно выполняются в фоновом режиме.

Например, вы должны обратиться в туристическое агентство, чтобы спланировать свой следующий отпуск. Вам нужно отправить письмо своему руководителю, прежде чем улететь. В синхронном режиме, вы сначала позвоните в туристическое агентство, и если вас попросят подождать, то вы будете ждать, пока вам не ответят. Затем вы начнёте писать письмо руководителю. Таким образом, вы выполняете задачи последовательно, одна за другой.

Но, если вы умны, то пока вас попросили подождать, вы начнёте писать письмо, и когда с вами снова заговорят, вы приостановите написание, поговорите, а затем допишете письмо. Вы также можете попросить друга позвонить в агентство, а сами написать письмо. Это асинхронность, задачи не блокируют друг друга.

Конкурентность и параллелизм:

Конкурентность подразумевает, что две задачи выполняются совместно. В нашем предыдущем примере, когда мы рассматривали асинхронный пример, мы постепенно продвигались то в написании письма, то в разговоре с турагентством. Это ​конкурентность.

Когда мы попросили позвонить друга, а сами писали письмо, то задачи выполнялись ​параллельно.​

Параллелизм по сути является формой конкурентности. Но параллелизм зависит от оборудования. Например, если в CPU только одно ядро, то две задачи не могут выполняться параллельно. Они просто делят процессорное время между собой. Тогда это конкурентность, но не параллелизм. Но когда у нас есть несколько ядер, мы можем выполнять несколько операций (в зависимости от количества ядер) одновременно.

Подытожим:

  • Синхронность: блокирует операции (блокирующие)
  • Асинхронность: не блокирует операции (неблокирующие)
  • Конкурентность: совместный прогресс (совместные)
  • Параллелизм: параллельный прогресс (параллельные)

Параллелизм подразумевает конкурентность. Но конкурентность не всегда подразумевает параллелизм.

Потоки и процессы

Python поддерживает потоки уже очень давно. Потоки позволяют выполнять операции конкурентно. Но есть проблема, связанная с Global Interpreter Lock (GIL) из-за которой потоки не могли обеспечить настоящий параллелизм. И тем не менее, с появлением multiprocessing можно использовать несколько ядер с помощью Python.

Потоки (Threads)

Рассмотрим небольшой пример. В нижеследующем коде функция worker будет выполняться в нескольких потоках асинхронно и одновременно.

import threading
import time
import random


def worker(number):
    sleep = random.randrange(1, 10)
    time.sleep(sleep)
    print("I am Worker {}, I slept for {} seconds".format(number, sleep))


for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    t.start()

print("All Threads are queued, let's see when they finish!")

А вот пример выходных данных:

$ python thread_test.py
All Threads are queued, let's see when they finish!
I am Worker 1, I slept for 1 seconds
I am Worker 3, I slept for 4 seconds
I am Worker 4, I slept for 5 seconds
I am Worker 2, I slept for 7 seconds
I am Worker 0, I slept for 9 seconds

Таким образом мы запустили 5 потоков для совместной работы, и после их старта (т.е. после запуска функции worker) операция не ждёт завершения работы потоков прежде чем перейти к следующему оператору print. Это асинхронная операция.

В нашем примере мы передали функцию в конструктор Thread. Если бы мы хотели, то могли бы реализовать подкласс с методом (ООП стиль).

Global Interpreter Lock (GIL)

GIL нужен, чтобы сделать обработку памяти CPython проще и обеспечить наилучшую интеграцию с C.

GIL — это механизм блокировки, когда интерпретатор Python запускает в работу только один поток за раз. Это значит, только один поток может исполняться в байт-коде Python единовременно. GIL следит за тем, чтобы несколько потоков не выполнялись параллельно.

Краткие сведения о GIL:

  • Одновременно может выполняться один поток.
  • Интерпретатор Python переключается между потоками для достижения конкурентности.
  • GIL применим к CPython (стандартной реализации). Но, например, Jython и IronPython не имеют GIL.
  • GIL делает однопоточные программы быстрыми.
  • Операциям ввода/вывода GIL обычно не мешает.
  • GIL позволяет легко интегрировать непотокобезопасные библиотеки на C, благодаря GIL у нас есть много высокопроизводительных расширений/модулей, написанных на C.
  • Для CPU-зависимых задач интерпретатор делает проверку каждые N тиков и переключает потоки. Таким образом один поток не блокирует другие.

Многие видят в GIL слабость. Я же рассматриваю это как благо, ведь были созданы такие библиотеки, как NumPy, SciPy, которые занимают особое, уникальное положение в научном обществе.

Процессы (Processes)

Чтобы достичь параллелизма, в Python был добавлен модуль multiprocessing, который предоставляет API и выглядит очень похожим, если вы использовали threading раньше.

Давайте просто пойдем и изменим предыдущий пример. Теперь модифицированная версия использует Процесс вместо Потока.

import multiprocessing
import time
import random


def worker(number):
    sleep = random.randrange(1, 10)
    time.sleep(sleep)
    print("I am Worker {}, I slept for {} seconds".format(number, sleep))


for i in range(5):
    t = multiprocessing.Process(target=worker, args=(i,))
    t.start()

print("All Processes are queued, let's see when they finish!")

Что же изменилось? Я просто импортировал модуль multiprocessing вместо threading. А затем, вместо потока я использовал процесс. Вот и всё! Теперь вместо множества потоков мы используем процессы, которые запускаются на разных ядрах CPU (если, конечно, у вашего процессора несколько ядер).

С помощью класса Pool мы также можем распределить выполнение одной функции между несколькими процессами для разных входных значений.

Пример из официальных документов:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

Здесь вместо того, чтобы перебирать список значений и вызывать функцию f по одному, мы фактически запускаем функцию в разных процессах.

Один процесс выполняет f(1), другой-f(2), а другой-f (3). Наконец, результаты снова объединяются в список. Это позволяет нам разбить тяжелые вычисления на более мелкие части и запускать их параллельно для более быстрого расчета.

Модуль concurrent.futures

Модуль concurrent.futures большой и позволяет писать асинхронный код очень легко. Мои любимчики — ThreadPoolExecutor и ProcessPoolExecutor. Эти исполнители поддерживают пул потоков или процессов. Мы отправляем наши задачи в пул, и он запускает задачи в доступном потоке / процессе. Возвращается объект Future, который можно использовать для запроса и получения результата по завершении задачи.

А вот пример ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
from time import sleep
 
def return_after_5_secs(message):
    sleep(5)
    return message
 
pool = ThreadPoolExecutor(3)
 
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print(future.result())

Asyncio — что, как и почему

У вас, вероятно, есть вопрос, который есть у многих людей в сообществе Python — что asyncio приносит нового? Зачем нужен был еще один способ асинхронного ввода-вывода? Разве у нас уже не было потоков и процессов?

Зачем нам нужен asyncio?

Процессы очень дорогостоящие и требуют много ресурсов для создания. Поэтому для операций ввода/вывода в основном выбираются потоки.

Мы знаем, что ввод-вывод зависит от внешних вещей — медленные диски или неприятные сетевые лаги делают ввод-вывод часто непредсказуемым. Теперь предположим, что мы используем потоки для операций ввода-вывода. 3 потока выполняют различные задачи ввода-вывода. Интерпретатор должен был бы переключаться между конкурентными потоками и давать каждому из них некоторое время по очереди.

Назовем потоки — T1, T2 и T3. Три потока начали свою операцию ввода-вывода. T3 завершает его первым. T2 и T1 все еще ожидают ввода-вывода. Интерпретатор Python переключается на T1, но он все еще ждет. Хорошо, интерпретатор перемещается в T2, а тот все еще ждет, а затем перемещается в T3, который готов и выполняет код. Вы видите в этом проблему?

T3 был готов, но интерпретатор сначала переключился между T2 и T1 — это понесло расходы на переключение, которых мы могли бы избежать, если бы интерпретатор сначала переключился на T3, верно?

Что есть asynio?

Asyncio предоставляет нам цикл событий наряду с другими крутыми вещами. Цикл событий (event loop) отслеживает события ввода/вывода и переключает задачи, которые готовы и ждут операции ввода/вывода.

Идея очень проста. Есть цикл обработки событий. И у нас есть функции, которые выполняют асинхронные операции ввода-вывода. Мы передаем свои функции циклу событий и просим его запустить их. Цикл событий возвращает нам объект Future, словно обещание, что в будущем мы что-то получим. Мы держимся за обещание, время от времени проверяем, имеет ли оно значение, и, наконец, когда значение получено, мы используем его в некоторых других операциях.

Как использовать asyncio?

Прежде чем мы начнём, давайте взглянем на пример:

import asyncio
import datetime
import random


async def my_sleep_func():
    await asyncio.sleep(random.randint(0, 5))


async def display_date(num, loop):
    end_time = loop.time() + 50.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        await my_sleep_func()


loop = asyncio.get_event_loop()

asyncio.ensure_future(display_date(1, loop))
asyncio.ensure_future(display_date(2, loop))

loop.run_forever()

Обратите внимание, что синтаксис async/await предназначен только для Python 3.5 и выше. Пройдёмся по коду:

  • У нас есть асинхронная функция display_date, которая принимает число (в качестве идентификатора) и цикл обработки событий в качестве параметров.
  • Функция имеет бесконечный цикл, который прерывается через 50 секунд. Но за этот период она неоднократно печатает время и делает паузу. Функция await может ожидать завершения выполнения других асинхронных функций (корутин).
  • Передаем функцию в цикл обработки событий (используя метод ensure_future).
  • Запускаем цикл событий.

Всякий раз, когда происходит вызов await, asyncio понимает, что функции, вероятно, потребуется некоторое время. Таким образом, он приостанавливает выполнение, начинает мониторинг любого связанного с ним события ввода-вывода и позволяет запускать задачи. Когда asyncio замечает, что приостановленный ввод-вывод функции готов, он возобновляет функцию.

Делаем правильный выбор

Только что мы прошлись по самым популярным формам конкурентности. Но остаётся вопрос — что следует выбрать?

Это зависит от вариантов использования. Из моего опыта я склонен следовать этому псевдо-коду:

if io_bound:
    if io_very_slow:
        print("Use Asyncio")
    else:
       print("Use Threads")
else:
    print("Multi Processing")

Такие сложные материи, как асинхронность, мы проходим на обучении Рython 👇

Рекомендуем курс по теме

Укр Рус