SQLAlchemy 2.0 in Production - Full Guide

Introduction

This document describes how SQLAlchemy 2.0 operates in production systems. It assumes you understand Python, SQL, relational database design, and concurrency primitives. This is not a tutorial.

The example throughout is a multi-tenant document management SaaS with these entities: Organization, User, Folder, Document, DocumentVersion. We’ll implement tenant isolation, handle schema boundaries, and show where SQLAlchemy helps and where it doesn’t.


Installation and Setup

Core Installation

pip install "sqlalchemy>=2.0,<2.1"  # Latest 2.0.x release

Database Driver Installation

SQLAlchemy requires a DBAPI driver for each database. Install only what you use in production.

PostgreSQL:

# psycopg2 (stable, widely used, C extension)
pip install psycopg2-binary

# Or build from source (recommended for production)
pip install psycopg2

# psycopg3 / psycopg (modern, async support)
pip install psycopg[binary]  # With C speedups
pip install psycopg         # Pure Python fallback

# asyncpg (async-only, high performance for asyncio)
pip install asyncpg

MySQL:

# mysqlclient (C extension, fastest, requires libmysqlclient)
pip install mysqlclient

# PyMySQL (pure Python, no dependencies)
pip install pymysql

# mysql-connector-python (official MySQL driver)
pip install mysql-connector-python

SQLite:

# Built into Python standard library, no installation needed
# Just use: engine = create_engine("sqlite:////path/to/database.db")

Async Support

For async applications, install additional dependencies:

# For async PostgreSQL
pip install sqlalchemy[asyncio]   # Optional dependencies for async
pip install asyncpg               # Required for async PostgreSQL

# For async MySQL (limited support)
pip install aiomysql

Development and Migration Tools

# Alembic for database migrations (required in production)
pip install alembic

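# Type checking: SQLAlchemy 2.0 ships its own type hints (Mapped[...]).
# The sqlalchemy[mypy] extra installs the legacy mypy plugin, which is
# deprecated and only useful for 1.4-style untyped models.
pip install sqlalchemy[mypy]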

# Development/testing
pip install pytest pytest-asyncio

Typical Production Setup

# PostgreSQL + Alembic minimal setup
pip install sqlalchemy psycopg[binary] alembic

# Complete multi-database setup
pip install sqlalchemy psycopg[binary] pymysql alembic

Verify Installation

import sqlalchemy
print(f"SQLAlchemy version: {sqlalchemy.__version__}")

# Test engine creation (no connection yet)
from sqlalchemy import create_engine
engine = create_engine("postgresql+psycopg://user:password@localhost/test", echo=False)
print(f"Engine created: {engine}")

Dependency Summary

Component     For                        Installation
SQLAlchemy    ORM/Core                   pip install sqlalchemy
psycopg       PostgreSQL sync            pip install psycopg[binary]
psycopg2      PostgreSQL sync (legacy)   pip install psycopg2-binary
asyncpg       PostgreSQL async           pip install asyncpg
mysqlclient   MySQL (fast)               pip install mysqlclient
PyMySQL       MySQL (pure Python)        pip install pymysql
Alembic       Database migrations        pip install alembic

1. SQLAlchemy Architecture

Core vs ORM

Core provides:

  • SQL expression construction
  • Connection pooling
  • Transaction management
  • Type system
  • Schema introspection

ORM adds:

  • Object-relational mapping
  • Identity map (per-session object cache)
  • Unit of work (automatic INSERT/UPDATE/DELETE batching)
  • Lazy/eager loading strategies
  • Relationship traversal

Capability boundaries:

  • Core cannot track object state or relationships.
  • ORM queries are built on Core, so most SQL is expressible, but some constructs (window functions with complex partitioning, recursive CTEs with multiple anchors) are easier to write and maintain in Core or raw SQL.
  • Mixing both is normal and recommended.

Engine, Connection, Session

Engine:

  • Owns connection pool
  • Thread-safe
  • Create once per database, reuse globally
  • Connects lazily: no connection is opened until first use, and idle connections are kept in the pool
from sqlalchemy import create_engine

# Create once at module level
engine = create_engine(
    "postgresql+psycopg2://user:pass@host/dbname",
    pool_size=10,          # Max persistent connections in pool
    max_overflow=20,       # Additional connections beyond pool_size when needed, dropped after usage (Use carefully).
    pool_pre_ping=True,    # Test connection validity before using (prevents stale connections)
    echo=False,            # Don't log SQL statements (set True for debugging)
)

Connection:

  • Represents a single DBAPI connection
  • Not thread-safe
  • Acquired from engine pool
  • Must be explicitly closed or used as context manager
from sqlalchemy import text

# Explicit connection management
with engine.connect() as conn:  # Check out a connection from the pool
    result = conn.execute(text("SELECT 1"))
    # No commit needed for a read; writes require conn.commit() in SQLAlchemy 2.0
# Connection automatically returned to pool on exit

Session:

  • ORM’s unit of work
  • Wraps a connection
  • Not thread-safe
  • Tracks object state (identity map)
  • Batches changes; flushes on commit, on explicit flush(), and (with the default autoflush) before queries
from sqlalchemy.orm import sessionmaker, Session

SessionLocal = sessionmaker(
    bind=engine,              # Engine to use for connections
    expire_on_commit=False    # Don't expire objects after commit (optional)
)

# Per-request or per-task
def process_request():
    session = SessionLocal()  # Create new session instance
    try:
        # work
        session.commit()      # Flush changes and commit transaction
    except:
        session.rollback()    # Undo changes on error
        raise
    finally:
        session.close()       # Release connection back to pool

Thread safety:

  • Engine: safe to share
  • Connection: never share across threads
  • Session: never share across threads or async tasks

Identity Map and Unit of Work

Identity map:

  • Session-local cache keyed by (class, primary_key)
  • Same database row = same Python object within a session
  • Lets session.get() skip the SELECT for objects that are already loaded; other queries still hit the database but return the existing instances
  • Cleared on session.close() or session.expunge_all()
session = SessionLocal()
user1 = session.get(User, 123)  # Fetch from database, store in identity map
user2 = session.get(User, 123)  # Return cached object from identity map (no SQL)
assert user1 is user2  # True - same object instance

Unit of work:

  • Session tracks: new, dirty, deleted
  • Flush batches all changes into SQL
  • Commit flushes then commits transaction
  • Rollback clears pending changes and reverts transaction

When identity map causes issues:

  • Long-lived sessions see stale data
  • High-concurrency writes cause serialization conflicts
  • Memory bloat if loading thousands of objects

Solution: Short-lived sessions, explicit expunge, or use Core for bulk reads.


2. Database and Schema Setup

Creating Database Programmatically

SQLAlchemy does not create databases. You must do this externally or via raw SQL.

from sqlalchemy import create_engine, text

def create_database_if_not_exists(admin_url: str, db_name: str):
    """
    admin_url: connection to the 'postgres' maintenance database (PostgreSQL only)
    db_name: target database name (should come from trusted configuration)
    """
    engine = create_engine(
        admin_url,
        isolation_level="AUTOCOMMIT"  # Required for CREATE DATABASE (can't run in transaction)
    )
    try:
        with engine.connect() as conn:
            # Check existence
            result = conn.execute(
                text("SELECT 1 FROM pg_database WHERE datname = :db"),
                {"db": db_name}  # Safe parameter binding
            )
            if not result.fetchone():  # Database doesn't exist
                # CREATE DATABASE cannot take bound parameters, so quote the
                # identifier with the dialect's preparer instead of interpolating it raw
                quoted = engine.dialect.identifier_preparer.quote(db_name)
                conn.execute(text(f"CREATE DATABASE {quoted}"))
    finally:
        engine.dispose()  # Close all connections

# Usage
create_database_if_not_exists(
    "postgresql+psycopg2://admin:pass@localhost/postgres",
    "docmanager"
)

For MySQL:

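def create_mysql_database(admin_url: str, db_name: str):
    engine = create_engine(admin_url)
    # Quote the identifier (backticks on MySQL) instead of interpolating it raw
    quoted = engine.dialect.identifier_preparer.quote(db_name)
    with engine.connect() as conn:
        conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {quoted}"))
        conn.commit()  # Harmless here: MySQL DDL commits implicitly
    engine.dispose()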

Dedicated Schemas

PostgreSQL supports schemas (namespaces within a database). MySQL does not: there, a schema is the same thing as a database.

from sqlalchemy import Column, Integer, String, text

# Create schemas
def setup_schemas(engine):
    with engine.connect() as conn:
        conn.execute(text("CREATE SCHEMA IF NOT EXISTS app"))
        conn.execute(text("CREATE SCHEMA IF NOT EXISTS auth"))
        conn.commit()

# Map each model to a schema via __table_args__.
# (MetaData(schema="app") would instead set a default schema for every table in that MetaData.)
# Base is the declarative base defined in section 4.

class User(Base):
    __tablename__ = "users"
    __table_args__ = {"schema": "auth"}
    
    id = Column(Integer, primary_key=True)
    email = Column(String(255), nullable=False, unique=True)

class Document(Base):
    __tablename__ = "documents"
    __table_args__ = {"schema": "app"}
    
    id = Column(Integer, primary_key=True)
    title = Column(String(500), nullable=False)

Foreign keys across schemas:

from sqlalchemy import ForeignKey

class Document(Base):
    __tablename__ = "documents"
    __table_args__ = {"schema": "app"}
    
    id = Column(Integer, primary_key=True)
    owner_id = Column(Integer, ForeignKey("auth.users.id"), nullable=False)

Search path:

Default search path is "$user", public. If you use a custom schema, either:

  1. Fully qualify all table names (schema.table)
  2. Set search path per connection:
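from sqlalchemy import event

@event.listens_for(engine, "connect", insert=True)
def set_search_path(dbapi_conn, connection_record):
    # Run the SET in autocommit mode so a later rollback (the pool rolls back
    # connections on return) cannot undo it; psycopg2 and psycopg expose .autocommit
    existing_autocommit = dbapi_conn.autocommit
    dbapi_conn.autocommit = True
    cursor = dbapi_conn.cursor()
    cursor.execute("SET SESSION search_path TO app, auth, public")
    cursor.close()
    dbapi_conn.autocommit = existing_autocommit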

3. Engine Configuration

Connection URLs

SQLite:

# In-memory
engine = create_engine("sqlite:///:memory:")

# File-based
engine = create_engine("sqlite:///./dev.db")

# Enable foreign keys (disabled by default)
from sqlalchemy import event

@event.listens_for(engine, "connect")
def set_sqlite_pragma(dbapi_conn, connection_record):
    cursor = dbapi_conn.cursor()
    cursor.execute("PRAGMA foreign_keys=ON")
    cursor.close()

PostgreSQL:

# psycopg2 (sync)
engine = create_engine(
    "postgresql+psycopg2://user:password@host:5432/database"
)

# asyncpg (async)
from sqlalchemy.ext.asyncio import create_async_engine
async_engine = create_async_engine(
    "postgresql+asyncpg://user:password@host:5432/database"
)

MySQL:

# mysqlclient
engine = create_engine(
    "mysql+mysqldb://user:password@host:3306/database?charset=utf8mb4"
)

# PyMySQL
engine = create_engine(
    "mysql+pymysql://user:password@host:3306/database?charset=utf8mb4"
)

Pooling Configuration

engine = create_engine(
    url,
    pool_size=10,           # Core pool of persistent connections
    max_overflow=20,        # Extra connections allowed when pool exhausted (total max = 30)
    pool_recycle=3600,      # Close and replace connections after 1 hour
    pool_pre_ping=True,     # Test connection with SELECT 1 before use (adds 1-2ms)
    pool_timeout=30,        # Seconds to wait for connection before raising TimeoutError
)

pool_pre_ping:

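  • Adds one lightweight round trip per checkout (roughly 1-2ms, depending on network latency)
  • Prevents errors when the database closes idle connections
  • Needed when the server can drop idle connections before pool_recycle replaces them (for example, MySQL wait_timeout < pool_recycle)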

pool_recycle:

  • Must be less than database’s connection timeout
  • PostgreSQL: default unlimited, but cloud providers enforce limits
  • MySQL: default 8 hours (wait_timeout)

pool_size + max_overflow:

  • Total max connections = pool_size + max_overflow
  • Set based on max_connections on database server
  • Leave headroom for admin connections and other services
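
The sizing rule above is simple arithmetic, but it has to account for every application process, because each process creates its own Engine and therefore its own pool. A small sketch with hypothetical helper names and example numbers (the process count, max_connections, and reserved values are assumptions):

```python
def max_client_connections(pool_size: int, max_overflow: int, processes: int) -> int:
    """Worst-case connections opened by all application processes together."""
    return (pool_size + max_overflow) * processes


def fits_server(pool_size: int, max_overflow: int, processes: int,
                max_connections: int, reserved: int) -> bool:
    """True if the worst case leaves `reserved` connections for admins and other services."""
    worst_case = max_client_connections(pool_size, max_overflow, processes)
    return worst_case <= max_connections - reserved


# 4 worker processes with the pool settings shown above
worst_case = max_client_connections(pool_size=10, max_overflow=20, processes=4)
print(worst_case)                                                  # 120
print(fits_server(10, 20, 4, max_connections=100, reserved=10))    # False
print(fits_server(10, 10, 4, max_connections=100, reserved=10))    # True (80 <= 90)
```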

Echo and Isolation Levels

# Log all SQL (dev only)
engine = create_engine(url, echo=True)

# Set isolation level
engine = create_engine(
    url,
    isolation_level="REPEATABLE READ"  # or READ COMMITTED, SERIALIZABLE
)

# Per-connection isolation
with engine.connect().execution_options(
    isolation_level="SERIALIZABLE"
) as conn:
    # work
    pass

Autocommit:

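SQLAlchemy 2.0 removed the library-level implicit autocommit. All operations are transactional by default; driver-level autocommit remains available through isolation_level="AUTOCOMMIT".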

# Explicit commit required
with engine.connect() as conn:
    conn.execute(text("INSERT INTO logs (msg) VALUES ('event')"))
    conn.commit()

Multiple Engines (Read/Write Split)

from sqlalchemy.orm import Session

write_engine = create_engine("postgresql://primary/db")
read_engine = create_engine("postgresql://replica/db")

# Route queries explicitly
def get_user_readonly(user_id: int):
    with Session(bind=read_engine) as session:
        return session.get(User, user_id)

def update_user(user_id: int, email: str):
    with Session(bind=write_engine) as session:
        user = session.get(User, user_id)
        user.email = email
        session.commit()

Routing via session bind:

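from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import Delete, Insert, Update

class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        # Flushes and DML statements passed to session.execute() go to the primary
        if self._flushing or isinstance(clause, (Insert, Update, Delete)):
            return write_engine
        return read_engine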

Limitations:

  • ORM relationships span binds awkwardly
  • Replication lag causes read-after-write inconsistencies
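  • Often better handled at the infrastructure level (for example, separate primary and replica endpoints behind HAProxy, or a query-aware proxy such as Pgpool-II; PgBouncer alone does not split reads from writes)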

4. Declarative ORM (Production Patterns)

Declarative Base and Naming Conventions

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import MetaData, Integer, String, Text, DateTime, ForeignKey
from datetime import datetime

# Naming convention for constraints (critical for Alembic)
convention = {
    "ix": "ix_%(column_0_label)s",                                       # Index names
    "uq": "uq_%(table_name)s_%(column_0_name)s",                         # Unique constraint names
    "ck": "ck_%(table_name)s_%(constraint_name)s",                       # Check constraint names
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", # Foreign key names
    "pk": "pk_%(table_name)s"                                            # Primary key names
}

class Base(DeclarativeBase):
    metadata = MetaData(naming_convention=convention)  # Apply conventions to all models

Models with Explicit Configuration

from datetime import timezone
from typing import Optional
from sqlalchemy.orm import relationship

class Organization(Base):
    __tablename__ = "organizations"              # Explicit table name
    __table_args__ = {"schema": "app"}           # PostgreSQL schema (namespace)
    
    id: Mapped[int] = mapped_column(primary_key=True)  # Auto-increment by default
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),              # Store timezone-aware timestamps
        nullable=False,
        default=lambda: datetime.now(timezone.utc)  # Python-side default, timezone-aware UTC
    )
    
    # Relationships (not columns in database, Python-only)
    users: Mapped[list["User"]] = relationship(back_populates="organization")
    folders: Mapped[list["Folder"]] = relationship(back_populates="organization")

class User(Base):
    __tablename__ = "users"
    __table_args__ = (
        {"schema": "auth"}  # Different schema from Organization
    )
    
    id: Mapped[int] = mapped_column(primary_key=True)
    organization_id: Mapped[int] = mapped_column(
        ForeignKey("app.organizations.id", ondelete="CASCADE"),  # FK across schemas
        nullable=False,
        index=True  # Index foreign keys for join performance
    )
    email: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
    full_name: Mapped[str] = mapped_column(String(255), nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    
    # Relationships enable traversal: user.organization, user.documents
    organization: Mapped["Organization"] = relationship(back_populates="users")
    documents: Mapped[list["Document"]] = relationship(back_populates="owner")

class Folder(Base):
    __tablename__ = "folders"
    __table_args__ = (
        {"schema": "app"}
    )
    
    id: Mapped[int] = mapped_column(primary_key=True)
    organization_id: Mapped[int] = mapped_column(
        ForeignKey("app.organizations.id", ondelete="CASCADE"),  # Delete cascade from org
        nullable=False,
        index=True
    )
    parent_id: Mapped[Optional[int]] = mapped_column(
        ForeignKey("app.folders.id", ondelete="CASCADE"),  # Self-referential FK
        nullable=True,  # NULL for root folders
        index=True
    )
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    
    organization: Mapped["Organization"] = relationship(back_populates="folders")
    parent: Mapped[Optional["Folder"]] = relationship(
        remote_side=[id],  # id is the remote (parent) column, which makes this the many-to-one side
        back_populates="children"
    )
    children: Mapped[list["Folder"]] = relationship(back_populates="parent")  # One-to-many
    documents: Mapped[list["Document"]] = relationship(back_populates="folder")

class Document(Base):
    __tablename__ = "documents"
    __table_args__ = (
        {"schema": "app"}
    )
    
    id: Mapped[int] = mapped_column(primary_key=True)
    organization_id: Mapped[int] = mapped_column(
        ForeignKey("app.organizations.id", ondelete="CASCADE"),  # Tenant isolation column
        nullable=False,
        index=True  # Critical for tenant-filtered queries
    )
    folder_id: Mapped[int] = mapped_column(
        ForeignKey("app.folders.id", ondelete="CASCADE"),
        nullable=False,
        index=True
    )
    owner_id: Mapped[int] = mapped_column(
        ForeignKey("auth.users.id", ondelete="RESTRICT"),  # Prevent deletion if user owns docs
        nullable=False,
        index=True
    )
    title: Mapped[str] = mapped_column(String(500), nullable=False)
    content: Mapped[Optional[str]] = mapped_column(Text, nullable=True)  # Large text field
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        onupdate=lambda: datetime.now().astimezone()  # Timezone-aware timestamp on UPDATE (Python-side)
    )
    
    organization: Mapped["Organization"] = relationship()
    folder: Mapped["Folder"] = relationship(back_populates="documents")
    owner: Mapped["User"] = relationship(back_populates="documents")
    versions: Mapped[list["DocumentVersion"]] = relationship(
        back_populates="document",
        order_by="DocumentVersion.version_number.desc()"
    )

class DocumentVersion(Base):
    __tablename__ = "document_versions"
    __table_args__ = (
        {"schema": "app"}
    )
    
    id: Mapped[int] = mapped_column(primary_key=True)
    document_id: Mapped[int] = mapped_column(
        ForeignKey("app.documents.id", ondelete="CASCADE"),
        nullable=False,
        index=True
    )
    version_number: Mapped[int] = mapped_column(Integer, nullable=False)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    
    document: Mapped["Document"] = relationship(back_populates="versions")

Indexes and Unique Constraints

from sqlalchemy import Index, UniqueConstraint, text

class Document(Base):
    __tablename__ = "documents"
    __table_args__ = (
        # Composite index for queries like: WHERE organization_id=X AND folder_id=Y
        Index("ix_documents_org_folder", "organization_id", "folder_id"),
        # Partial index (PostgreSQL only) - smaller, faster for filtered queries.
        # Assumes a soft-delete column deleted_at, which the model above does not define.
        Index(
            "ix_documents_active_title",
            "title",
            postgresql_where=text("deleted_at IS NULL")  # Only index non-deleted docs
        ),
        {"schema": "app"}
    )

class DocumentVersion(Base):
    __tablename__ = "document_versions"
    __table_args__ = (
        # Prevent duplicate version numbers within the same document
        UniqueConstraint("document_id", "version_number", name="uq_document_version"),
        {"schema": "app"}
    )

Why explicit naming:

  • Alembic generates consistent migration diffs
  • Database error messages reference known constraint names
  • Avoids auto-generated names like documents_organization_id_folder_id_key_1
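
To see the effect of a naming convention without a database, the DDL can be compiled offline. This sketch uses a reduced convention and a standalone users table (both are illustrative assumptions, not the models above) and renders the CREATE TABLE statement for the PostgreSQL dialect.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

convention = {
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "pk": "pk_%(table_name)s",
}
metadata = MetaData(naming_convention=convention)

users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("email", String(255), nullable=False, unique=True),  # unnamed constraint
)

# Compile the DDL as a string; no connection or driver is needed
ddl = str(CreateTable(users).compile(dialect=postgresql.dialect()))
print(ddl)
```

The printed statement names the constraints pk_users and uq_users_email, so a later migration can refer to them by a predictable name.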

5. Table Creation and Lifecycle

metadata.create_all()

# Create all tables
Base.metadata.create_all(bind=engine)

# Create specific tables
Base.metadata.create_all(bind=engine, tables=[User.__table__, Document.__table__])

# Drop all tables
Base.metadata.drop_all(bind=engine)

When acceptable:

  • Local development setup
  • Ephemeral test databases in CI
  • Initial schema bootstrap in greenfield projects

When forbidden:

  • Production deployments
  • Any environment with existing data
  • Schema changes (use migrations)

Controlled Table Creation in CI

# tests/conftest.py
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

# Base is the DeclarativeBase subclass that holds the application's models

@pytest.fixture(scope="session")
def engine():
    engine = create_engine("postgresql://test:test@localhost/test_db")
    Base.metadata.create_all(bind=engine)
    yield engine
    Base.metadata.drop_all(bind=engine)
    engine.dispose()

@pytest.fixture
def session(engine):
    connection = engine.connect()          # Get connection
    transaction = connection.begin()        # Start outer transaction
    # create_savepoint keeps commit()/rollback() calls inside a test
    # from ending the outer transaction
    session = Session(bind=connection, join_transaction_mode="create_savepoint")
    
    yield session  # Test runs here
    
    session.close()         # Close session
    transaction.rollback()  # Rollback all test changes
    connection.close()      # Return connection to pool

Why Migrations Are Mandatory in Production

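Problems with create_all():

  • No versioning or rollback
  • Cannot handle data migrations
  • Skips tables that already exist, so new columns, indexes, and constraints are silently not applied
  • Applying a change therefore means dropping and recreating the table, which loses data
  • No audit trail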

Use Alembic or equivalent. See section 10.
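
The way create_all() treats existing tables can be verified in a few lines. In this sketch (in-memory SQLite, a cut-down documents table used only for illustration), a second MetaData adds a column, yet create_all() leaves the existing table as it was.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, inspect
from sqlalchemy.pool import StaticPool

engine = create_engine("sqlite://", poolclass=StaticPool)

# Version 1 of the schema
v1 = MetaData()
Table("documents", v1, Column("id", Integer, primary_key=True))
v1.create_all(engine)

# Version 2 adds a column, but the table already exists
v2 = MetaData()
Table(
    "documents",
    v2,
    Column("id", Integer, primary_key=True),
    Column("title", String(500)),
)
v2.create_all(engine)  # checkfirst=True by default: the existing table is skipped

columns = [col["name"] for col in inspect(engine).get_columns("documents")]
print(columns)  # ['id']
```

No error is raised, which is what makes this failure mode easy to miss.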


6. Sessions and Transactions

Session Scope

Per-request (web applications):

from contextlib import contextmanager

@contextmanager
def get_session():
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except:
        session.rollback()
        raise
    finally:
        session.close()

# FastAPI example
from fastapi import Depends

def get_db():
    db = SessionLocal()  # Create session for this request
    try:
        yield db  # Inject into route handler
    finally:
        db.close()  # Always close after request

@app.post("/documents")
def create_document(doc: DocCreate, db: Session = Depends(get_db)):
    document = Document(**doc.dict())  # Create model instance (use doc.model_dump() on Pydantic v2)
    db.add(document)                   # Stage for insert
    db.commit()                        # Execute INSERT, commit transaction
    db.refresh(document)               # Reload from DB (get auto-generated ID, defaults)
    return document

Per-job (background workers):

def process_document_job(document_id: int):
    with get_session() as session:
        document = session.get(Document, document_id)
        # process
        # get_session() commits on exit, so no explicit commit is needed here

Unit of work (manual control):

from sqlalchemy import select

def transfer_documents(source_folder_id: int, target_folder_id: int):
    session = SessionLocal()
    try:
        # 2.0-style select() instead of the legacy session.query()
        stmt = select(Document).where(Document.folder_id == source_folder_id)
        documents = session.scalars(stmt).all()
        for doc in documents:
            doc.folder_id = target_folder_id
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

Explicit Transaction Boundaries

from datetime import datetime, timezone
from sqlalchemy import select
from sqlalchemy.orm import Session

def update_document_with_version(session: Session, doc_id: int, new_content: str):
    """
    Transaction boundary defined by caller.
    """
    document = session.get(Document, doc_id)
    if not document:
        raise ValueError("Document not found")
    
    # Find the latest version number (2.0-style select instead of legacy Query)
    latest_number = session.scalar(
        select(DocumentVersion.version_number)
        .where(DocumentVersion.document_id == doc_id)
        .order_by(DocumentVersion.version_number.desc())
        .limit(1)
    )
    next_version = (latest_number or 0) + 1
    
    now = datetime.now(timezone.utc)  # Timezone-aware, matches DateTime(timezone=True)
    version = DocumentVersion(
        document_id=doc_id,
        version_number=next_version,
        content=new_content,
        created_at=now
    )
    session.add(version)
    
    document.content = new_content
    document.updated_at = now
    
    # Caller commits or rolls back

# Usage
with get_session() as session:
    update_document_with_version(session, 123, "new content")
    # implicit commit on exit

Nested Transactions (Savepoints)

from sqlalchemy.orm import Session

def create_folder_with_documents(session: Session, org_id: int, folder_name: str, doc_titles: list[str]):
    # Outer transaction started by caller
    folder = Folder(organization_id=org_id, name=folder_name)
    session.add(folder)  # Stage folder for insert
    session.flush()      # Execute INSERT immediately, get folder.id without committing
    
    for title in doc_titles:
        try:
            # Nested transaction (SAVEPOINT in SQL)
            with session.begin_nested():  # Creates SAVEPOINT
                doc = Document(
                    organization_id=org_id,
                    folder_id=folder.id,  # Now available from flush
                    owner_id=1,  # placeholder
                    title=title,
                    created_at=datetime.utcnow(),
                    updated_at=datetime.utcnow()
                )
                session.add(doc)
        except Exception as e:
            # ROLLBACK TO SAVEPOINT - folder insert still valid
            print(f"Failed to create document {title}: {e}")
            continue
    
    # Outer transaction commits all successful documents

# Usage
with get_session() as session:
    create_folder_with_documents(session, 1, "Reports", ["Q1", "Q2", "invalid!!!"])

Rollback Semantics

session = SessionLocal()
user = session.get(User, 1)
user.email = "new@example.com"

session.rollback()  # Changes discarded, attributes expired

print(user.email)  # Old value, reloaded from the database on access

Object state after rollback:

  • Attributes are expired and reload the last committed state on next access
  • Pending INSERTs removed from session
  • Objects remain in session (identity map)

Full reset:

session.rollback()
session.expunge_all()  # Clear identity map

Patterns That Prevent Session Leaks

Incorrect:

# Session never closed if exception raised
session = SessionLocal()
user = session.get(User, 1)
user.email = "new@example.com"
session.commit()
session.close()  # Not reached if commit() fails

Correct:

session = SessionLocal()
try:
    user = session.get(User, 1)
    user.email = "new@example.com"
    session.commit()
finally:
    session.close()

Best:

with SessionLocal() as session:
    user = session.get(User, 1)
    user.email = "new@example.com"
    session.commit()
# Auto-closed on exit

Async:

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

async_session = async_sessionmaker(async_engine, expire_on_commit=False)

async def update_email(user_id: int, email: str) -> None:
    async with async_session() as session:
        async with session.begin():
            user = await session.get(User, user_id)
            user.email = email
        # Commits when the begin() block exits; the session closes after it

7. Querying (ORM)

Deterministic Querying Patterns

SQLAlchemy 2.0 uses the select() construct for ORM queries; the legacy session.query() API still works but is no longer the recommended style.

from sqlalchemy import select, and_, or_, func

# Get by primary key (checks identity map first, then SELECT)
session.get(User, 123)

# Simple filter
stmt = select(User).where(User.email == "user@example.com")  # Build SELECT statement
user = session.execute(stmt).scalar_one_or_none()  # Execute, return single object or None

# Multiple conditions (AND)
stmt = select(Document).where(
    and_(  # Explicit AND (can also chain .where() calls)
        Document.organization_id == org_id,
        Document.folder_id == folder_id,
        Document.created_at >= start_date
    )
)
documents = session.execute(stmt).scalars().all()  # scalars() unwraps tuples, .all() fetches list

# OR conditions
stmt = select(User).where(
    or_(
        User.email == email,
        User.full_name.ilike(f"%{query}%")
    )
)
users = session.execute(stmt).scalars().all()

# Ordering and limiting
stmt = select(Document).where(
    Document.organization_id == org_id
).order_by(Document.created_at.desc()).limit(10)
recent_docs = session.execute(stmt).scalars().all()

# Aggregation
stmt = select(func.count(Document.id)).where(Document.organization_id == org_id)
count = session.execute(stmt).scalar()

Joins and Subqueries

# Explicit join
stmt = select(Document, User).join(User, Document.owner_id == User.id).where(
    Document.organization_id == org_id
)
results = session.execute(stmt).all()
for doc, user in results:
    print(doc.title, user.full_name)

# Join via relationship
stmt = select(Document).join(Document.owner).where(User.email.like("%@example.com"))
docs = session.execute(stmt).scalars().all()

# Subquery for counts
from sqlalchemy import select, func

subq = select(
    Document.folder_id,
    func.count(Document.id).label("doc_count")
).where(
    Document.organization_id == org_id
).group_by(Document.folder_id).subquery()

stmt = select(Folder, subq.c.doc_count).outerjoin(
    subq, Folder.id == subq.c.folder_id
).where(Folder.organization_id == org_id)

results = session.execute(stmt).all()
for folder, count in results:
    print(folder.name, count or 0)

Relationship Loading Strategies

Lazy loading (default):

document = session.get(Document, 1)  # SELECT documents WHERE id=1
print(document.owner.email)  # Triggers separate SELECT users WHERE id=document.owner_id

Eager loading (joinedload):

from sqlalchemy.orm import joinedload

stmt = select(Document).options(
    joinedload(Document.owner)  # Use LEFT OUTER JOIN to load owner in same query
).where(Document.id == 1)
document = session.execute(stmt).scalar_one()
print(document.owner.email)  # Already loaded, no additional SELECT

Subquery load:

from sqlalchemy.orm import subqueryload

stmt = select(Folder).options(
    subqueryload(Folder.documents).subqueryload(Document.versions)
).where(Folder.organization_id == org_id)
folders = session.execute(stmt).scalars().all()

for folder in folders:
    for doc in folder.documents:
        print(doc.versions)  # Already loaded

Select in load:

from sqlalchemy.orm import selectinload

stmt = select(Organization).options(
    selectinload(Organization.folders)           # Separate SELECT for all folders (WHERE org_id IN (...))
    .selectinload(Folder.documents)              # Then SELECT for all documents (WHERE folder_id IN (...))
).where(Organization.id == 1)
org = session.execute(stmt).scalar_one()  # 3 queries total, regardless of collection size

for folder in org.folders:
    for doc in folder.documents:
        print(doc.title)  # No N+1, all data pre-loaded

Avoiding N+1 Issues

Problem:

folders = session.execute(select(Folder).where(Folder.organization_id == org_id)).scalars().all()
for folder in folders:
    print(folder.documents)  # N queries

Solution 1 - selectinload:

stmt = select(Folder).options(selectinload(Folder.documents)).where(
    Folder.organization_id == org_id
)
folders = session.execute(stmt).scalars().all()
for folder in folders:
    print(folder.documents)  # 2 queries total

Solution 2 - manual join:

stmt = select(Folder, Document).outerjoin(Folder.documents).where(Folder.organization_id == org_id)
results = session.execute(stmt).all()

folders_dict = {}
for folder, doc in results:
    if folder.id not in folders_dict:
        folders_dict[folder.id] = {"folder": folder, "documents": []}
    if doc:
        folders_dict[folder.id]["documents"].append(doc)

Bulk Operations and Caveats

Bulk insert (bypasses unit of work):

session.bulk_insert_mappings(
    Document,  # Model class
    [  # List of dictionaries (not model instances)
        {"organization_id": 1, "folder_id": 1, "owner_id": 1, "title": f"Doc {i}",
         "created_at": datetime.utcnow(), "updated_at": datetime.utcnow()}
        for i in range(1000)
    ]
)
session.commit()  # The INSERTs already ran inside bulk_insert_mappings(); this commits them

Bulk update:

session.bulk_update_mappings(
    Document,
    [
        {"id": 1, "title": "Updated 1"},
        {"id": 2, "title": "Updated 2"},
    ]
)
session.commit()

Caveats:

  • No validation or default values
  • No relationship handling
  • No ORM events fired
  • Objects not added to session
  • Faster but less safe

Batch update (ORM-aware):

from sqlalchemy import update

stmt = update(Document).where(
    Document.folder_id == old_folder_id  # Filter which rows to update
).values(folder_id=new_folder_id)        # Set new value

session.execute(stmt)  # Executes: UPDATE documents SET folder_id=X WHERE folder_id=Y
session.commit()       # Commits transaction

When ORM Queries Become a Liability

Complex aggregations:

# ORM: awkward and slow
from sqlalchemy import func, case

stmt = select(
    Folder.name,
    func.count(Document.id).label("total_docs"),
    func.sum(case((Document.content.isnot(None), 1), else_=0)).label("with_content")
).join(Document).group_by(Folder.id, Folder.name).where(
    Folder.organization_id == org_id
)

# Better: raw SQL (see section 8)

Large scans:

# ORM: loads all rows into memory
stmt = select(Document).where(Document.organization_id == org_id)
documents = session.execute(stmt).scalars().all()  # 100k objects

# Better: Core with streaming or pagination

Use Core or raw SQL when:

  • Aggregating thousands of rows
  • Returning denormalized reports
  • Write-heavy operations with no read-back
  • Query complexity exceeds ORM expressiveness

8. Raw SQL Execution

Executing Raw SQL via text()

from sqlalchemy import text

# Simple query
result = session.execute(text("SELECT id, title FROM app.documents WHERE organization_id = 1"))
for row in result:
    print(row.id, row.title)

# With connection (no session)
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM app.documents"))
    count = result.scalar()
    # No commit needed for a read: the implicit transaction is rolled back when the block exits

Binding Parameters Safely

Never use string formatting:

# WRONG - SQL injection vulnerability
org_id = request.args.get("org_id")  # User input could be "1 OR 1=1"
stmt = text(f"SELECT * FROM app.documents WHERE organization_id = {org_id}")

Correct - bound parameters:

stmt = text("SELECT * FROM app.documents WHERE organization_id = :org_id")  # Named parameter
result = session.execute(stmt, {"org_id": org_id})  # Safely bound, SQL injection protected

Multiple parameters:

stmt = text("""
    SELECT * FROM app.documents 
    WHERE organization_id = :org_id 
    AND created_at >= :start_date 
    AND title ILIKE :title_pattern
""")
result = session.execute(stmt, {
    "org_id": org_id,
    "start_date": start_date,
    "title_pattern": f"%{query}%"
})
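
A list of values for an IN clause is a common reason people fall back to string formatting. A minimal sketch of the bound alternative, using bindparam(..., expanding=True), follows; the table layout and sample rows are invented for the demo and run against in-memory SQLite.

```python
from sqlalchemy import bindparam, create_engine, text

engine = create_engine("sqlite://")  # in-memory database, demo only

with engine.begin() as conn:
    conn.execute(text(
        "CREATE TABLE documents ("
        "id INTEGER PRIMARY KEY, organization_id INTEGER, title TEXT)"
    ))
    conn.execute(
        text(
            "INSERT INTO documents (id, organization_id, title) "
            "VALUES (:id, :org_id, :title)"
        ),
        [
            {"id": 1, "org_id": 1, "title": "Spec"},
            {"id": 2, "org_id": 1, "title": "Notes"},
            {"id": 3, "org_id": 2, "title": "Other tenant"},
        ],
    )

# An expanding parameter renders one placeholder per list element at execution time
stmt = text(
    "SELECT id, title FROM documents "
    "WHERE organization_id = :org_id AND id IN :doc_ids "
    "ORDER BY id"
).bindparams(bindparam("doc_ids", expanding=True))

with engine.connect() as conn:
    rows = conn.execute(stmt, {"org_id": 1, "doc_ids": [1, 2, 3]}).all()

titles = [row.title for row in rows]
```

Document 3 is requested but belongs to another organization, so the tenant predicate still excludes it.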

Returning Scalar vs Row Mappings

Fetch all rows:

result = session.execute(text("SELECT id, title FROM app.documents"))
rows = result.fetchall()  # List of Row objects
for row in rows:
    print(row.id, row.title)  # Also by index (row[0]) or by key via row._mapping["id"]

Fetch single row:

result = session.execute(text("SELECT id, title FROM app.documents WHERE id = :id"), {"id": 1})
row = result.fetchone()  # Single Row object or None
if row:
    print(row.id, row.title)

Scalar (single value):

count = session.execute(text("SELECT COUNT(*) FROM app.documents")).scalar()  # Return first column of first row

Map to dictionary:

result = session.execute(text("SELECT id, title, created_at FROM app.documents"))
docs = [{"id": row.id, "title": row.title, "created_at": row.created_at} for row in result]
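
The hand-written comprehension can usually be replaced by Result.mappings(), which yields dictionary-like RowMapping objects. A small sketch, again with an invented table in in-memory SQLite:

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")  # in-memory database, demo only

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE documents (id INTEGER PRIMARY KEY, title TEXT)"))
    conn.execute(
        text("INSERT INTO documents (id, title) VALUES (:id, :title)"),
        [{"id": 1, "title": "Spec"}, {"id": 2, "title": "Notes"}],
    )

with engine.connect() as conn:
    result = conn.execute(text("SELECT id, title FROM documents ORDER BY id"))
    # Each RowMapping is read-only; dict() makes a plain, serializable copy
    docs = [dict(mapping) for mapping in result.mappings()]
```

The keys come from the column names in the SELECT list, so aliasing a column in SQL renames the dictionary key too.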

Mixing ORM-Managed Entities with Raw SQL

Load ORM objects from raw SQL:

stmt = text("SELECT * FROM app.documents WHERE organization_id = :org_id")

# Executing text() on its own returns plain Row objects, not Document instances
rows = session.execute(stmt, {"org_id": org_id}).all()

# To get ORM objects, wrap the text() in select(...).from_statement().
# Result columns are matched to mapped columns by name, so the SELECT
# must return every column the Document mapper expects.
orm_stmt = select(Document).from_statement(stmt)
documents = session.execute(orm_stmt, {"org_id": org_id}).scalars().all()

Execute raw SQL, then refresh ORM objects:

session.execute(text("UPDATE app.documents SET title = :title WHERE id = :id"), {
    "id": doc_id,
    "title": new_title
})
session.commit()

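# Refresh ORM object. With expire_on_commit=True (the default) the commit
# already expired it; the explicit refresh matters when that option is off.
document = session.get(Document, doc_id)
session.refresh(document)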
print(document.title)  # Reflects database state

Using Raw SQL for Performance-Critical Paths

Example: Bulk reporting query

def get_organization_stats(org_id: int) -> dict:
    query = text("""
        SELECT 
            (SELECT COUNT(*) FROM app.documents WHERE organization_id = :org_id) as total_docs,
            (SELECT COUNT(*) FROM app.folders WHERE organization_id = :org_id) as total_folders,
            (SELECT COUNT(*) FROM auth.users WHERE organization_id = :org_id) as total_users,
            (SELECT MAX(created_at) FROM app.documents WHERE organization_id = :org_id) as latest_doc_date
    """)
    
    with engine.connect() as conn:
        result = conn.execute(query, {"org_id": org_id})
        row = result.fetchone()
        return {
            "total_docs": row.total_docs,
            "total_folders": row.total_folders,
            "total_users": row.total_users,
            "latest_doc_date": row.latest_doc_date
        }

Example: Complex join with window functions

def get_latest_document_per_folder(session: Session, org_id: int):
    query = text("""
        WITH ranked_docs AS (
            SELECT 
                d.id, d.folder_id, d.title, d.created_at,
                ROW_NUMBER() OVER (PARTITION BY d.folder_id ORDER BY d.created_at DESC) as rn
            FROM app.documents d
            WHERE d.organization_id = :org_id
        )
        SELECT rd.folder_id, rd.title, rd.created_at, f.name as folder_name
        FROM ranked_docs rd
        JOIN app.folders f ON rd.folder_id = f.id
        WHERE rd.rn = 1
    """)
    
    result = session.execute(query, {"org_id": org_id})
    return [
        {
            "folder_id": row.folder_id,
            "folder_name": row.folder_name,
            "document_title": row.title,
            "created_at": row.created_at
        }
        for row in result
    ]

9. Multi-Tenancy Strategies

A. Shared Database, Shared Schema

All tenants share the same tables. Tenant isolation enforced by organization_id column.

Advantages:

  • Simple schema management
  • Easy to query across tenants (admin/analytics)
  • Single migration path

Disadvantages:

  • Risk of data leakage if query filters are missed
  • Cannot easily set per-tenant resource limits
  • Index bloat if tenant sizes vary significantly

Implementation:

from sqlalchemy import and_

# Every query must filter by organization_id
def get_documents(session: Session, org_id: int):
    stmt = select(Document).where(Document.organization_id == org_id)
    return session.execute(stmt).scalars().all()

# Incorrect - missing tenant filter
def get_document_by_id(session: Session, doc_id: int):
    return session.get(Document, doc_id)  # DANGER: No tenant check

# Correct
def get_document_by_id(session: Session, org_id: int, doc_id: int):
    stmt = select(Document).where(
        and_(Document.id == doc_id, Document.organization_id == org_id)
    )
    return session.execute(stmt).scalar_one_or_none()

Session-level filters (automatic tenant scoping):

from sqlalchemy import event
from sqlalchemy.orm import Session

class TenantSession(Session):
    def __init__(self, organization_id: int, **kwargs):
        super().__init__(**kwargs)
        self.organization_id = organization_id
    
    def execute(self, statement, *args, **kwargs):
        # Intercept and inject tenant filter (complex, brittle)
        # Not recommended for production
        return super().execute(statement, *args, **kwargs)

# Better: Use explicit helper
def with_tenant_filter(stmt, org_id: int, model):
    return stmt.where(model.organization_id == org_id)

# Usage
stmt = select(Document)
stmt = with_tenant_filter(stmt, org_id, Document)
documents = session.execute(stmt).scalars().all()

Query helper pattern:

from typing import Type, TypeVar
from sqlalchemy import Select

T = TypeVar("T")

def tenant_query(session: Session, model: Type[T], org_id: int) -> Select[tuple[T]]:
    """Returns a select statement pre-filtered by organization_id."""
    return select(model).where(model.organization_id == org_id)  # Always inject tenant filter

# Usage
def get_all_documents(session: Session, org_id: int):
    stmt = tenant_query(session, Document, org_id)  # Get pre-filtered statement
    return session.execute(stmt).scalars().all()    # Execute: SELECT * FROM documents WHERE org_id=X

def get_folder_with_documents(session: Session, org_id: int, folder_id: int):
    stmt = tenant_query(session, Folder, org_id).where(
        Folder.id == folder_id
    ).options(selectinload(Folder.documents))
    return session.execute(stmt).scalar_one_or_none()

Risks and failure modes:

  1. Forgotten filter:
# Developer forgets to add org_id filter
stmt = select(Document).where(Document.title.ilike(f"%{query}%"))
documents = session.execute(stmt).scalars().all()  # Returns all tenants' docs

Mitigation:

  • Code review checklist
  • Automated linting (detect queries without tenant filter)
  • Integration tests with multiple tenants
  2. Joins across tenants:
# Incorrect: joins user from different org
stmt = select(Document).join(User).where(Document.id == doc_id)
doc = session.execute(stmt).scalar_one()  # User might be from different org

# Correct: verify both sides
stmt = select(Document).join(User).where(
    and_(
        Document.id == doc_id,
        Document.organization_id == org_id,
        User.organization_id == org_id
    )
)

B. Shared Database, Separate Schemas

Each tenant gets a dedicated PostgreSQL schema. Tables have identical structure across schemas.

Advantages:

  • Strong isolation (no query can accidentally cross tenants)
  • Tenant-level backups/restores
  • Per-tenant indexing strategies

Disadvantages:

  • Complex migration management (must apply to all schemas)
  • Cross-tenant queries require explicit UNION or schema-qualified joins
  • Schema count limits (PostgreSQL handles thousands, but management overhead)

Implementation:

1. Dynamic schema binding per request:

from sqlalchemy import event, text
from sqlalchemy.orm import Session

# Store tenant schema in session.info (session-local storage)
class TenantSession(Session):
    def __init__(self, tenant_schema: str, **kwargs):
        super().__init__(**kwargs)
        self.info["tenant_schema"] = tenant_schema  # Store schema name for this session

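# Set search_path when the session begins a transaction on a connection.
# Listen on TenantSession itself: sessions constructed directly from this class
# do not come from the SessionLocal factory, so a listener registered on
# SessionLocal would never fire for them.
@event.listens_for(TenantSession, "after_begin")
def set_tenant_schema(session, transaction, connection):
    tenant_schema = session.info.get("tenant_schema")  # Retrieve stored schema
    if tenant_schema:
        # The name is interpolated into SQL, so it must come from trusted code, never user input
        connection.execute(text(f"SET search_path TO {tenant_schema}, public"))  # Set PostgreSQL search path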

# Usage
def get_tenant_session(org_id: int) -> Session:
    tenant_schema = f"tenant_{org_id}"  # e.g., tenant_1, tenant_2
    return TenantSession(tenant_schema=tenant_schema, bind=engine)

# Request handler
def handle_request(org_id: int):
    session = get_tenant_session(org_id)
    try:
        # All queries automatically use tenant schema
        documents = session.execute(select(Document)).scalars().all()
        session.commit()
    finally:
        session.close()

2. Creating tenant schemas programmatically:

def create_tenant_schema(org_id: int):
    schema_name = f"tenant_{org_id}"  # e.g., tenant_42
    
    with engine.connect() as conn:
        # Create PostgreSQL schema (namespace)
        conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
        conn.commit()
    
    # Create tables in tenant schema
    tenant_metadata = MetaData(schema=schema_name)  # Target this schema
    
    # Redefine tables with tenant schema (must mirror production models)
    Table(
        "documents", tenant_metadata,
        Column("id", Integer, primary_key=True),
        Column("title", String(500), nullable=False),
        # ... all columns (must match exactly)
    )
    Table("folders", tenant_metadata, ...)
    # etc.
    
    tenant_metadata.create_all(bind=engine)  # CREATE TABLE tenant_42.documents ...

# Bootstrap new tenant
def onboard_organization(org_id: int, name: str):
    # Create schema and tables
    create_tenant_schema(org_id)
    
    # Insert initial data
    session = get_tenant_session(org_id)
    try:
        root_folder = Folder(name="Root", parent_id=None)
        session.add(root_folder)
        session.commit()
    finally:
        session.close()

3. Migration complexity:

Each tenant schema must be migrated independently.

# Alembic migration runner
from alembic import command
from alembic.config import Config

def migrate_all_tenants():
    org_ids = get_all_organization_ids()
    
    for org_id in org_ids:
        schema_name = f"tenant_{org_id}"
        print(f"Migrating {schema_name}...")
        
        # Run Alembic with target schema
        # Requires Alembic config to dynamically set schema
        alembic_cfg = Config("alembic.ini")
        alembic_cfg.set_main_option("sqlalchemy.url", str(engine.url))
        alembic_cfg.set_section_option("alembic", "schema", schema_name)
        
        command.upgrade(alembic_cfg, "head")

4. Cross-tenant queries (admin/reporting):

def get_document_count_all_tenants():
    org_ids = get_all_organization_ids()
    results = []
    
    for org_id in org_ids:
        session = get_tenant_session(org_id)
        try:
            count = session.execute(
                select(func.count(Document.id))
            ).scalar()
            results.append({"org_id": org_id, "doc_count": count})
        finally:
            session.close()
    
    return results

# Alternatively: raw SQL with schema-qualified names
def get_document_count_all_tenants_raw():
    org_ids = get_all_organization_ids()
    
    union_parts = [
        f"SELECT {org_id} as org_id, COUNT(*) as doc_count FROM tenant_{org_id}.documents"
        for org_id in org_ids
    ]
    query = " UNION ALL ".join(union_parts)
    
    with engine.connect() as conn:
        result = conn.execute(text(query))
        return [{"org_id": row.org_id, "doc_count": row.doc_count} for row in result]

Risks and failure modes:

  1. Schema not set correctly:
session = SessionLocal()  # Forgot to use get_tenant_session
documents = session.execute(select(Document)).scalars().all()  # Uses the default search_path, or one a previous tenant session left on the pooled connection

Mitigation:

  • Always use factory function (get_tenant_session)
  • Middleware that sets schema automatically
  2. Migration failures:
# Migration fails for tenant_42 due to unique constraint violation
# Rollback strategy needed for partial failures

Mitigation:

  • Test migrations on copy of production data
  • Idempotent migrations where possible
  • Monitor migration logs per tenant
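
Monitoring per tenant is easier when the runner records failures instead of stopping at the first one. The following is a minimal sketch of that loop. migrate_tenants and fake_migrate are hypothetical names; in practice the callable would wrap the Alembic upgrade for one schema.

```python
from typing import Callable, Iterable


def migrate_tenants(
    schema_names: Iterable[str],
    migrate_one: Callable[[str], None],
) -> dict[str, list]:
    """Run migrate_one for each schema, recording failures instead of aborting."""
    report: dict[str, list] = {"succeeded": [], "failed": []}
    for schema_name in schema_names:
        try:
            migrate_one(schema_name)
        except Exception as exc:  # one bad tenant should not block the rest
            report["failed"].append((schema_name, str(exc)))
        else:
            report["succeeded"].append(schema_name)
    return report


def fake_migrate(schema_name: str) -> None:
    """Stand-in for the real Alembic call; fails for one tenant on purpose."""
    if schema_name == "tenant_42":
        raise RuntimeError("unique constraint violation")


report = migrate_tenants(["tenant_1", "tenant_42", "tenant_7"], fake_migrate)
```

The failed list gives a retry queue once the underlying data problem is fixed. Retrying is only safe if each migration runs in its own transaction per schema, so a failed tenant is left at its previous revision.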

Optional: Database-per-Tenant

Each tenant has a separate database. Maximum isolation.

Tradeoffs:

  • Connection pool per database (resource overhead)
  • Extremely complex cross-tenant queries
  • Simple migration per tenant (run Alembic once per database)
  • Highest security/compliance level

Not covered in detail. Use only if regulatory requirements demand physical isolation.


10. Migrations (Alembic - High Level)

Why SQLAlchemy Alone Is Insufficient

metadata.create_all() cannot:

  • Modify existing tables (ADD/DROP/ALTER COLUMN)
  • Track which migrations have been applied
  • Rollback changes
  • Handle data transformations during schema changes

Schema Naming and Migration Safety

Alembic generates migration scripts based on model metadata. Explicit constraint names prevent conflicts.

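# alembic/env.py
from alembic import context
from sqlalchemy import engine_from_config, pool

from myapp.models import Base

config = context.config  # Alembic Config object backed by alembic.ini
target_metadata = Base.metadata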

def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            # Include schemas
            include_schemas=True,
            version_table_schema="public"  # Store alembic_version in public schema
        )

        with context.begin_transaction():
            context.run_migrations()

Multi-Tenant Migration Considerations

Shared schema approach:

  • Single Alembic version table
  • Apply migration once, affects all tenants

Separate schema approach:

  • Apply migration to each tenant schema
  • Track alembic_version per schema or globally
# Custom Alembic env.py for multi-schema
def run_migrations_for_tenant(schema_name: str):
    connectable = create_engine(DATABASE_URL)
    
    with connectable.connect() as connection:
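        # Set search path. Under SQLAlchemy 2.0 the statement autobegins a
        # transaction, so commit it before Alembic opens its own.
        connection.execute(text(f"SET search_path TO {schema_name}, public"))
        connection.commit()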
        
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            version_table_schema=schema_name  # Per-tenant version tracking
        )
        
        with context.begin_transaction():
            context.run_migrations()

# CLI wrapper
def migrate_all():
    org_ids = get_all_organization_ids()  # Load from database
    for org_id in org_ids:
        schema_name = f"tenant_{org_id}"
        print(f"Migrating {schema_name}")
        run_migrations_for_tenant(schema_name)

Operational Reality

  1. Test migrations on production replica:

    • Never test in production first
    • Measure migration time (long-running ALTER TABLE locks table)
  2. Backward compatibility:

    • Add columns as nullable, backfill data, then set NOT NULL
    • Avoid renaming columns (add new, copy data, drop old)
  3. Zero-downtime migrations:

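    • Expand-contract pattern: add new columns or tables first, remove old ones last
    • Deploy code compatible with both the old and the new schema
    • Run the contract (destructive) migration only after no deployed code reads the old schema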
  4. Rollback strategy:

    • Always write downgrade() in migrations
    • Test rollback before applying to production

Example migration (add column):

# alembic/versions/001_add_document_status.py
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.add_column(
        "documents",
        sa.Column("status", sa.String(50), nullable=True),
        schema="app"
    )
    
    # Backfill default value
    op.execute("UPDATE app.documents SET status = 'active' WHERE status IS NULL")
    
    # Set NOT NULL constraint
    op.alter_column("documents", "status", nullable=False, schema="app")

def downgrade():
    op.drop_column("documents", "status", schema="app")

11. Failure Modes and Footguns

Connection Exhaustion

Symptom:

sqlalchemy.exc.TimeoutError: QueuePool limit of size 10 overflow 20 reached, connection timed out

Causes:

  • Sessions not closed (leaked)
  • Long-running transactions holding connections
  • Pool size too small for concurrency

Debug:

# Check pool status
print(engine.pool.status())  # "Pool size: 10  Connections in pool: 0 Current Overflow: 20 Current Checked out connections: 30"

# Enable pool logging
import logging
logging.getLogger('sqlalchemy.pool').setLevel(logging.DEBUG)

Solutions:

  • Always close sessions (use context managers)
  • Set pool_pre_ping=True to detect stale connections
  • Increase pool size if legitimate concurrency exceeds limit
  • Use connection poolers (PgBouncer) for high-concurrency services
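
The pool settings and the leak itself can be observed directly. This sketch assumes in-memory SQLite with an explicit QueuePool purely so it runs anywhere; the numbers are placeholders, not recommendations.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "sqlite://",          # demo only; production would use the PostgreSQL URL
    poolclass=QueuePool,
    pool_size=5,          # connections kept open in the pool
    max_overflow=10,      # extra connections allowed under burst load
    pool_timeout=30,      # seconds to wait before raising TimeoutError
    pool_pre_ping=True,   # test each connection on checkout, replace dead ones
    pool_recycle=1800,    # retire connections older than 30 minutes
)

# Leaked session: the connection stays checked out until close() is called
leaked = Session(engine)
leaked.execute(text("SELECT 1"))
checked_out_while_leaked = engine.pool.checkedout()
leaked.close()

# Context manager: the connection is returned as soon as the block exits
with Session(engine) as session:
    session.execute(text("SELECT 1"))
    checked_out_inside = engine.pool.checkedout()
checked_out_after = engine.pool.checkedout()
```

Logging engine.pool.checkedout() from a health endpoint is a cheap way to spot a leak before the timeout error appears.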

Transaction Leaks

Symptom:

  • Uncommitted transactions hold locks
  • Reads see stale data (isolation level issues)
  • Connections never returned to pool

Example:

session = SessionLocal()
user = session.get(User, 1)
user.email = "user@example.com"
# Missing session.commit() or session.rollback()
# Transaction open indefinitely

Solution:

with SessionLocal() as session:
    with session.begin():
        user = session.get(User, 1)
        user.email = "user@example.com"
    # Auto-commit on exit

ORM State Desynchronization

Symptom:

  • Object attribute doesn’t match database value
  • Stale reads after external updates

Example:

session = SessionLocal()
document = session.get(Document, 1)
print(document.title)  # "Old Title"

# Another process updates the database
# UPDATE app.documents SET title = 'New Title' WHERE id = 1

print(document.title)  # Still "Old Title" (identity map)

Solution:

session.expire(document)  # Mark object as stale
print(document.title)  # Triggers SELECT, returns "New Title"

# Or refresh explicitly
session.refresh(document)

Expire on commit:

SessionLocal = sessionmaker(bind=engine, expire_on_commit=True)  # Default

After commit, all objects marked stale. Next access triggers SELECT.


Footgun: SQLite foreign keys

# SQLite
engine = create_engine("sqlite:///dev.db")
# Foreign key violations NOT enforced by default

# Must enable per connection
@event.listens_for(engine, "connect")
def set_sqlite_pragma(dbapi_conn, connection_record):
    cursor = dbapi_conn.cursor()
    cursor.execute("PRAGMA foreign_keys=ON")
    cursor.close()

Footgun: SQLite concurrent writes

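# In the default rollback-journal mode, a committing writer blocks other connections
session.add(document)
session.commit()  # Takes an exclusive lock for the write; WAL mode lets readers continue, but still allows one writer at a time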

PostgreSQL: Multiple writers and readers can operate concurrently.


Concurrency and Isolation Issues

Lost update problem:

# Session 1
doc = session1.get(Document, 1)
doc.views = doc.views + 1
session1.commit()

# Session 2 (concurrent)
doc = session2.get(Document, 1)
doc.views = doc.views + 1
session2.commit()

# Final value: +1 instead of +2 (last write wins)

Solutions:

  1. Optimistic locking (version column):
class Document(Base):
    __tablename__ = "documents"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(500))
    version: Mapped[int] = mapped_column(Integer, nullable=False, default=0)  # Version counter
    
    __mapper_args__ = {"version_id_col": version}  # Enable optimistic locking on this column

# Usage
doc = session.get(Document, 1)  # SELECT ... (version=5)
doc.title = "Updated"
session.commit()  # UPDATE documents SET title=..., version=6 WHERE id=1 AND version=5

# If concurrent update changed version, WHERE clause matches 0 rows -> StaleDataError
  2. Pessimistic locking:
from sqlalchemy import select

stmt = select(Document).where(Document.id == 1).with_for_update()  # Add FOR UPDATE clause
doc = session.execute(stmt).scalar_one()  # SELECT ... FOR UPDATE (blocks other FOR UPDATE)
doc.views += 1
session.commit()  # UPDATE, release lock
  3. Atomic increment:
from sqlalchemy import update

stmt = update(Document).where(Document.id == 1).values(
    views=Document.views + 1  # Database-side increment (views = views + 1)
)
session.execute(stmt)  # UPDATE documents SET views = views + 1 WHERE id = 1
session.commit()       # Atomic, no race condition

Summary: When to Use What

Use ORM When:

  • Modeling domain entities with relationships
  • Object lifecycle matches request/transaction boundaries
  • Benefits from identity map and change tracking
  • Relationships are primary access pattern
  • Typical CRUD operations dominate

Use Core When:

  • Bulk operations (insert/update/delete thousands of rows)
  • Complex aggregations, window functions, CTEs
  • Performance-critical paths requiring SQL control
  • Dynamic query construction
  • Returning non-entity data (reports, analytics)

Bypass SQLAlchemy When:

  • Stored procedures or database-specific extensions (PostGIS, full-text search)
  • Extremely high throughput (use asyncpg directly)
  • Schema introspection or admin tasks (use psycopg2 directly)
  • Database-native features not exposed by SQLAlchemy
