Як налаштувати SQLAlchemy, SQLModel і Alembic для асинхронної роботи з FastAPI

  • 1547
  • 3
  • 17 червня, 2022
  • читати 20 хв

Зміст

Налаштування проєкту

Почніть з клонування базового проекту з репозиторію fastapi-sqlmodel-alembic:
$ git clone -b base https://github.com/testdrivenio/fastapi-sqlmodel-alembic
$ cd fastapi-sqlmodel-alembic

У кореневому каталозі проєкту створіть образи та розгорніть контейнери Docker:

$ docker-compose up -d --build

Як тільки сбирання буде завершено, перейдіть до http://localhost:8004/ping.

Ви повинні побачити:

{
  "ping": "pong!"
}

Перш ніж рухатися далі, подивіться на структуру проєкту.

SQLModel

Далі, давайте додамо бібліотеку SQLModel для взаємодії з SQL базами даних за допомогою об'єктів Python. По суті, це оболонка поверх pydantic і SQLAlchemy, заснована на інструкціях типів Python.

Нам також знадобиться Psycopg.

Додайте дві залежності в project/requirements.txt:

fastapi==0.68.1
psycopg2-binary==2.9.1
sqlmodel==0.0.4
uvicorn==0.15.0

Створіть два нових файли в розділі "проект/додаток", db.py та models.py.

project/app/models.py:

from sqlmodel import SQLModel, Field


class SongBase(SQLModel):
    name: str
    artist: str


class Song(SongBase, table=True):
    id: int = Field(default=None, primary_key=True)


class SongCreate(SongBase):
    pass

Тут ми визначили три моделі:

  1. SongBase — це базова модель, від якої успадковуються інші. Вона має дві властивості — name і artist, обидві з яких є рядками. Це модель тільки для даних, тому що в ній немає table=True, а це означає, що вона використовується тільки як модель pydantic.
  2. Song додає id до базової моделі. Це таблична модель, так що це модель pydantic і SQLAlchemy. Вона є таблицю бази даних.
  3. SongCreate — це модель pydantic, заснована лише на даних, яка використовуватиметься для створення нових екземплярів пісень.

project/app/db.py:

import os

from sqlmodel import create_engine, SQLModel, Session


DATABASE_URL = os.environ.get("DATABASE_URL")

engine = create_engine(DATABASE_URL, echo=True)


def init_db():
    SQLModel.metadata.create_all(engine)


def get_session():
    with Session(engine) as session:
        yield session

Тут ми:

  1. Ініціалізували новий движок SQLAlchemy з використанням create_engine із SQLModel. Основні відмінності між версією create_engine з SQLModel та версією SQLAlchemy полягають у тому, що версія SQLModel додає анотації типів (для підтримки редактора) та включає стиль движків та з'єднань SQLAlchemy "2.0". Крім того, ми передаємо echo=True, щоб ми могли бачити згенеровані SQL-запити в терміналі. Це завжди приємно включати як налагодження.
  2. Створили сеанс SQLAlchemy.

Далі всередині project/app/main.py створимо таблиці при події startup:

from fastapi import FastAPI

from app.db import init_db
from app.models import Song

app = FastAPI()


@app.on_event("startup")
def on_startup():
    init_db()


@app.get("/ping")
async def pong():
    return {"ping": "pong!"}

Рядок від app.models import Song обов'язковий. Без неї таблицю пісень не буде створено.

Щоб протестувати, видаліть старі контейнери та томи, заново зберіть образи та розгорніть нові контейнери:

$ docker-compose down -v
$ docker-compose up -d --build

Відкрийте логотипи контейнерів через docker-compose logs web.

Ви повинні побачити:

web_1  | CREATE TABLE song (
web_1  |    name VARCHAR NOT NULL,
web_1  |    artist VARCHAR NOT NULL,
web_1  |    id SERIAL,
web_1  |    PRIMARY KEY (id)
web_1  | )

Відкрийте psql:

$ docker-compose exec db psql --username=postgres --dbname=foo

psql (13.4 (Debian 13.4-1.pgdg100+1))
Type "help" for help.

foo=# \dt

        List of relations
 Schema | Name | Type  |  Owner
--------+------+-------+----------
 public | song | table | postgres
(1 row)

foo=# \q

Тепер, коли таблицю створено, давайте додамо кілька нових маршрутів у project/app/main.py:

from fastapi import Depends, FastAPI
from sqlalchemy import select
from sqlmodel import Session

from app.db import get_session, init_db
from app.models import Song, SongCreate

app = FastAPI()


@app.on_event("startup")
def on_startup():
    init_db()


@app.get("/ping")
async def pong():
    return {"ping": "pong!"}


@app.get("/songs", response_model=list[Song])
def get_songs(session: Session = Depends(get_session)):
    result = session.execute(select(Song))
    songs = result.scalars().all()
    return [Song(name=song.name, artist=song.artist, id=song.id) for song in songs]


@app.post("/songs")
def add_song(song: SongCreate, session: Session = Depends(get_session)):
    song = Song(name=song.name, artist=song.artist)
    session.add(song)
    session.commit()
    session.refresh(song)
    return song

Додайте пісню:

$ curl -d '{"name":"Midnight Fit", "artist":"Mogwai"}' -H "Content-Type: application/json" -X POST http://localhost:8004/songs

{
  "id": 1,
  "name": "Midnight Fit",
  "artist": "Mogwai"
}

В браузері перейдіть до http://localhost:8004/songs.

Ви повинні побачити:

{
  "id": 1,
  "name": "Midnight Fit",
  "artist": "Mogwai"
}

Асинхронна модель SQLModel

Рухаючи далі, давайте додамо підтримку асинхронності в SQLModel.

По-перше, зупиніть контейнери та томи:

$ docker-compose down -v

Оновіть URI бази даних у docker-compose.yml, додавши:

+asyncpg:
environment:
  - DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/foo

Потім замініть Psycopg на asyncpg:

asyncpg==0.24.0
fastapi==0.68.1
sqlmodel==0.0.4
uvicorn==0.15.0

Оновлення project/app/db.py: для того, щоб використовувати асинхронність у SQLAlchemy engine та session:

import os

from sqlmodel import SQLModel

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker


DATABASE_URL = os.environ.get("DATABASE_URL")

engine = create_async_engine(DATABASE_URL, echo=True, future=True)


async def init_db():
    async with engine.begin() as conn:
        # await conn.run_sync(SQLModel.metadata.drop_all)
        await conn.run_sync(SQLModel.metadata.create_all)


async def get_session() -> AsyncSession:
    async_session = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
    async with async_session() as session:
        yield session

Примітки:

  1. Ми використовували конструкції SQLAlchemy, наприклад create_async_engine і AsyncSession, оскільки на момент написання SQLModel для них не було обгорток.
  2. Ми відключили expire on commit, передавши expire_on_commit=False.
  3. metadata.create_all не виконується асинхронно, тому ми використовували run_sync для синхронного виконання в асинхронній функції.

Перетворіть on_startup на асинхронну функцію в project/app/main.py:

@app.on_event("startup")
async def on_startup():
    await init_db()

От і все. Перезберіть образи та запустіть контейнери:

$ docker-compose up -d --build

Переконайтеся, що таблиці створено.

Нарешті, оновіть обробники маршрутів у project/app/main.py, щоб використовувати асинхронне виконання:

from fastapi import Depends, FastAPI
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import get_session, init_db
from app.models import Song, SongCreate

app = FastAPI()


@app.on_event("startup")
async def on_startup():
    await init_db()


@app.get("/ping")
async def pong():
    return {"ping": "pong!"}


@app.get("/songs", response_model=list[Song])
async def get_songs(session: AsyncSession = Depends(get_session)):
    result = await session.execute(select(Song))
    songs = result.scalars().all()
    return [Song(name=song.name, artist=song.artist, id=song.id) for song in songs]


@app.post("/songs")
async def add_song(song: SongCreate, session: AsyncSession = Depends(get_session)):
    song = Song(name=song.name, artist=song.artist)
    session.add(song)
    await session.commit()
    await session.refresh(song)
    return song

Додайте нову пісню та переконайтеся, що http://localhost:8004/songs працює так, як і очікувалося.

Alembic

Нарешті, давайте додамо Alembic, щоб правильно опрацьовувати зміни схеми бази даних.

Додайте його до файлу вимог:

alembic==1.7.1
asyncpg==0.24.0
fastapi==0.68.1
sqlmodel==0.0.4
uvicorn==0.15.0

Видаліть подію запуску з project/app/main.py, тому що ми не хочемо, щоб таблиці створювалися при запуску:

@app.on_event("startup")
async def on_startup():
    await init_db()

Знову ж таки, зупиніть існуючі контейнери та томи:

$ docker-compose down -v

Перезапустіть контейнери:

$ docker-compose up -d --build

Погляньте на використання Asyncio з Alembic, доки створюються нові образи.

Після запуску контейнерів ініціалізуйте Alembic за допомогою асинхронного шаблону:

$ docker-compose exec web alembic init -t async migrations

У створеній папці "проект/міграції" імпортуйте SQLModel у script.py.mako, файл шаблону Mako:

"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from alembic import op
import sqlalchemy as sa
import sqlmodel             # NEW
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}


def upgrade():
    ${upgrades if upgrades else "pass"}


def downgrade():
    ${downgrades if downgrades else "pass"}

Тепер, коли буде створено новий файл міграції, він включатиме import sqlmodel.

Далі нам потрібно оновити верхню частину project/migrations/env.py ось так:

import asyncio
from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlmodel import SQLModel                       # NEW

from alembic import context

from app.models import Song                         # NEW

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = SQLModel.metadata             # UPDATED

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.

...

Тут ми імпортували SQLModel та нашу модель пісні. Потім ми встановлюємо в target_metadata метадані нашої моделі SQLModel.metadata. Для отримання додаткової інформації про аргумент target_metadata ознайомтеся з автоматичною генерацією міграцій з офіційних документів Alembic.

Оновіть sqlalchemy.url у проекті/alembic.ini:

sqlalchemy.url = postgresql+asyncpg://postgres:postgres@db:5432/foo

Щоб створити перший файл міграції, виконайте:

$ docker-compose exec web alembic revision --autogenerate -m "init"

Якщо все пройшло добре, ви повинні побачити новий файл міграції у розділі "project/migrations/versions", який виглядає приблизно так:

"""init

Revision ID: f9c634db477d
Revises:
Create Date: 2021-09-10 00:24:32.718895

"""
from alembic import op
import sqlalchemy as sa
import sqlmodel


# revision identifiers, used by Alembic.
revision = 'f9c634db477d'
down_revision = None
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('song',
    sa.Column('name', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
    sa.Column('artist', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
    sa.Column('id', sa.Integer(), nullable=True),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_song_artist'), 'song', ['artist'], unique=False)
    op.create_index(op.f('ix_song_id'), 'song', ['id'], unique=False)
    op.create_index(op.f('ix_song_name'), 'song', ['name'], unique=False)
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index(op.f('ix_song_name'), table_name='song')
    op.drop_index(op.f('ix_song_id'), table_name='song')
    op.drop_index(op.f('ix_song_artist'), table_name='song')
    op.drop_table('song')
    # ### end Alembic commands ###

Застосуйте міграцію:

$ docker-compose exec web alembic upgrade head

Переконайтеся, що ви можете додати пісню.

Давайте швидко протестуємо зміну схеми. Оновіть модель SongBase у project/app/models.py:

class SongBase(SQLModel):
    name: str
    artist: str
    year: Optional[int] = None

Не забувайте про імпорт:

from typing import Optional

Створіть новий файл міграції:

$ docker-compose exec web alembic revision --autogenerate -m "add year"

Обновіть функції upgrade та downgrade з автоматично згенерованого міграційного файлу наступним чином:

def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('song', sa.Column('year', sa.Integer(), nullable=True))
    op.create_index(op.f('ix_song_year'), 'song', ['year'], unique=False)
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index(op.f('ix_song_year'), table_name='song')
    op.drop_column('song', 'year')
    # ### end Alembic commands ###

Застосуйте міграцію:

$ docker-compose exec web alembic upgrade head

Оновіть обробники маршрутів:

@app.get("/songs", response_model=list[Song])
async def get_songs(session: AsyncSession = Depends(get_session)):
    result = await session.execute(select(Song))
    songs = result.scalars().all()
    return [Song(name=song.name, artist=song.artist, year=song.year, id=song.id) for song in songs]


@app.post("/songs")
async def add_song(song: SongCreate, session: AsyncSession = Depends(get_session)):
    song = Song(name=song.name, artist=song.artist, year=song.year)
    session.add(song)
    await session.commit()
    await session.refresh(song)
    return song

Тест:

$ curl -d '{"name":"Midnight Fit", "artist":"Mogwai", "year":"2021"}' -H "Content-Type: application/json" -X POST http://localhost:8004/songs

Висновок

У цьому посібнику ми розглянули, як настроїти SQLAlchemy, SQLModel і Alembic для асинхронної роботи з FastAPI.