Асинхронний Python: різні форми конкурентності

Асинхронний Python: різні форми конкурентності

  • 31 травня, 2022
  • читати 20 хв

Це переклад статті Абу Ашраф Маснуна «Async Python: The Different Forms of Concurrency»

З появою Python 3 досить багато шуму про «асинхронність» і «паралелізм», можна подумати, що Python нещодавно представив ці можливості/концепції. Але це не так. Ми багато разів використовували ці операції. Крім того, новачки можуть подумати, що asyncio є єдиним або найкращим способом відтворити та використовувати асинхронні/паралельні операції. У цій статті ми розглянемо різні способи досягнення паралелізму, їх переваги та недоліки.

Визначення термінів:

Перш ніж ми заглибимося в технічні аспекти, важливо мати деяке базове розуміння термінів, які часто використовуються в цьому контексті.

Синхронний та асинхронний:

У синхронних операціях завдання виконуються одне за одним. У асинхронних — задачі можуть запускатися і завершуватися незалежно один від одного. Одна асинхронна задача може запускатися і продовжувати виконуватись, поки виконання переходить до нового завдання. Асинхронні завдання не блокують (не змушують чекати завершення виконання завдання) операції та зазвичай виконуються у фоновому режимі.

Наприклад, ви повинні звернутися до туристичного агентства, щоб спланувати свою наступну відпустку. Вам потрібно надіслати листа своєму керівнику, перш ніж відлетіти. У синхронному режимі ви спочатку зателефонуєте в туристичне агентство, і якщо вас попросять почекати, то ви чекатимете, поки вам не дадуть відповіді. Потім ви почнете писати листа керівнику. Таким чином, ви виконуєте завдання послідовно, одне за одним.

Але, якщо ви розумні, то поки вас попросили почекати, ви почнете писати листа, і коли з вами знову заговорять, ви припините написання, поговорите, а потім допишете листа. Ви також можете попросити друга зателефонувати до агенції, а самі написати листа. Це асинхронність, завдання не блокують одне одного.

Конкурентність та паралелізм:

Конкурентність передбачає, що завдання виконуються спільно. У попередньому прикладі, коли ми розглядали асинхронний приклад, ми поступово просувалися то в написанні листи, то в розмові з турагентством. Це конкурентність.

Коли ми попросили зателефонувати другу, а самі писали листа, то завдання виконувались паралельно.

Паралелізм, по суті, є формою конкурентності. Але паралелізм залежить від обладнання. Наприклад, якщо в CPU лише одне ядро, то дві задачі не можуть виконуватися паралельно. Вони просто ділять процесорний час між собою. Тоді це конкурентність, але не паралелізм. Але коли ми маємо кілька ядер, ми можемо виконувати кілька операцій (залежно від кількості ядер) одночасно.

Підсумуємо:

  • Синхронність: блокує операції (блокуючі)
  • Асинхронність: не блокує операції (неблокуючі)
  • Конкурентність: спільний прогрес (спільні)
  • Паралелізм: паралельний прогрес (паралельні)

Паралелізм має на увазі конкурентність. Але конкурентність не завжди має на увазі паралелізм.

Потоки та процеси

Python підтримує потоки вже давно. Потоки дозволяють виконувати операції конкурентно. Але є проблема, пов'язана з Global Interpreter Lock (GIL), через яку потоки не могли забезпечити справжній паралелізм. І тим не менш, з появою multiprocessing можна використовувати кілька ядер за допомогою Python.

Потоки (Threads)

Розглянемо маленький приклад. У наведеному нижче коді функція worker буде виконуватися в декількох потоках асинхронно і одночасно.

import threading
import time
import random


def worker(number):
    sleep = random.randrange(1, 10)
    time.sleep(sleep)
    print("I am Worker {}, I slept for {} seconds".format(number, sleep))


for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    t.start()

print("All Threads are queued, let's see when they finish!")

А ось приклад вихідних даних:

$ python thread_test.py
All Threads are queued, let's see when they finish!
I am Worker 1, I slept for 1 seconds
I am Worker 3, I slept for 4 seconds
I am Worker 4, I slept for 5 seconds
I am Worker 2, I slept for 7 seconds
I am Worker 0, I slept for 9 seconds

Таким чином ми запустили 5 потоків для спільної роботи, і після їх старту (тобто після запуску функції worker) операція не чекає завершення роботи потоків, перш ніж перейти до наступного оператора print. Це асинхронна операція.

У нашому прикладі ми передали функцію конструктор Thread. Якби ми хотіли, то могли б реалізувати підклас із методом (ООП стиль).

Global Interpreter Lock (GIL)

GIL потрібний, щоб зробити обробку пам'яті CPython простіше та забезпечити найкращу інтеграцію з C.

GIL це механізм блокування, коли інтерпретатор Python запускає в роботу лише один потік за раз. Це означає, що один потік може виконуватися в байт-коді Python одночасно. GIL стежить, щоб кілька потоків не виконувались паралельно.

Коротка інформація про GIL:

  • Одночасно може виконуватись один потік.
  • Інтерпретатор Python перемикається між потоками для досягнення конкурентності.
  • GIL застосовується до CPython (стандартної реалізації). Але, наприклад, Jython та IronPython не мають GIL.
  • GIL робить однопотокові програми швидкими.
  • Операціям введення/виведення GIL зазвичай не заважає.
  • GIL дозволяє легко інтегрувати непотокобезпечні бібліотеки на C, завдяки GIL ми маємо багато високопродуктивних розширень/модулів, написаних на C.
  • Для CPU-залежних завдань інтерпретатор робить перевірку кожні N тиків та перемикає потоки. Таким чином, один потік не блокує інші.

Багато хто бачать у GIL слабкість. Я ж розглядаю це як благо, адже було створено такі бібліотеки, як NumPy, SciPy, які займають особливе, унікальне становище у науковому суспільстві.

Процеси (Processes)

Щоб досягти паралелізму, в Python був доданий модуль multiprocessing, який надає API і виглядає дуже схожим, якщо ви використовували threading раніше.

Давайте просто підемо і змінимо попередній приклад. Тепер модифікована версія використовує процес замість потоку.

import multiprocessing
import time
import random


def worker(number):
    sleep = random.randrange(1, 10)
    time.sleep(sleep)
    print("I am Worker {}, I slept for {} seconds".format(number, sleep))


for i in range(5):
    t = multiprocessing.Process(target=worker, args=(i,))
    t.start()

print("All Processes are queued, let's see when they finish!")

Що змінилося? Я просто імпортував модуль multiprocessing замість threading. А потім замість потоку я використав процес. От і все! Тепер замість багатьох потоків ми використовуємо процеси, які запускаються на різних ядрах CPU (якщо, звичайно, у вашого процесора кілька ядер).

За допомогою класу Pool ми можемо розподілити виконання однієї функції між декількома процесами для різних вхідних значень.

Приклад із офіційних документів:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

Тут замість того, щоб перебирати список значень та викликати функцію f по одному, ми фактично запускаємо функцію у різних процесах.

Один процес виконує f(1), інший-f(2), а інший-f(3). Нарешті результати знову об'єднуються в список. Це дозволяє нам розбити важкі обчислення на дрібніші частини та запускати їх паралельно для швидшого розрахунку.

Модуль concurrent.futures

Модуль concurrent.futures є великий і дозволяє писати асинхронний код дуже легко. Мої улюбленці — ThreadPoolExecutor та ProcessPoolExecutor. Ці виконавці підтримують пул потоків чи процесів. Ми відправляємо наші завдання в пул, і він запускає завдання у доступному потоці/процесі. Повертається об'єкт Future, який можна використовувати для запиту та отримання результату після завершення завдання.

А ось приклад ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
from time import sleep
 
def return_after_5_secs(message):
    sleep(5)
    return message
 
pool = ThreadPoolExecutor(3)
 
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print(future.result())

Asyncio — що, як і чому

У вас, мабуть, є питання, яке є у багатьох людей в спільноті Python — що asyncio приносить нового? Навіщо потрібен був ще один спосіб асинхронного введення-виводу? Хіба в нас уже не було потоків та процесів?

Навіщо нам потрібний asyncio?

Процеси дуже дорогі та вимагають багато ресурсів для створення. Тому для операцій введення/виводу переважно вибираються потоки.

Ми знаємо, що введення-виведення залежить від зовнішніх речей — повільні диски або неприємні мережеві лаги роблять введення-виведення часто непередбачуваним. Тепер припустимо, що ми використовуємо потоки для операцій вводу-виводу. 3 потоки виконують різні завдання введення-виводу. Інтерпретатор мав би перемикатися між конкурентними потоками і давати кожному їх деякий час по черзі.

Назвемо потоки — T1, T2 та T3. Три потоки розпочали свою операцію введення-виводу. T3 завершує його першим. T2 та T1 досі очікують введення-висновку. Інтерпретатор Python перемикається на T1, але він все ще чекає. Добре, інтерпретатор переміщається в T2, а той все ще чекає, а потім переміщається до T3, який готовий та виконує код. Ви бачите у цьому проблему?

T3 був готовий, але інтерпретатор спочатку переключився між T2 і T1 — це понесло витрати на перемикання, яких ми могли б уникнути, якби інтерпретатор спочатку переключився на T3, чи не так?

Що таке asynio?

Asyncio надає нам цикл подій поряд з іншими крутими речами. Цикл подій (event loop) відстежує події введення/виводу та перемикає завдання, які готові та чекають на операції введення/виводу.

Ідея дуже проста. Є цикл обробки подій. І ми маємо функції, які виконують асинхронні операції введення-виводу. Ми передаємо свої функції циклу подій та просимо його запустити їх. Цикл подій повертає нам об'єкт Future, наче обіцянка, що у майбутньому ми щось отримаємо. Ми тримаємося за обіцянку, іноді перевіряємо, чи має вона значення, і, нарешті, коли значення отримано, ми використовуємо його у деяких інших операціях.

Як використовувати Asyncio?

Перш ніж ми почнемо, погляньмо на приклад:

import asyncio
import datetime
import random


async def my_sleep_func():
    await asyncio.sleep(random.randint(0, 5))


async def display_date(num, loop):
    end_time = loop.time() + 50.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        await my_sleep_func()


loop = asyncio.get_event_loop()

asyncio.ensure_future(display_date(1, loop))
asyncio.ensure_future(display_date(2, loop))

loop.run_forever()

Зверніть увагу, що синтаксис async/await призначений лише для Python 3.5 та вище. Пройдемося за кодом:

  • У нас є асинхронна функція display_date, яка приймає число (як ідентифікатор) та цикл обробки подій як параметри.
  • Функція має нескінченний цикл, який переривається за 50 секунд. Але за цей період вона неодноразово друкує час та робить паузу. Функція await може очікувати завершення виконання інших асинхронних функцій (корутин).
  • Передаємо функцію цикл обробки подій (використовуючи метод ensure_future).
  • Запускаємо цикл подій.

Щоразу, коли відбувається виклик await, asyncio розуміє, що функції, ймовірно, знадобиться деякий час. Таким чином, він зупиняє виконання, починає моніторинг будь-якої пов'язаної з ним події введення-виводу та дозволяє запускати завдання. Коли asyncio помічає, що призупинене введення-виведення функції готове, він відновлює функцію.

Робимо правильний вибір

Щойно ми пройшлися найпопулярнішими формами конкурентності. Але залишається питання — що слід обрати?

Це залежить від варіантів використання. З мого досвіду я схильний слідувати цьому псевдо-коду:

if io_bound:
    if io_very_slow:
        print("Use Asyncio")
    else:
       print("Use Threads")
else:
    print("Multi Processing")

Такі складні матерії, як асинхронність, ми проходимо на навчанні Python 👇

Рекомендуємо курс по темі