Python asyncio 비동기 프로그래밍 가이드

asyncio란?

asyncio는 파이썬의 비동기 I/O 프레임워크입니다. 네트워크 요청, 파일 읽기, DB 쿼리처럼 I/O 대기 시간이 긴 작업을 효율적으로 처리할 수 있게 해줍니다. 스레드 없이도 동시에 수천 개의 연결을 처리할 수 있어 웹 서버, 크롤러, 챗봇 등에 널리 사용됩니다.

이 글에서는 이벤트 루프의 개념부터 async/await 문법, 동시 실행 패턴, 에러 핸들링까지 실전 예제로 정리합니다.

동기 vs 비동기 — 왜 asyncio인가?

동기 코드에서는 I/O 작업이 완료될 때까지 프로그램이 블로킹됩니다. 비동기 코드에서는 I/O 대기 중에 다른 작업을 실행할 수 있습니다.

import asyncio
import time

# 동기 방식: 3개의 작업을 순차 실행 (총 3초)
def sync_tasks():
    for i in range(3):
        print(f"작업 {i} 시작")
        time.sleep(1)  # 1초 블로킹
        print(f"작업 {i} 완료")

# 비동기 방식: 3개의 작업을 동시 실행 (총 1초)
async def async_task(task_id):
    print(f"작업 {task_id} 시작")
    await asyncio.sleep(1)  # 1초 대기 (다른 작업 실행 가능)
    print(f"작업 {task_id} 완료")

async def async_tasks():
    await asyncio.gather(
        async_task(0),
        async_task(1),
        async_task(2),
    )

# 동기: 약 3초 소요
start = time.perf_counter()
sync_tasks()
print(f"동기 소요 시간: {time.perf_counter() - start:.2f}초")
# 동기 소요 시간: 3.00초

# 비동기: 약 1초 소요
start = time.perf_counter()
asyncio.run(async_tasks())
print(f"비동기 소요 시간: {time.perf_counter() - start:.2f}초")
# 비동기 소요 시간: 1.00초
항목동기 (sync)비동기 (async)
I/O 대기블로킹논블로킹
실행 모델순차 실행이벤트 루프 기반 동시 실행
적합한 작업CPU 바운드I/O 바운드
복잡도낮음보통

핵심 개념 — 코루틴과 이벤트 루프

async def로 정의된 함수는 **코루틴(coroutine)**입니다. 코루틴은 직접 호출해도 실행되지 않고 코루틴 객체를 반환합니다. 이벤트 루프가 이를 스케줄링하여 실행합니다.

import asyncio

async def fetch_user(user_id: int) -> dict:
    """사용자 정보를 비동기로 가져옵니다."""
    print(f"사용자 {user_id} 조회 시작")
    await asyncio.sleep(0.5)  # DB 쿼리 시뮬레이션
    print(f"사용자 {user_id} 조회 완료")
    return {"id": user_id, "name": f"User_{user_id}"}

async def main():
    # 코루틴을 직접 호출하면 실행되지 않음
    coro = fetch_user(1)
    print(type(coro))  # <class 'coroutine'>

    # await로 실행해야 결과를 얻음
    user = await coro
    print(user)  # {'id': 1, 'name': 'User_1'}

# asyncio.run()이 이벤트 루프를 생성하고 코루틴을 실행
asyncio.run(main())

asyncio.run()은 이벤트 루프를 생성하고, 코루틴이 완료될 때까지 실행한 뒤 루프를 닫습니다. 프로그램의 진입점에서 한 번만 호출하는 것이 일반적입니다.

동시 실행 — gather, TaskGroup

여러 코루틴을 동시에 실행하려면 asyncio.gather() 또는 TaskGroup을 사용합니다.

import asyncio

async def fetch_url(url: str) -> str:
    """URL에서 데이터를 가져오는 시뮬레이션"""
    delay = len(url) * 0.1  # URL 길이에 비례한 딜레이
    await asyncio.sleep(delay)
    return f"{url}: {len(url)}바이트"

async def with_gather():
    """gather로 동시 실행 — 결과를 리스트로 반환"""
    urls = ["https://api.example.com/users",
            "https://api.example.com/posts",
            "https://api.example.com/comments"]

    results = await asyncio.gather(*[fetch_url(u) for u in urls])
    for result in results:
        print(result)
    # https://api.example.com/users: 30바이트
    # https://api.example.com/posts: 30바이트
    # https://api.example.com/comments: 33바이트

async def with_taskgroup():
    """TaskGroup으로 동시 실행 (Python 3.11+) — 구조적 동시성"""
    urls = ["https://api.example.com/users",
            "https://api.example.com/posts"]

    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_url(urls[0]))
        task2 = tg.create_task(fetch_url(urls[1]))

    # TaskGroup 블록을 벗어나면 모든 태스크 완료가 보장됨
    print(task1.result())  # https://api.example.com/users: 30바이트
    print(task2.result())  # https://api.example.com/posts: 30바이트

asyncio.run(with_gather())
asyncio.run(with_taskgroup())

TaskGroup은 Python 3.11에서 도입된 구조적 동시성 패턴입니다. 하나의 태스크가 예외를 발생시키면 나머지 태스크도 자동으로 취소되므로, gather보다 안전한 에러 핸들링을 제공합니다.

타임아웃과 취소

비동기 작업에 타임아웃을 설정하여 무한 대기를 방지할 수 있습니다.

import asyncio

async def slow_operation():
    """오래 걸리는 작업 시뮬레이션"""
    await asyncio.sleep(10)
    return "완료"

async def main():
    # asyncio.timeout (Python 3.11+)
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
    except TimeoutError:
        print("타임아웃 발생! 2초 초과")

    # asyncio.wait_for (3.11 이전 호환)
    try:
        result = await asyncio.wait_for(
            slow_operation(), timeout=2.0
        )
    except asyncio.TimeoutError:
        print("wait_for 타임아웃!")

asyncio.run(main())
# 타임아웃 발생! 2초 초과
# wait_for 타임아웃!

세마포어 — 동시성 제한

외부 API 호출 시 동시 요청 수를 제한해야 하는 경우가 많습니다. asyncio.Semaphore를 사용합니다.

import asyncio

async def fetch_with_limit(sem: asyncio.Semaphore,
                           url: str) -> str:
    async with sem:  # 세마포어 획득 (동시 실행 수 제한)
        print(f"요청 시작: {url}")
        await asyncio.sleep(1)  # API 호출 시뮬레이션
        print(f"요청 완료: {url}")
        return f"{url}: OK"

async def main():
    sem = asyncio.Semaphore(3)  # 최대 동시 3개
    urls = [f"https://api.example.com/item/{i}"
            for i in range(10)]

    results = await asyncio.gather(
        *[fetch_with_limit(sem, u) for u in urls]
    )
    print(f"총 {len(results)}개 완료")

asyncio.run(main())
# 3개씩 동시에 실행, 총 10개 완료

비동기 이터레이터와 제너레이터

async forasync def ... yield를 사용하면 비동기 스트림을 처리할 수 있습니다.

import asyncio

async def fetch_pages(total: int):
    """페이지를 비동기로 하나씩 가져오는 제너레이터"""
    for page in range(1, total + 1):
        await asyncio.sleep(0.3)  # 페이지 로딩 시뮬레이션
        yield {"page": page, "items": [f"item_{i}" for i in range(5)]}

async def main():
    async for page_data in fetch_pages(3):
        print(f"페이지 {page_data['page']}: "
              f"{len(page_data['items'])}개 항목")
    # 페이지 1: 5개 항목
    # 페이지 2: 5개 항목
    # 페이지 3: 5개 항목

asyncio.run(main())

정리

asyncio는 I/O 바운드 작업의 성능을 극적으로 향상시키는 파이썬의 핵심 비동기 프레임워크입니다.

  • 기본 문법: async def로 코루틴 정의, await로 실행 대기
  • 동시 실행: asyncio.gather()로 여러 코루틴을 동시 실행
  • 구조적 동시성: TaskGroup(3.11+)으로 안전한 태스크 관리
  • 타임아웃: asyncio.timeout() 또는 wait_for()로 무한 대기 방지
  • 동시성 제한: Semaphore로 동시 실행 수 제한
  • 비동기 이터레이션: async for로 스트림 데이터 처리
  • 주의점: CPU 바운드 작업에는 asyncio가 아닌 multiprocessing이 적합

이 글이 도움이 되었나요?