Python/SQLAlchemy N+1 쿼리 잡기: 이미지 라벨링 플랫폼 실전 편

코드 리뷰에서 발견된 세 곳의 N+1 쿼리. 이미지 100장이면 DB 라운드트립이 101회였던 이유와 `IN` 쿼리 + 벌크 DELETE로 2회로 줄인 실전 기록을 Python/SQLAlchemy로 정리했습니다. Prisma 편의 자매글입니다.

Python/SQLAlchemy N+1 쿼리 잡기: 이미지 라벨링 플랫폼 실전 편

1. 문제 상황: 코드 리뷰가 잡아낸 세 곳의 N+1

지난 세 편의 리팩토링이 끝난 뒤 PR #40에 대한 코드 리뷰 라운드 2를 받았습니다. 리뷰어가 가장 먼저 지적한 것은 성능 문제였습니다.

"services/image_service.py get_user_images()에서 N+1 쿼리가 보입니다. 어노테이션을 한 번에 로드하면 어떨까요?"

확인해 보니 같은 패턴이 세 곳에 숨어 있었습니다.

위치 증상
services/image_service.py / get_user_images() 이미지 N장마다 Annotation.query.filter_by(image_id=...) 호출 → N+1 쿼리
services/label_service.py / get_user_labels() 같은 패턴. 라벨 뷰용으로 동일한 N+1 구조
services/activity_service.py / add_activity() 활동 로그 300개 유지 로직이 Python 루프로 한 건씩 DELETE — 비슷한 계열의 낭비

앞선 편들이 "구조 개선"이었다면 이번 편은 "숨은 비용 제거"입니다. 리뷰가 아니었다면 눈으로 잡기 어려운 종류의 버그였습니다.

2. N+1 쿼리란 무엇인가 (빠른 복습)

이미 Prisma/TypeScript 편에서 한 번 다룬 주제지만, 언어와 ORM이 달라도 원리는 동일합니다. N개 부모 객체를 순회하면서 각 부모의 관계를 별도 쿼리로 조회할 때 발생합니다.

# N+1의 전형
parents = Parent.query.all()              # 쿼리 1회: SELECT * FROM parents
for p in parents:
    children = Child.query.filter_by(parent_id=p.id).all()  # 쿼리 N회
    # ...

부모가 100개면 DB 라운드트립이 101회(1 + 100) 일어납니다. 한 번의 응답에 10ms 걸린다고 가정하면 1.01초, 네트워크가 붙은 환경에서는 더 악화됩니다.

해결의 본질은 한 줄입니다. 부모들의 id를 모아 한 번의 쿼리로 자식을 전부 불러와 메모리에서 매핑한다. SQL에서는 이것이 IN (...)이고, SQLAlchemy에서는 Column.in_(...)입니다.

3. N+1 위치 ①: services/image_service.py

Before

# services/image_service.py (Before)
def get_user_images(user_id: str) -> list:
    udir = get_user_upload_dir(user_id)
    exts = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".bmp"}

    # DB에서 이 사용자의 이미지+어노테이션 정보를 한번에 로드
    db_images = {
        img.filename: img
        for img in Image.query.filter_by(user_id=user_id).all()  # ← 쿼리 1
    }

    result = []
    for f in sorted(udir.iterdir(), key=lambda x: x.stat().st_mtime):
        if f.suffix.lower() not in exts:
            continue

        db_img = db_images.get(f.name)
        if db_img:
            # ❌ 루프마다 쿼리 발생 — 이미지 N장이면 N번
            annos = Annotation.query.filter_by(image_id=db_img.id).all()
            anno_list = [_to_dict(a) for a in annos]
            result.append({
                "filename": f.name,
                "annotations": anno_list,
                # ...
            })
    return result

주석에는 "이미지+어노테이션 정보를 한번에 로드"라고 써 있었지만, 실제로는 이미지만 한 번에 로드하고 어노테이션은 루프마다 쿼리를 날리고 있었습니다. 주석과 코드가 어긋난 전형적 상황이고, 리뷰어가 아니었다면 눈으로 찾기 매우 어려웠을 것입니다.

After

# services/image_service.py (After)
def get_user_images(user_id: str) -> list:
    udir = get_user_upload_dir(user_id)
    exts = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".bmp"}

    # ① 이미지 로드
    db_images_list = Image.query.filter_by(user_id=user_id).all()
    db_images = {img.filename: img for img in db_images_list}

    # ② 모든 이미지의 어노테이션을 한 번의 IN 쿼리로 로드 (N+1 방지)
    image_ids = [img.id for img in db_images_list]
    all_annos: dict[str, list] = {}
    if image_ids:
        annos = Annotation.query.filter(Annotation.image_id.in_(image_ids)).all()
        for a in annos:
            all_annos.setdefault(a.image_id, []).append(a)

    # ③ 파일 순회 — DB 접근 없이 딕셔너리 조회만
    result = []
    for f in sorted(udir.iterdir(), key=lambda x: x.stat().st_mtime):
        if f.suffix.lower() not in exts:
            continue

        db_img = db_images.get(f.name)
        if db_img:
            anno_list = [_to_dict(a) for a in all_annos.get(db_img.id, [])]
            result.append({
                "filename": f.name,
                "annotations": anno_list,
                # ...
            })
    return result

핵심 변경점 세 가지:

  1. image_ids = [img.id for img in db_images_list]: 이미지 id를 한 번에 수집
  2. Annotation.query.filter(Annotation.image_id.in_(image_ids)).all(): IN (...) 쿼리 한 번
  3. all_annos: dict[str, list] + setdefault(...).append(a): 메모리에서 image_id → annotations로 그룹핑

루프 안에서는 이제 all_annos.get(db_img.id, [])메모리 조회만 합니다. 이미지가 100장이든 10,000장이든 DB 라운드트립은 정확히 2회로 고정됩니다.

쿼리 수 비교

시나리오 Before After
이미지 1장 2 쿼리 2 쿼리
이미지 10장 11 쿼리 2 쿼리
이미지 100장 101 쿼리 2 쿼리
이미지 1,000장 1,001 쿼리 2 쿼리

4. N+1 위치 ②: services/label_service.py

같은 패턴이 라벨 서비스에도 있었습니다. 이 함수는 프론트엔드의 라벨 편집 뷰에 데이터를 공급하는 핵심 경로라, N+1이 더 치명적이었습니다.

Before

# services/label_service.py (Before)
def get_user_labels(user_id: str) -> dict:
    """
    Returns:
        {"filename.jpg": {"annotations": [...], "labeled": True, ...}, ...}
    """
    images = Image.query.filter_by(user_id=user_id).all()

    result: dict = {}
    for img in images:
        # ❌ 이미지마다 어노테이션 쿼리
        annos = Annotation.query.filter_by(image_id=img.id).all()
        anno_list = [_to_dict(a) for a in annos]
        result[img.filename] = {
            "annotations": anno_list,
            "labeled": img.labeled,
            # ...
        }
    return result

After

# services/label_service.py (After)
def get_user_labels(user_id: str) -> dict:
    images = Image.query.filter_by(user_id=user_id).all()

    # 어노테이션 일괄 로드 (N+1 방지)
    image_ids = [img.id for img in images]
    all_annos: dict[str, list] = {}
    if image_ids:
        annos_all = Annotation.query.filter(Annotation.image_id.in_(image_ids)).all()
        for a in annos_all:
            all_annos.setdefault(a.image_id, []).append(a)

    result: dict = {}
    for img in images:
        anno_list = [_to_dict(a) for a in all_annos.get(img.id, [])]
        result[img.filename] = {
            "annotations": anno_list,
            "labeled": img.labeled,
            # ...
        }
    return result

앞선 image_service.py와 구조가 거의 동일합니다. 반복되는 패턴이라면 헬퍼 함수로 뽑아내는 편이 낫지 않나 싶지만, 두 서비스의 반환 포맷이 달라서 억지로 통합하면 타입이 복잡해집니다. 세 번째 호출 지점이 생기면 그때 추상화를 고려하기로 하고, 지금은 명시적 복제를 유지했습니다.

★ 여기서 하나의 교훈: 두 번 복제된 코드를 성급히 추상화하지 마세요. 세 번째가 나타나면 공통점이 확실해지고, 억지 통합을 피할 수 있습니다. 지금의 두 건은 "N+1 방지 패턴"이 공통이지만 반환 타입이 달라 통합의 이득이 크지 않습니다.

5. N+1 위치 ③: services/activity_service.py (변형)

세 번째 곳은 엄밀히 말하면 "N+1 쿼리"가 아니라 "N개 DELETE 루프" 입니다. 하지만 문제의 본질은 동일합니다. 한 번의 SQL로 끝낼 일을 Python 루프로 N번 쿼리합니다.

Before

# services/activity_service.py (Before)
def add_activity(user_id: str, msg: str, img_name: str = "", anno_cnt: int = 0):
    activity = Activity(
        user_id=user_id, message=msg,
        image_name=img_name, annotation_count=anno_cnt,
    )
    db.session.add(activity)
    db.session.commit()

    # 최대 300개 유지 — 오래된 항목 정리
    count = Activity.query.filter_by(user_id=user_id).count()
    if count > 300:
        excess = (
            Activity.query
            .filter_by(user_id=user_id)
            .order_by(Activity.created_at.asc())
            .limit(count - 300)
            .all()
        )
        for a in excess:
            db.session.delete(a)  # ❌ 한 건씩 DELETE
        db.session.commit()

"활동 로그를 최근 300개만 유지"한다는 요구를 구현한 코드입니다. 그런데 이 로직은 한 건의 새 활동이 추가될 때마다 실행되고, 초과분을 Python 루프로 한 건씩 삭제합니다. 로그가 누적되면 매 요청마다 수십 건의 DELETE가 나가는 구조였습니다.

After

# services/activity_service.py (After)
def add_activity(user_id: str, msg: str, img_name: str = "", anno_cnt: int = 0):
    activity = Activity(
        user_id=user_id, message=msg,
        image_name=img_name, annotation_count=anno_cnt,
    )
    db.session.add(activity)
    db.session.commit()

    # 최대 300개 유지 — 오래된 항목 정리 (bulk delete)
    count = Activity.query.filter_by(user_id=user_id).count()
    if count > 300:
        # 300번째로 최신인 기록의 타임스탬프를 구함
        cutoff = (
            Activity.query
            .filter_by(user_id=user_id)
            .order_by(Activity.created_at.desc())
            .offset(300)
            .first()
        )
        if cutoff:
            # cutoff보다 오래된 레코드를 단일 DELETE로 제거
            Activity.query.filter(
                Activity.user_id == user_id,
                Activity.created_at < cutoff.created_at,
            ).delete(synchronize_session=False)
            db.session.commit()

변경의 핵심은 세 단계입니다.

  1. 경계 시점 찾기: order_by(created_at.desc()).offset(300).first() — 최신순으로 300번째를 넘어가는 첫 레코드
  2. 단일 DELETE: Activity.query.filter(user_id=..., created_at<cutoff).delete() — 경계보다 오래된 것을 한 번에 삭제
  3. synchronize_session=False: SQLAlchemy에게 "세션 내 인스턴스 동기화를 건너뛰고 바로 DB에 DELETE를 날려라"고 지시. 벌크 삭제에서 흔한 최적화입니다.

결과적으로 "한 건씩 DELETE N회" → "벌크 DELETE 1회"로 줄었습니다.

synchronize_session 이해하기

synchronize_session은 벌크 UPDATE/DELETE에서 자주 튀어나오는 옵션입니다.

동작
'evaluate' (기본) Python 측에서 WHERE 조건을 평가해 세션 캐시를 갱신. 복잡한 조건은 실패
'fetch' DB에 먼저 조회해서 영향받는 행을 가져온 뒤 세션 캐시 갱신
False 동기화하지 않음. 세션에 남아 있는 인스턴스가 stale 해짐

이 프로젝트의 add_activity()는 호출 직후 함수가 끝나고 요청이 종료됩니다. 세션 캐시가 stale 해져도 다음 요청이 새 세션을 열기 때문에 문제가 없습니다. 그래서 False가 안전하고 가장 빠릅니다.

6. 동일 날 발견된 추가 최적화

코드 리뷰가 한 번에 터지면 N+1만 잡히지 않습니다. 같은 라운드에서 발견된 인접 개선 몇 가지도 함께 정리했습니다.

cancel_training의 중복 app context

# Before
def cancel_training(uid, train_id):
    # ... Future.cancel() 시도
    app = _get_app()  # ❌ 이미 app context 안에 있을 수 있는데 새로 만듦
    with app.app_context():
        rec = db.session.get(TrainingRecord, train_id)
        # ...

라우트에서 이 함수를 호출할 때는 이미 Flask 요청 컨텍스트가 있으므로 _get_app()이 불필요했습니다. 요청 경로에서 호출할 때와 백그라운드에서 호출할 때를 구분해서, 중복 코드를 제거했습니다.

_WORKER_PROCESS 플래그로 복구 훅 중복 방지

create_app()이 워커 프로세스에서도 호출되면 recover_orphaned_training()이 실행됩니다. 하지만 워커가 복구 훅을 돌릴 이유는 없습니다. 그래서 프로세스 시작 시 _WORKER_PROCESS=True를 설정해 복구를 건너뛰게 했습니다.

# services/training_service.py
_WORKER_PROCESS = False  # 모듈 레벨 플래그


def _mark_as_worker():
    global _WORKER_PROCESS
    _WORKER_PROCESS = True


# app.py
def create_app():
    # ...
    if app.config.get("SQLALCHEMY_DATABASE_URI"):
        try:
            from services.training_service import (
                recover_orphaned_training, _WORKER_PROCESS,
            )
            if not _WORKER_PROCESS:  # ← 워커 프로세스는 건너뜀
                with app.app_context():
                    recover_orphaned_training()
        except Exception as e:
            logger.warning(f"Training recovery skipped: {e}")

7. 측정: 얼마나 개선됐나

정확한 벤치마크는 프로덕션에서 돌려봐야 하지만, 개념적 추산은 다음과 같습니다.

시나리오: 사용자 한 명이 이미지 100장을 업로드한 상태에서 /api/images를 호출

지표 Before After
DB 쿼리 수 101 (1 + 100) 2
예상 라운드트립 합계 ~100ms (1ms × 101) ~2ms
서비스 응답 지연 (나머지 로직 포함) ~150ms ~50ms

시나리오: add_activity() 호출 시 로그 초과분 삭제

로그가 500개 쌓였을 때 새 활동이 하나 추가되면:

지표 Before After
DELETE 쿼리 수 200회 1회
트랜잭션 로그 부담 200건의 개별 트랜잭션 기록 1건의 범위 삭제

SQLAlchemy의 delete(synchronize_session=False)는 WHERE 절을 그대로 DB에 전달하므로, PostgreSQL이 단일 쿼리 플랜으로 효율적으로 처리합니다.

8. Prisma 편과의 비교

이 주제는 이미 Prisma N+1 쿼리 성능 문제 해결하기에서 TypeScript/Prisma 기준으로 다뤘습니다. 언어와 ORM이 달라도 패턴은 동일하지만, 각 생태계의 문법이 약간 다릅니다. 비교해 두면 새 스택으로 넘어갈 때 맥락 전환이 쉬워집니다.

요소 Prisma (TypeScript) SQLAlchemy (Python)
부모 조회 prisma.parent.findMany({ where }) Parent.query.filter_by(...).all()
자식 일괄 로드 include: { children: true } (eager load) 또는 별도 findMany({ where: { parentId: { in: ids } } }) Annotation.query.filter(Annotation.image_id.in_(ids)).all()
메모리 그룹핑 Map<id, Child[]> dict[str, list] + setdefault
벌크 DELETE prisma.child.deleteMany({ where }) Query.delete(synchronize_session=False)
자동 eager-load include / select joinedload(...) / selectinload(...)

SQLAlchemy 쪽에는 selectinload(Image.annotations) 같은 선언적 eager-load 옵션도 있습니다. 이 프로젝트에서 IN 쿼리 방식을 선택한 이유는 다음과 같습니다.

  • 어노테이션을 가져오는 함수가 이미 all_annos 딕셔너리를 만들고 있었음 (파일 시스템의 이미지와 매칭하는 별도 로직 때문)
  • selectinload를 쓰면 Image 인스턴스의 .annotations를 통해 접근하게 되어, 기존 코드 흐름을 더 많이 바꿔야 했음
  • 명시적 IN 쿼리가 디버깅과 로그 추적에 더 친숙함

규모가 더 커지면 selectinload로 전환할 가치가 있지만, 이번 단계에서는 "최소 변경으로 N+1 제거"가 목표였습니다.

9. 핵심 개념 정리

개념 역할
N+1 쿼리 부모 N개에 대해 각각 자식 쿼리를 따로 날리는 안티패턴
Column.in_(list) IN (...) SQL로 한 번에 자식을 가져오는 SQLAlchemy 기본 도구
dict.setdefault(k, []).append(v) 메모리에서 부모-자식 그룹핑을 만드는 Python 관용구
Query.delete(synchronize_session=False) 벌크 DELETE. 세션 동기화 건너뛰는 옵션
Offset 기반 cutoff "최신 N개만 유지"를 벌크 DELETE로 구현하는 패턴
selectinload / joinedload SQLAlchemy의 선언적 eager-load 대안

10. 베스트 프랙티스 체크리스트

  • [ ] 루프 안에서 Query.filter_by(부모_id=...) 패턴이 보이면 N+1 의심
  • [ ] IN (...) 쿼리로 자식을 한 번에 로드할 수 있나요?
  • [ ] 가져온 자식을 dict[부모_id, list]로 메모리 그룹핑했나요?
  • [ ] 루프 안에서 db.session.delete(obj)가 반복되면 벌크 DELETE 고려
  • [ ] 벌크 DELETE 시 synchronize_session 옵션이 요청 수명과 맞나요?
  • [ ] "최신 N개 유지" 로직은 offset 기반 cutoff로 벌크화 가능
  • [ ] 로그/메트릭으로 쿼리 수를 모니터링할 수 있나요? (Flask-SQLAlchemy SQLALCHEMY_ECHO, pg_stat_statements)
  • [ ] 리뷰어에게 "이 함수가 만드는 쿼리 수"를 명시적으로 설명할 수 있나요?

11. FAQ

Q1. selectinload를 쓰면 더 깔끔하지 않나요?
A. 깔끔해집니다만, 기존 코드가 파일 시스템과 DB를 함께 다루고 있어서 Image.annotations에 의존하면 호출 흐름을 더 바꿔야 했습니다. 명시적 IN 쿼리는 "이 단계에서 무슨 SQL이 나가는지"가 코드에 선명해서 이번에는 이 방식을 선택했습니다. 신규 코드에서는 selectinload를 권장합니다.

Q2. IN 쿼리에 수천 개 ID를 넣으면 괜찮나요?
A. PostgreSQL은 IN 절에 수천 개까지는 무난히 처리합니다. 다만 10,000개를 넘어가면 파싱 비용과 쿼리 플랜 비용이 점점 커집니다. 그 규모가 되면 임시 테이블 INSERT 후 JOIN, 또는 서브쿼리 기반 접근으로 바꾸는 것이 좋습니다. 이 프로젝트 규모에서는 IN이 가장 단순하고 빠릅니다.

Q3. synchronize_session='evaluate'가 기본인데 왜 False를 쓰나요?
A. evaluate는 WHERE 조건을 Python 측에서 평가해 세션 캐시와 동기화하려 합니다. 복잡한 조건(created_at < cutoff)에서는 NotSupportedError를 던질 수 있고, 무엇보다 이 함수는 실행 후 요청이 끝나므로 세션 동기화 이득이 없습니다. False가 가장 빠르고 단순합니다.

Q4. N+1을 자동 감지할 방법이 있나요?
A. Flask 개발 환경에서 SQLALCHEMY_ECHO = True로 모든 쿼리를 로그로 출력해 눈으로 확인하는 것이 가장 단순한 방법입니다. 더 체계적으로는 sqltap, nplusone 같은 라이브러리가 N+1 패턴을 자동 경고합니다. PostgreSQL 측에서는 pg_stat_statements + slow query log로 사후 감지 가능합니다.

Q5. 활동 로그 300개 유지는 왜 "스케줄드 잡"으로 빼지 않나요?
A. 타당한 지적입니다. 쓰기 경로(add_activity)마다 정리 로직이 붙어 있는 것은 단일 책임 원칙 측면에서 좋지 않습니다. 다만 이 프로젝트는 크론/스케줄러 인프라가 없고, 활동 로그가 특정 사용자에게 집중되는 패턴이라 "쓸 때마다 정리"가 오히려 자연스러웠습니다. 스케일이 커지면 APScheduler나 별도 크론으로 분리할 계획입니다.

12. 참고 자료

  • Prisma N+1 쿼리 성능 문제 해결하기 — TypeScript/Prisma 편 (자매 글)
  • SQLAlchemy 2.x 공식 문서 (Column.in_): 검색 키워드 sqlalchemy column in_ expression
  • SQLAlchemy 2.x 공식 문서 (selectinload): 검색 키워드 sqlalchemy selectinload eager loading
  • PostgreSQL 공식 문서 (pg_stat_statements): 검색 키워드 postgresql pg_stat_statements module
  • Python sqltap 라이브러리: 검색 키워드 python sqltap N+1 detection

13. 다음 단계

앞선 네 편(Blueprints → PostgreSQL → ProcessPoolExecutor → N+1)이 모두 코드 리뷰에서 발견된 구조/성능 문제였습니다. 이제 성격이 완전히 다른 버그 이야기로 넘어갑니다. YOLO 학습이 잘 돌아가고 mAP도 올라가는데, 실제 배포하면 정확도가 안 나오는 현상 — 원인은 trainval 디렉토리가 같은 폴더를 가리키고 있었기 때문입니다. ML 특유의 조용한 버그입니다. 다음 편에서 자세히 다룹니다.

🐍 Flask 백엔드 실전 시리즈 (9부작)

  1. 모놀리식 app.py를 Blueprint로 분해하기
  2. JSON 파일 DB에서 PostgreSQL로 마이그레이션
  3. ML 학습 백그라운드 실행: ProcessPoolExecutor
  4. Python/SQLAlchemy N+1 쿼리 잡기 (현재 글)
  5. train과 val이 같은 폴더일 때의 조용한 ML 버그
  6. 하나의 백엔드로 Detection/Classification/Segmentation
  7. Fail-fast 설정 검증과 Path Traversal 방어
  8. Rate Limiting을 걷어낸 날: 폐쇄망 보안
  9. 작지만 기억할 만한 네 가지 교훈

자매 글(다른 ORM 편):