Как настроить SQLAlchemy, SQLModel и Alembic для асинхронной работы с FastAPI

Как настроить SQLAlchemy, SQLModel и Alembic для асинхронной работы с FastAPI

  • 17 июня, 2022
  • читать 20 мин

Настройка проекта

Начните с клонирования базового проекта из репозитория fastapi-sqlmodel-alembic:

$ git clone -b base https://github.com/testdrivenio/fastapi-sqlmodel-alembic
$ cd fastapi-sqlmodel-alembic

В корневом каталоге проекта создайте образы и разверните контейнеры Docker:

$ docker-compose up -d --build

Как только сборка будет завершена, перейдите к http://localhost:8004/ping.

Вы должны увидеть:

{
  "ping": "pong!"
}

Прежде, чем двигаться дальше, посмотрите на структуру проекта.

SQLModel

Далее, давайте добавим библиотеку SQLModel для взаимодействия с SQL-базами данных с помощью объектов Python. По сути, это оболочка поверх pydantic и SQLAlchemy, основанная на аннотациях типов Python.

Нам также понадобится Psycopg.

Добавьте две зависимости в project/requirements.txt:

fastapi==0.68.1
psycopg2-binary==2.9.1
sqlmodel==0.0.4
uvicorn==0.15.0

Создайте два новых файла в разделе "проект/приложение", db.py и models.py.

project/app/models.py:

from sqlmodel import SQLModel, Field


class SongBase(SQLModel):
    name: str
    artist: str


class Song(SongBase, table=True):
    id: int = Field(default=None, primary_key=True)


class SongCreate(SongBase):
    pass

Здесь мы определили три модели:

  1. SongBase — это базовая модель, от которой наследуются другие. У неё есть два свойства — name и artist, оба из которых являются строками. Это модель только для данных, так как в ней нет table=True, а это значит, что она используется только в качестве модели pydantic.
  2. Song добавляет id в базовую модель. Это табличная модель, так что это модель pydantic и SQLAlchemy. Она представляет собой таблицу базы данных.
  3. SongCreate — это модель pydantic, основанная только на данных, которая будет использоваться для создания новых экземпляров песен.

project/app/db.py:

import os

from sqlmodel import create_engine, SQLModel, Session


DATABASE_URL = os.environ.get("DATABASE_URL")

engine = create_engine(DATABASE_URL, echo=True)


def init_db():
    SQLModel.metadata.create_all(engine)


def get_session():
    with Session(engine) as session:
        yield session

Здесь мы:

  1. Инициализировал новый движок SQLAlchemy с использованием create_engine из SQLModel. Основные различия между версией create_engine из SQLModel и версией SQLAlchemy заключаются в том, что версия SQLModel добавляет аннотации типов (для поддержки редактора) и включает стиль движков и соединений SQLAlchemy "2.0". Кроме того, мы передаём echo=True чтобы мы могли видеть сгенерированные SQL-запросы в терминале. Это всегда приятно включать в режиме отладки.
  2. Создали сеанс SQLAlchemy.

Далее внутри project/app/main.py создадим таблицы при событии startup:

from fastapi import FastAPI

from app.db import init_db
from app.models import Song

app = FastAPI()


@app.on_event("startup")
def on_startup():
    init_db()


@app.get("/ping")
async def pong():
    return {"ping": "pong!"}

Стоит отметить, что строчка from app.models import Song обязательна. Без неё таблица песен не будет создана.

Чтобы протестировать, удалите старые контейнеры и тома, соберите заново образы и разверните новые контейнеры:

$ docker-compose down -v
$ docker-compose up -d --build

Откройте логи контейнеров через docker-compose logs web.

Вы должны увидеть:

web_1  | CREATE TABLE song (
web_1  |    name VARCHAR NOT NULL,
web_1  |    artist VARCHAR NOT NULL,
web_1  |    id SERIAL,
web_1  |    PRIMARY KEY (id)
web_1  | )

Откройте psql:

$ docker-compose exec db psql --username=postgres --dbname=foo

psql (13.4 (Debian 13.4-1.pgdg100+1))
Type "help" for help.

foo=# \dt

        List of relations
 Schema | Name | Type  |  Owner
--------+------+-------+----------
 public | song | table | postgres
(1 row)

foo=# \q

Теперь, когда таблица создана, давайте добавим несколько новых маршрутов в project/app/main.py:

from fastapi import Depends, FastAPI
from sqlalchemy import select
from sqlmodel import Session

from app.db import get_session, init_db
from app.models import Song, SongCreate

app = FastAPI()


@app.on_event("startup")
def on_startup():
    init_db()


@app.get("/ping")
async def pong():
    return {"ping": "pong!"}


@app.get("/songs", response_model=list[Song])
def get_songs(session: Session = Depends(get_session)):
    result = session.execute(select(Song))
    songs = result.scalars().all()
    return [Song(name=song.name, artist=song.artist, id=song.id) for song in songs]


@app.post("/songs")
def add_song(song: SongCreate, session: Session = Depends(get_session)):
    song = Song(name=song.name, artist=song.artist)
    session.add(song)
    session.commit()
    session.refresh(song)
    return song

Добавьте песню:

$ curl -d '{"name":"Midnight Fit", "artist":"Mogwai"}' -H "Content-Type: application/json" -X POST http://localhost:8004/songs

{
  "id": 1,
  "name": "Midnight Fit",
  "artist": "Mogwai"
}

В браузере перейдите к http://localhost:8004/songs.

Вы должны увидеть:

{
  "id": 1,
  "name": "Midnight Fit",
  "artist": "Mogwai"
}

Асинхронная модель SQLModel

Двигаясь дальше, давайте добавим поддержку асинхронности в SQLModel.

Во-первых, остановите контейнеры и тома:

$ docker-compose down -v

Обновите URI базы данных в docker-compose.yml, добавив:

+asyncpg:
environment:
  - DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/foo

Затем замените Psycopg на asyncpg:

asyncpg==0.24.0
fastapi==0.68.1
sqlmodel==0.0.4
uvicorn==0.15.0

Обновление project/app/db.py: чтобы использовать асинхронность в SQLAlchemy engine и session:

import os

from sqlmodel import SQLModel

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker


DATABASE_URL = os.environ.get("DATABASE_URL")

engine = create_async_engine(DATABASE_URL, echo=True, future=True)


async def init_db():
    async with engine.begin() as conn:
        # await conn.run_sync(SQLModel.metadata.drop_all)
        await conn.run_sync(SQLModel.metadata.create_all)


async def get_session() -> AsyncSession:
    async_session = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
    async with async_session() as session:
        yield session

Примечания:

  1. Мы использовали конструкции SQLAlchemy, например, create_async_engine и AsyncSession, поскольку на момент написания SQLModel для них не было обёрток.
  2. Мы отключили expire on commit передав expire_on_commit=False.
  3. metadata.create_all не выполняется асинхронно, поэтому мы использовали run_sync для его синхронного выполнения в асинхронной функции.

Превратите on_startup в асинхронную функцию в project/app/main.py:

@app.on_event("startup")
async def on_startup():
    await init_db()

Вот и все. Пересоберите образы и запустите контейнеры:

$ docker-compose up -d --build

Убедитесь, что таблицы были созданы.

Наконец, обновите обработчики маршрутов в project/app/main.py, чтобы использовать асинхронное выполнение:

from fastapi import Depends, FastAPI
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import get_session, init_db
from app.models import Song, SongCreate

app = FastAPI()


@app.on_event("startup")
async def on_startup():
    await init_db()


@app.get("/ping")
async def pong():
    return {"ping": "pong!"}


@app.get("/songs", response_model=list[Song])
async def get_songs(session: AsyncSession = Depends(get_session)):
    result = await session.execute(select(Song))
    songs = result.scalars().all()
    return [Song(name=song.name, artist=song.artist, id=song.id) for song in songs]


@app.post("/songs")
async def add_song(song: SongCreate, session: AsyncSession = Depends(get_session)):
    song = Song(name=song.name, artist=song.artist)
    session.add(song)
    await session.commit()
    await session.refresh(song)
    return song

Добавьте новую песню и убедитесь, что http://localhost:8004/songs работает так, как и ожидалось.

Alembic

Наконец, давайте добавим Alembic, чтобы правильно обрабатывать изменения схемы базы данных.

Добавьте его в файл требований:

alembic==1.7.1
asyncpg==0.24.0
fastapi==0.68.1
sqlmodel==0.0.4
uvicorn==0.15.0

Удалите событие запуска из project/app/main.py, так как мы больше не хотим, чтобы таблицы создавались при запуске:

@app.on_event("startup")
async def on_startup():
    await init_db()

Опять же, остановите существующие контейнеры и тома:

$ docker-compose down -v

Перезапустите контейнеры:

$ docker-compose up -d --build

Взгляните на использование Asyncio с Alembic, пока создаются новые образы.

После запуска контейнеров инициализируйте Alembic с помощью асинхронного шаблона:

$ docker-compose exec web alembic init -t async migrations

В созданной папке "проект/миграции" импортируйте SQLModel в script.py.mako, файл шаблона Mako:

"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from alembic import op
import sqlalchemy as sa
import sqlmodel             # NEW
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}


def upgrade():
    ${upgrades if upgrades else "pass"}


def downgrade():
    ${downgrades if downgrades else "pass"}

Теперь, когда будет создан новый файл миграции, он будет включать import sqlmodel.

Далее, нам нужно обновить верхнюю часть project/migrations/env.py вот так:

import asyncio
from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlmodel import SQLModel                       # NEW

from alembic import context

from app.models import Song                         # NEW

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = SQLModel.metadata             # UPDATED

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.

...

Здесь мы импортировали SQLModel и нашу модель песни. Затем мы устанавливаем в target_metadata метаданные нашей модели, SQLModel.metadata. Для получения дополнительной информации об аргументе target_metadata, ознакомьтесь с автоматической генерацией миграций из официальных документов Alembic.

Обновите sqlalchemy.url в проекте/alembic.ini:

sqlalchemy.url = postgresql+asyncpg://postgres:postgres@db:5432/foo

Чтобы создать первый файл миграции, выполните:

$ docker-compose exec web alembic revision --autogenerate -m "init"

Если все прошло хорошо, вы должны увидеть новый файл миграции в разделе "project/migrations/versions", который выглядит примерно так:

"""init

Revision ID: f9c634db477d
Revises:
Create Date: 2021-09-10 00:24:32.718895

"""
from alembic import op
import sqlalchemy as sa
import sqlmodel


# revision identifiers, used by Alembic.
revision = 'f9c634db477d'
down_revision = None
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('song',
    sa.Column('name', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
    sa.Column('artist', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
    sa.Column('id', sa.Integer(), nullable=True),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_song_artist'), 'song', ['artist'], unique=False)
    op.create_index(op.f('ix_song_id'), 'song', ['id'], unique=False)
    op.create_index(op.f('ix_song_name'), 'song', ['name'], unique=False)
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index(op.f('ix_song_name'), table_name='song')
    op.drop_index(op.f('ix_song_id'), table_name='song')
    op.drop_index(op.f('ix_song_artist'), table_name='song')
    op.drop_table('song')
    # ### end Alembic commands ###

Примените миграцию:

$ docker-compose exec web alembic upgrade head

Убедитесь, что вы можете добавить песню.

Давайте быстро протестируем изменение схемы. Обновите модель SongBase в project/app/models.py:

class SongBase(SQLModel):
    name: str
    artist: str
    year: Optional[int] = None

Не забывайте об импорте:

from typing import Optional

Создайте новый файл миграции:

$ docker-compose exec web alembic revision --autogenerate -m "add year"

Обновите функции upgrade и downgrade из автоматически сгенерированного файла миграции следующим образом:

def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('song', sa.Column('year', sa.Integer(), nullable=True))
    op.create_index(op.f('ix_song_year'), 'song', ['year'], unique=False)
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index(op.f('ix_song_year'), table_name='song')
    op.drop_column('song', 'year')
    # ### end Alembic commands ###

Примените миграцию:

$ docker-compose exec web alembic upgrade head

Обновите обработчики маршрутов:

@app.get("/songs", response_model=list[Song])
async def get_songs(session: AsyncSession = Depends(get_session)):
    result = await session.execute(select(Song))
    songs = result.scalars().all()
    return [Song(name=song.name, artist=song.artist, year=song.year, id=song.id) for song in songs]


@app.post("/songs")
async def add_song(song: SongCreate, session: AsyncSession = Depends(get_session)):
    song = Song(name=song.name, artist=song.artist, year=song.year)
    session.add(song)
    await session.commit()
    await session.refresh(song)
    return song

Тест:

$ curl -d '{"name":"Midnight Fit", "artist":"Mogwai", "year":"2021"}' -H "Content-Type: application/json" -X POST http://localhost:8004/songs

Вывод

В этом руководстве мы рассмотрели, как настроить SQLAlchemy, SQLModel и Alembic для асинхронной работы с FastAPI.