Блок 1: Базовые запросы
Задача 1: Поиск пользователя по имени
Логика: filter(User.name == user_name) добавляет условие WHERE, .first() возвращает первую запись или None. Проверяем результат перед обращением к атрибутам.
user_name = "Alice"
user = session.query(User).filter(User.name == user_name).first()
if user:
print(f"Found user: {user.name}")
else:
print("User not found")
Задача 2: Вывод пользователей с определённым возрастом
Логика: запрашиваем только столбец User.name — это эффективнее, чем загружать весь объект. .all() возвращает список именованных кортежей. Обращаемся к полю по имени: user.name.
users = session.query(User.name).filter(User.age > 20).all()
print("Users older than 20:")
for user in users:
print(user.name)
Задача 3: Обновление данных пользователя
Логика: обновление в SQLAlchemy — это не UPDATE-запрос напрямую. Находим объект → меняем атрибут → коммитим. SQLAlchemy отслеживает изменения через «unit of work» и сам генерирует UPDATE.
user_name = "Bob"
new_age = 25
user = session.query(User).filter(User.name == user_name).first()
if user:
user.age = new_age
session.commit()
print(f"Updated {user.name}'s age to {new_age}")
else:
print("User not found")
Задача 4: Вывод пользователей моложе 30 лет
Логика: запрашиваем два столбца — получаем список кортежей (name, age). Распаковываем в цикле через for name, age in ....
young_users = session.query(User.name, User.age).filter(User.age < 30).all()
print("Users younger than 30:")
for name, age in young_users:
print(f"{name}, Age: {age}")
Задача 5: Добавление пользователя
Логика: создаём экземпляр модели → добавляем в сессию → коммитим. После коммита объект получит id из базы данных.
new_user = User(name="Charlie", age=40)
session.add(new_user)
session.commit()
Задача 6: Удаление пользователя
Логика: сначала находим объект по имени. Если найден — вызываем session.delete() и коммитим. Без коммита объект не удаляется из БД.
user_name = "Charlie"
user_to_delete = session.query(User).filter(User.name == user_name).first()
if user_to_delete:
session.delete(user_to_delete)
session.commit()
print(f"User {user_name} has been deleted.")
else:
print("User not found")
Задача 7: Сортировка пользователей по возрасту
Логика: order_by(User.age.desc()) — метод .desc() вызывается на атрибуте колонки. Альтернатива: order_by(desc(User.age)) с импортом desc из sqlalchemy.
sorted_users = session.query(User.name, User.age).order_by(User.age.desc()).all()
print("Users sorted by age (descending):")
for name, age in sorted_users:
print(f"{name}, Age: {age}")
Задача 8: Вывод пользователей с ограничением количества
Логика: order_by(User.name) — по умолчанию ASC (по возрастанию). limit(4) ограничивает количество строк на уровне SQL (эффективнее среза Python).
limited_users = session.query(User.name).order_by(User.name).limit(4).all()
print("First 4 users alphabetically:")
for user in limited_users:
print(user.name)
Задача 9: Обновление данных пользователя по ID
Логика: session.get(User, user_id) — это специализированный метод для поиска по первичному ключу. Он использует кеш сессии (identity map) и не создаёт лишний SQL-запрос если объект уже загружен.
user_id = 5
new_age = 35
user = session.get(User, user_id)
if user:
user.age = new_age
session.commit()
print(f"User {user.name}'s age updated to {new_age}.")
else:
print("User not found")
Задача 10: Проверка существования пользователя
Логика: вложенный вызов session.query(...).exists() создаёт подзапрос EXISTS. Внешний session.query(subquery).scalar() выполняет его и возвращает True/False. Это эффективнее, чем загружать весь объект.
user_name = "Charlie"
exists = session.query(
session.query(User).filter_by(name=user_name).exists()
).scalar()
if exists:
print(f"User with name {user_name} exists.")
else:
print("No user found with that name.")
Блок 2: Агрегации
Задача 11: Среднее значение возрастов
Логика: func.avg() передаётся в session.query() вместо модели. .scalar() возвращает единственное значение (первый столбец первой строки) — подходит для одного агрегата.
from sqlalchemy import func
result = session.query(func.avg(User.age)).scalar()
print("Average value of ages:", result)
Задача 12: Максимальный и минимальный возраст
Логика: два отдельных запроса — каждый через .scalar(). Можно объединить в один запрос: session.query(func.max(User.age), func.min(User.age)).one().
from sqlalchemy import func
max_age = session.query(func.max(User.age)).scalar()
min_age = session.query(func.min(User.age)).scalar()
print("Maximum age:", max_age)
print("Minimum age:", min_age)
Задача 13: Группировка пользователей по возрасту
Логика: group_by(User.age) группирует строки по значению возраста. func.count(User.id) подсчитывает строки в каждой группе. Результат — список кортежей (age, count).
from sqlalchemy import func
age_groups = session.query(User.age, func.count(User.id)).group_by(User.age).all()
for age, count in age_groups:
print(f"Age: {age}, Count: {count}")
Задача 14: Фильтрация групп с использованием having()
Логика: having() фильтрует результаты после агрегации — аналог WHERE, но применяется к агрегированным значениям. label('count_users') даёт псевдоним столбцу — удобно для читаемости.
from sqlalchemy import func
age_groups = (
session.query(User.age, func.count(User.id).label('count_users'))
.group_by(User.age)
.having(func.count(User.id) > 1)
.all()
)
for age, count_users in age_groups:
print(f"Age: {age}, Users: {count_users}")
Задача 15: Присоединение и подзапросы
Логика: .subquery() превращает запрос в подзапрос SQL. Доступ к столбцам подзапроса — через subq.c.column_name (c — сокращение от "columns"). Это позволяет сравнивать в основном запросе.
from sqlalchemy import func
average_age_subquery = session.query(
func.avg(User.age).label('average_age')
).subquery()
users_above_average = session.query(User).filter(
User.age > average_age_subquery.c.average_age
).all()
print("Users above average age:")
for user in users_above_average:
print(user.name, user.age)
Блок 3: Связи и JOIN
Задача 16: Вывод всех адресов пользователей
Логика: .join(User.addresses) выполняет INNER JOIN по связи User.addresses — SQLAlchemy автоматически знает, как объединить таблицы (через ForeignKey). Выводятся только пользователи, у которых есть хотя бы один адрес.
users_with_addresses = (
session.query(User.name, Address.description)
.join(User.addresses)
.all()
)
print("Users with their addresses:")
for name, address in users_with_addresses:
print(f"Name: {name}, Address: {address}")
Задача 17: Пользователи без адресов
Логика: .outerjoin() выполняет LEFT OUTER JOIN — включает все строки из левой таблицы (User), даже без совпадений в правой (Address). Пользователи без адресов получат NULL в полях Address. Фильтруем их через Address.id.is_(None).
Address.id.is_(None), а не Address.id == None. Второй вариант генерирует предупреждение SAWarning в SQLAlchemy 2.x — подробнее в Типичных ошибках.
users_without_addresses = (
session.query(User.name)
.outerjoin(User.addresses)
.filter(Address.id.is_(None))
.all()
)
print("Users without addresses:")
for user in users_without_addresses:
print(user.name)
Задача 18: Подсчёт пользователей в каждом городе
Логика: здесь JOIN идёт от Address, поэтому используем join(Address.user). Группируем по городу (Address.description) и считаем пользователей.
from sqlalchemy import func
user_counts_by_city = (
session.query(Address.description, func.count(User.id))
.join(Address.user)
.group_by(Address.description)
.all()
)
print("User counts by city:")
for city, count in user_counts_by_city:
print(f"City: {city}, Users: {count}")
Задача 19: Поиск пользователей по городу
Логика: INNER JOIN с фильтром по полю связанной таблицы. Шаблон: .join(связь).filter(ПолеСвязанной == значение).
city_name = "Berlin"
users_in_city = (
session.query(User.name)
.join(User.addresses)
.filter(Address.description == city_name)
.all()
)
print(f"Users in {city_name}:")
for user in users_in_city:
print(user.name)
Задача 20: Обновление адреса пользователя
Логика: one_or_none() — возвращает объект или None; выбрасывает исключение если найдено более одного. Обновляем адрес через relationship: user.addresses[0].description. SQLAlchemy автоматически сгенерирует UPDATE для таблицы addresses.
user_name = "Bob"
new_city = "Paris"
user = session.query(User).filter(User.name == user_name).one_or_none()
if user and user.addresses:
user.addresses[0].description = new_city
session.commit()
print(f"Updated {user.name}'s address to {new_city}.")
else:
print("User not found or has no addresses")