Python/SQLAlchemy N+1 쿼리 잡기: 이미지 라벨링 플랫폼 실전 편
코드 리뷰에서 발견된 세 곳의 N+1 쿼리. 이미지 100장이면 DB 라운드트립이 101회였던 이유와 `IN` 쿼리 + 벌크 DELETE로 2회로 줄인 실전 기록을 Python/SQLAlchemy로 정리했습니다. Prisma 편의 자매글입니다.
1. 문제 상황: 코드 리뷰가 잡아낸 세 곳의 N+1
지난 세 편의 리팩토링이 끝난 뒤 PR #40에 대한 코드 리뷰 라운드 2를 받았습니다. 리뷰어가 가장 먼저 지적한 것은 성능 문제였습니다.
"services/image_service.py
get_user_images()에서 N+1 쿼리가 보입니다. 어노테이션을 한 번에 로드하면 어떨까요?"
확인해 보니 같은 패턴이 세 곳에 숨어 있었습니다.
| 위치 | 증상 |
|---|---|
services/image_service.py / get_user_images() |
이미지 N장마다 Annotation.query.filter_by(image_id=...) 호출 → N+1 쿼리 |
services/label_service.py / get_user_labels() |
같은 패턴. 라벨 뷰용으로 동일한 N+1 구조 |
services/activity_service.py / add_activity() |
활동 로그 300개 유지 로직이 Python 루프로 한 건씩 DELETE — 비슷한 계열의 낭비 |
앞선 편들이 "구조 개선"이었다면 이번 편은 "숨은 비용 제거"입니다. 리뷰가 아니었다면 눈으로 잡기 어려운 종류의 버그였습니다.
2. N+1 쿼리란 무엇인가 (빠른 복습)
이미 Prisma/TypeScript 편에서 한 번 다룬 주제지만, 언어와 ORM이 달라도 원리는 동일합니다. N개 부모 객체를 순회하면서 각 부모의 관계를 별도 쿼리로 조회할 때 발생합니다.
# N+1의 전형
parents = Parent.query.all() # 쿼리 1회: SELECT * FROM parents
for p in parents:
children = Child.query.filter_by(parent_id=p.id).all() # 쿼리 N회
# ...
부모가 100개면 DB 라운드트립이 101회(1 + 100) 일어납니다. 한 번의 응답에 10ms 걸린다고 가정하면 1.01초, 네트워크가 붙은 환경에서는 더 악화됩니다.
해결의 본질은 한 줄입니다. 부모들의 id를 모아 한 번의 쿼리로 자식을 전부 불러와 메모리에서 매핑한다. SQL에서는 이것이 IN (...)이고, SQLAlchemy에서는 Column.in_(...)입니다.
3. N+1 위치 ①: services/image_service.py
Before
# services/image_service.py (Before)
def get_user_images(user_id: str) -> list:
udir = get_user_upload_dir(user_id)
exts = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".bmp"}
# DB에서 이 사용자의 이미지+어노테이션 정보를 한번에 로드
db_images = {
img.filename: img
for img in Image.query.filter_by(user_id=user_id).all() # ← 쿼리 1
}
result = []
for f in sorted(udir.iterdir(), key=lambda x: x.stat().st_mtime):
if f.suffix.lower() not in exts:
continue
db_img = db_images.get(f.name)
if db_img:
# ❌ 루프마다 쿼리 발생 — 이미지 N장이면 N번
annos = Annotation.query.filter_by(image_id=db_img.id).all()
anno_list = [_to_dict(a) for a in annos]
result.append({
"filename": f.name,
"annotations": anno_list,
# ...
})
return result
주석에는 "이미지+어노테이션 정보를 한번에 로드"라고 써 있었지만, 실제로는 이미지만 한 번에 로드하고 어노테이션은 루프마다 쿼리를 날리고 있었습니다. 주석과 코드가 어긋난 전형적 상황이고, 리뷰어가 아니었다면 눈으로 찾기 매우 어려웠을 것입니다.
After
# services/image_service.py (After)
def get_user_images(user_id: str) -> list:
udir = get_user_upload_dir(user_id)
exts = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".bmp"}
# ① 이미지 로드
db_images_list = Image.query.filter_by(user_id=user_id).all()
db_images = {img.filename: img for img in db_images_list}
# ② 모든 이미지의 어노테이션을 한 번의 IN 쿼리로 로드 (N+1 방지)
image_ids = [img.id for img in db_images_list]
all_annos: dict[str, list] = {}
if image_ids:
annos = Annotation.query.filter(Annotation.image_id.in_(image_ids)).all()
for a in annos:
all_annos.setdefault(a.image_id, []).append(a)
# ③ 파일 순회 — DB 접근 없이 딕셔너리 조회만
result = []
for f in sorted(udir.iterdir(), key=lambda x: x.stat().st_mtime):
if f.suffix.lower() not in exts:
continue
db_img = db_images.get(f.name)
if db_img:
anno_list = [_to_dict(a) for a in all_annos.get(db_img.id, [])]
result.append({
"filename": f.name,
"annotations": anno_list,
# ...
})
return result
핵심 변경점 세 가지:
image_ids = [img.id for img in db_images_list]: 이미지 id를 한 번에 수집Annotation.query.filter(Annotation.image_id.in_(image_ids)).all():IN (...)쿼리 한 번all_annos: dict[str, list]+setdefault(...).append(a): 메모리에서 image_id → annotations로 그룹핑
루프 안에서는 이제 all_annos.get(db_img.id, [])로 메모리 조회만 합니다. 이미지가 100장이든 10,000장이든 DB 라운드트립은 정확히 2회로 고정됩니다.
쿼리 수 비교
| 시나리오 | Before | After |
|---|---|---|
| 이미지 1장 | 2 쿼리 | 2 쿼리 |
| 이미지 10장 | 11 쿼리 | 2 쿼리 |
| 이미지 100장 | 101 쿼리 | 2 쿼리 |
| 이미지 1,000장 | 1,001 쿼리 | 2 쿼리 |
4. N+1 위치 ②: services/label_service.py
같은 패턴이 라벨 서비스에도 있었습니다. 이 함수는 프론트엔드의 라벨 편집 뷰에 데이터를 공급하는 핵심 경로라, N+1이 더 치명적이었습니다.
Before
# services/label_service.py (Before)
def get_user_labels(user_id: str) -> dict:
"""
Returns:
{"filename.jpg": {"annotations": [...], "labeled": True, ...}, ...}
"""
images = Image.query.filter_by(user_id=user_id).all()
result: dict = {}
for img in images:
# ❌ 이미지마다 어노테이션 쿼리
annos = Annotation.query.filter_by(image_id=img.id).all()
anno_list = [_to_dict(a) for a in annos]
result[img.filename] = {
"annotations": anno_list,
"labeled": img.labeled,
# ...
}
return result
After
# services/label_service.py (After)
def get_user_labels(user_id: str) -> dict:
images = Image.query.filter_by(user_id=user_id).all()
# 어노테이션 일괄 로드 (N+1 방지)
image_ids = [img.id for img in images]
all_annos: dict[str, list] = {}
if image_ids:
annos_all = Annotation.query.filter(Annotation.image_id.in_(image_ids)).all()
for a in annos_all:
all_annos.setdefault(a.image_id, []).append(a)
result: dict = {}
for img in images:
anno_list = [_to_dict(a) for a in all_annos.get(img.id, [])]
result[img.filename] = {
"annotations": anno_list,
"labeled": img.labeled,
# ...
}
return result
앞선 image_service.py와 구조가 거의 동일합니다. 반복되는 패턴이라면 헬퍼 함수로 뽑아내는 편이 낫지 않나 싶지만, 두 서비스의 반환 포맷이 달라서 억지로 통합하면 타입이 복잡해집니다. 세 번째 호출 지점이 생기면 그때 추상화를 고려하기로 하고, 지금은 명시적 복제를 유지했습니다.
★ 여기서 하나의 교훈: 두 번 복제된 코드를 성급히 추상화하지 마세요. 세 번째가 나타나면 공통점이 확실해지고, 억지 통합을 피할 수 있습니다. 지금의 두 건은 "N+1 방지 패턴"이 공통이지만 반환 타입이 달라 통합의 이득이 크지 않습니다.
5. N+1 위치 ③: services/activity_service.py (변형)
세 번째 곳은 엄밀히 말하면 "N+1 쿼리"가 아니라 "N개 DELETE 루프" 입니다. 하지만 문제의 본질은 동일합니다. 한 번의 SQL로 끝낼 일을 Python 루프로 N번 쿼리합니다.
Before
# services/activity_service.py (Before)
def add_activity(user_id: str, msg: str, img_name: str = "", anno_cnt: int = 0):
activity = Activity(
user_id=user_id, message=msg,
image_name=img_name, annotation_count=anno_cnt,
)
db.session.add(activity)
db.session.commit()
# 최대 300개 유지 — 오래된 항목 정리
count = Activity.query.filter_by(user_id=user_id).count()
if count > 300:
excess = (
Activity.query
.filter_by(user_id=user_id)
.order_by(Activity.created_at.asc())
.limit(count - 300)
.all()
)
for a in excess:
db.session.delete(a) # ❌ 한 건씩 DELETE
db.session.commit()
"활동 로그를 최근 300개만 유지"한다는 요구를 구현한 코드입니다. 그런데 이 로직은 한 건의 새 활동이 추가될 때마다 실행되고, 초과분을 Python 루프로 한 건씩 삭제합니다. 로그가 누적되면 매 요청마다 수십 건의 DELETE가 나가는 구조였습니다.
After
# services/activity_service.py (After)
def add_activity(user_id: str, msg: str, img_name: str = "", anno_cnt: int = 0):
activity = Activity(
user_id=user_id, message=msg,
image_name=img_name, annotation_count=anno_cnt,
)
db.session.add(activity)
db.session.commit()
# 최대 300개 유지 — 오래된 항목 정리 (bulk delete)
count = Activity.query.filter_by(user_id=user_id).count()
if count > 300:
# 300번째로 최신인 기록의 타임스탬프를 구함
cutoff = (
Activity.query
.filter_by(user_id=user_id)
.order_by(Activity.created_at.desc())
.offset(300)
.first()
)
if cutoff:
# cutoff보다 오래된 레코드를 단일 DELETE로 제거
Activity.query.filter(
Activity.user_id == user_id,
Activity.created_at < cutoff.created_at,
).delete(synchronize_session=False)
db.session.commit()
변경의 핵심은 세 단계입니다.
- 경계 시점 찾기:
order_by(created_at.desc()).offset(300).first()— 최신순으로 300번째를 넘어가는 첫 레코드 - 단일 DELETE:
Activity.query.filter(user_id=..., created_at<cutoff).delete()— 경계보다 오래된 것을 한 번에 삭제 synchronize_session=False: SQLAlchemy에게 "세션 내 인스턴스 동기화를 건너뛰고 바로 DB에 DELETE를 날려라"고 지시. 벌크 삭제에서 흔한 최적화입니다.
결과적으로 "한 건씩 DELETE N회" → "벌크 DELETE 1회"로 줄었습니다.
synchronize_session 이해하기
synchronize_session은 벌크 UPDATE/DELETE에서 자주 튀어나오는 옵션입니다.
| 값 | 동작 |
|---|---|
'evaluate' (기본) |
Python 측에서 WHERE 조건을 평가해 세션 캐시를 갱신. 복잡한 조건은 실패 |
'fetch' |
DB에 먼저 조회해서 영향받는 행을 가져온 뒤 세션 캐시 갱신 |
False |
동기화하지 않음. 세션에 남아 있는 인스턴스가 stale 해짐 |
이 프로젝트의 add_activity()는 호출 직후 함수가 끝나고 요청이 종료됩니다. 세션 캐시가 stale 해져도 다음 요청이 새 세션을 열기 때문에 문제가 없습니다. 그래서 False가 안전하고 가장 빠릅니다.
6. 동일 날 발견된 추가 최적화
코드 리뷰가 한 번에 터지면 N+1만 잡히지 않습니다. 같은 라운드에서 발견된 인접 개선 몇 가지도 함께 정리했습니다.
cancel_training의 중복 app context
# Before
def cancel_training(uid, train_id):
# ... Future.cancel() 시도
app = _get_app() # ❌ 이미 app context 안에 있을 수 있는데 새로 만듦
with app.app_context():
rec = db.session.get(TrainingRecord, train_id)
# ...
라우트에서 이 함수를 호출할 때는 이미 Flask 요청 컨텍스트가 있으므로 _get_app()이 불필요했습니다. 요청 경로에서 호출할 때와 백그라운드에서 호출할 때를 구분해서, 중복 코드를 제거했습니다.
_WORKER_PROCESS 플래그로 복구 훅 중복 방지
create_app()이 워커 프로세스에서도 호출되면 recover_orphaned_training()이 실행됩니다. 하지만 워커가 복구 훅을 돌릴 이유는 없습니다. 그래서 프로세스 시작 시 _WORKER_PROCESS=True를 설정해 복구를 건너뛰게 했습니다.
# services/training_service.py
_WORKER_PROCESS = False # 모듈 레벨 플래그
def _mark_as_worker():
global _WORKER_PROCESS
_WORKER_PROCESS = True
# app.py
def create_app():
# ...
if app.config.get("SQLALCHEMY_DATABASE_URI"):
try:
from services.training_service import (
recover_orphaned_training, _WORKER_PROCESS,
)
if not _WORKER_PROCESS: # ← 워커 프로세스는 건너뜀
with app.app_context():
recover_orphaned_training()
except Exception as e:
logger.warning(f"Training recovery skipped: {e}")
7. 측정: 얼마나 개선됐나
정확한 벤치마크는 프로덕션에서 돌려봐야 하지만, 개념적 추산은 다음과 같습니다.
시나리오: 사용자 한 명이 이미지 100장을 업로드한 상태에서 /api/images를 호출
| 지표 | Before | After |
|---|---|---|
| DB 쿼리 수 | 101 (1 + 100) | 2 |
| 예상 라운드트립 합계 | ~100ms (1ms × 101) | ~2ms |
| 서비스 응답 지연 (나머지 로직 포함) | ~150ms | ~50ms |
시나리오: add_activity() 호출 시 로그 초과분 삭제
로그가 500개 쌓였을 때 새 활동이 하나 추가되면:
| 지표 | Before | After |
|---|---|---|
| DELETE 쿼리 수 | 200회 | 1회 |
| 트랜잭션 로그 부담 | 200건의 개별 트랜잭션 기록 | 1건의 범위 삭제 |
SQLAlchemy의 delete(synchronize_session=False)는 WHERE 절을 그대로 DB에 전달하므로, PostgreSQL이 단일 쿼리 플랜으로 효율적으로 처리합니다.
8. Prisma 편과의 비교
이 주제는 이미 Prisma N+1 쿼리 성능 문제 해결하기에서 TypeScript/Prisma 기준으로 다뤘습니다. 언어와 ORM이 달라도 패턴은 동일하지만, 각 생태계의 문법이 약간 다릅니다. 비교해 두면 새 스택으로 넘어갈 때 맥락 전환이 쉬워집니다.
| 요소 | Prisma (TypeScript) | SQLAlchemy (Python) |
|---|---|---|
| 부모 조회 | prisma.parent.findMany({ where }) |
Parent.query.filter_by(...).all() |
| 자식 일괄 로드 | include: { children: true } (eager load) 또는 별도 findMany({ where: { parentId: { in: ids } } }) |
Annotation.query.filter(Annotation.image_id.in_(ids)).all() |
| 메모리 그룹핑 | Map<id, Child[]> |
dict[str, list] + setdefault |
| 벌크 DELETE | prisma.child.deleteMany({ where }) |
Query.delete(synchronize_session=False) |
| 자동 eager-load | include / select |
joinedload(...) / selectinload(...) |
SQLAlchemy 쪽에는 selectinload(Image.annotations) 같은 선언적 eager-load 옵션도 있습니다. 이 프로젝트에서 IN 쿼리 방식을 선택한 이유는 다음과 같습니다.
- 어노테이션을 가져오는 함수가 이미
all_annos딕셔너리를 만들고 있었음 (파일 시스템의 이미지와 매칭하는 별도 로직 때문) selectinload를 쓰면Image인스턴스의.annotations를 통해 접근하게 되어, 기존 코드 흐름을 더 많이 바꿔야 했음- 명시적
IN쿼리가 디버깅과 로그 추적에 더 친숙함
규모가 더 커지면 selectinload로 전환할 가치가 있지만, 이번 단계에서는 "최소 변경으로 N+1 제거"가 목표였습니다.
9. 핵심 개념 정리
| 개념 | 역할 |
|---|---|
| N+1 쿼리 | 부모 N개에 대해 각각 자식 쿼리를 따로 날리는 안티패턴 |
Column.in_(list) |
IN (...) SQL로 한 번에 자식을 가져오는 SQLAlchemy 기본 도구 |
dict.setdefault(k, []).append(v) |
메모리에서 부모-자식 그룹핑을 만드는 Python 관용구 |
Query.delete(synchronize_session=False) |
벌크 DELETE. 세션 동기화 건너뛰는 옵션 |
| Offset 기반 cutoff | "최신 N개만 유지"를 벌크 DELETE로 구현하는 패턴 |
selectinload / joinedload |
SQLAlchemy의 선언적 eager-load 대안 |
10. 베스트 프랙티스 체크리스트
- [ ] 루프 안에서
Query.filter_by(부모_id=...)패턴이 보이면 N+1 의심 - [ ]
IN (...)쿼리로 자식을 한 번에 로드할 수 있나요? - [ ] 가져온 자식을
dict[부모_id, list]로 메모리 그룹핑했나요? - [ ] 루프 안에서
db.session.delete(obj)가 반복되면 벌크 DELETE 고려 - [ ] 벌크 DELETE 시
synchronize_session옵션이 요청 수명과 맞나요? - [ ] "최신 N개 유지" 로직은 offset 기반 cutoff로 벌크화 가능
- [ ] 로그/메트릭으로 쿼리 수를 모니터링할 수 있나요? (Flask-SQLAlchemy
SQLALCHEMY_ECHO,pg_stat_statements) - [ ] 리뷰어에게 "이 함수가 만드는 쿼리 수"를 명시적으로 설명할 수 있나요?
11. FAQ
Q1. selectinload를 쓰면 더 깔끔하지 않나요?
A. 깔끔해집니다만, 기존 코드가 파일 시스템과 DB를 함께 다루고 있어서 Image.annotations에 의존하면 호출 흐름을 더 바꿔야 했습니다. 명시적 IN 쿼리는 "이 단계에서 무슨 SQL이 나가는지"가 코드에 선명해서 이번에는 이 방식을 선택했습니다. 신규 코드에서는 selectinload를 권장합니다.
Q2. IN 쿼리에 수천 개 ID를 넣으면 괜찮나요?
A. PostgreSQL은 IN 절에 수천 개까지는 무난히 처리합니다. 다만 10,000개를 넘어가면 파싱 비용과 쿼리 플랜 비용이 점점 커집니다. 그 규모가 되면 임시 테이블 INSERT 후 JOIN, 또는 서브쿼리 기반 접근으로 바꾸는 것이 좋습니다. 이 프로젝트 규모에서는 IN이 가장 단순하고 빠릅니다.
Q3. synchronize_session='evaluate'가 기본인데 왜 False를 쓰나요?
A. evaluate는 WHERE 조건을 Python 측에서 평가해 세션 캐시와 동기화하려 합니다. 복잡한 조건(created_at < cutoff)에서는 NotSupportedError를 던질 수 있고, 무엇보다 이 함수는 실행 후 요청이 끝나므로 세션 동기화 이득이 없습니다. False가 가장 빠르고 단순합니다.
Q4. N+1을 자동 감지할 방법이 있나요?
A. Flask 개발 환경에서 SQLALCHEMY_ECHO = True로 모든 쿼리를 로그로 출력해 눈으로 확인하는 것이 가장 단순한 방법입니다. 더 체계적으로는 sqltap, nplusone 같은 라이브러리가 N+1 패턴을 자동 경고합니다. PostgreSQL 측에서는 pg_stat_statements + slow query log로 사후 감지 가능합니다.
Q5. 활동 로그 300개 유지는 왜 "스케줄드 잡"으로 빼지 않나요?
A. 타당한 지적입니다. 쓰기 경로(add_activity)마다 정리 로직이 붙어 있는 것은 단일 책임 원칙 측면에서 좋지 않습니다. 다만 이 프로젝트는 크론/스케줄러 인프라가 없고, 활동 로그가 특정 사용자에게 집중되는 패턴이라 "쓸 때마다 정리"가 오히려 자연스러웠습니다. 스케일이 커지면 APScheduler나 별도 크론으로 분리할 계획입니다.
12. 참고 자료
- Prisma N+1 쿼리 성능 문제 해결하기 — TypeScript/Prisma 편 (자매 글)
- SQLAlchemy 2.x 공식 문서 (
Column.in_): 검색 키워드sqlalchemy column in_ expression - SQLAlchemy 2.x 공식 문서 (
selectinload): 검색 키워드sqlalchemy selectinload eager loading - PostgreSQL 공식 문서 (
pg_stat_statements): 검색 키워드postgresql pg_stat_statements module - Python
sqltap라이브러리: 검색 키워드python sqltap N+1 detection
13. 다음 단계
앞선 네 편(Blueprints → PostgreSQL → ProcessPoolExecutor → N+1)이 모두 코드 리뷰에서 발견된 구조/성능 문제였습니다. 이제 성격이 완전히 다른 버그 이야기로 넘어갑니다. YOLO 학습이 잘 돌아가고 mAP도 올라가는데, 실제 배포하면 정확도가 안 나오는 현상 — 원인은 train과 val 디렉토리가 같은 폴더를 가리키고 있었기 때문입니다. ML 특유의 조용한 버그입니다. 다음 편에서 자세히 다룹니다.
🐍 Flask 백엔드 실전 시리즈 (9부작)
- 모놀리식 app.py를 Blueprint로 분해하기
- JSON 파일 DB에서 PostgreSQL로 마이그레이션
- ML 학습 백그라운드 실행: ProcessPoolExecutor
- Python/SQLAlchemy N+1 쿼리 잡기 (현재 글)
- train과 val이 같은 폴더일 때의 조용한 ML 버그
- 하나의 백엔드로 Detection/Classification/Segmentation
- Fail-fast 설정 검증과 Path Traversal 방어
- Rate Limiting을 걷어낸 날: 폐쇄망 보안
- 작지만 기억할 만한 네 가지 교훈
자매 글(다른 ORM 편):
- Prisma N+1 쿼리 성능 문제 해결하기 (50% 속도 개선) — TypeScript/Prisma 편