Background Task
API를 개발하다보면 사용자의 요청을 받아 오래걸리는 작업을 해야하는 경우가 종종 있다. 예를 들어, 이메일을 전송하는 로직은 몇 초 이상 걸리기도 한다. 사용자 입장에서는 이메일을 보내는 데 몇 초가 걸리는 것을 기다릴 수 있다고 생각할 수도 있지만, 서버의 입장에서는 불필요하게 리소스를 점유하는 상황을 피하고 싶을 것이다.
이러한 경우에 이메일이 전송 완료되는 것을 기다릴 필요 없이 이메일 전송요청이 오면 바로 응답을 보내고, 별도로 이메일을 보내는 작업을 하는 것이 최선의 방법일 것이다. FastAPI에서는 이렇게 응답을 빠르게 보내고, 추가 작업을 처리할 수 있는 Background Task를 제공한다.
사용법
Background Task를 사용하기 위해서는 먼저 백그라운드에서 실행할 동작을 함수로 정의해야한다. 이 함수는 async def와 def 모두 지원한다. 여기서는 이메일을 보내는 함수를 예시로 들어보자.
def write_notification(email: str, message: str):
email_sender.send(email, message)
위 함수는 이메일 주소를 나타내는 email과 내용을 나타내는 message 두 개의 파라미터를 받는데, 파라미터의 개수와 종류에 관계없이 사용이 가능하다.
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
@app.get("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
Background Task를 사용하기 위해서는 위와 같이 path operation에 BackgroundTasks를 파라미터로 선언하면 된다. FastAPI는 자동적으로 BackgroundTasks 객체를 생성해서 주입해주고, BackgroundTasks 객체는 write_notification 함수를 실행해준다. write_notification의 파라미터는 email과 같이 순서대로 넣어줄 수도 있고, message와 같이 keyword argument로 전달할 수도 있다.
실행 순서
Background Task를 사용할 때는 실행순서를 정확히 아는 것이 중요하다. FastAPI 공식 문서에는 아래와 같이 설명이 되어있다.
You can define background tasks to be run after returning a response.
즉, 응답을 반환한 후에 태스크를 실행한다는 의미로 볼 수 있다. 실제로 이것을 테스트해보기 위해 print를 추가해볼 수 있다.
def write_notification(email: str, message: str):
print("send email")
email_sender.send(email, message)
@app.get("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
print("background task added")
return {"message": "Notification sent in the background"}
결과는 아래와 같이 나온다.
background task added
INFO: 127.0.0.1:59565 - "GET /send-notification/sdfsdf HTTP/1.1" 200 OK
send email
먼저 send_notification에서 background_tasks.add_task로 실행해야할 동작을 등록하고, 응답을 반환한 뒤 write_notification 함수를 실행하는 것이다.
이번에는 Depends를 사용하는 좀 더 복잡한 dependency를 보자. 쿼리 파라미터 q를 입력받아 로그를 쓰는 경우를 생각해보자.
def write_notification(email: str, message: str):
print("send email")
email_sender.send(email, message)
def write_log(message: str):
print("write log")
def get_query(background_tasks: BackgroundTasks, q: str | None = None):
if q:
message = f"found query: {q}"
print(message)
background_tasks.add_task(write_log, message)
return q
@app.get("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]):
background_tasks.add_task(write_notification, email, message="some notification")
print("background task added")
return {"message": "Notification sent in the background"}
의존관계를 생각해보면 get_query가 가장 먼저 실행될 것이고, send_notification이 실행되는 것은 당연하다. 그런데 Background Task의 실행 순서는 어떻게 될까? 위 코드를 실행해보면 아래와 같은 결과가 나온다.
found query: 123
background task added
INFO: 127.0.0.1:59612 - "GET /send-notification/sdfsdf?q=123 HTTP/1.1" 200 OK
write log
send email
즉, send_notification 함수에서 응답을 반환한 뒤에 background_tasks에 등록된 write_log, write_notification 함수가 순서대로 실행되는 것을 확인할 수 있다. 이것은 FastAPI에 의해 background_tasks 객체가 공유되기 때문에 먼저 등록된 task인 write_log가 먼저 실행되고, 나중에 등록된 write_notification이 뒤에 실행되는 것이다.
실행 Thread
그렇다면, Background Task는 어디에서 실행되는 것일까? 이것은 실행되는 함수가 async def인지, def인지에 따라 달라진다. 먼저, 위 예시에서처럼 def로 정의된 함수를 살펴보자. 실행되는 thread를 확인하기 위해 threading.get_ident 함수를 추가하고, 동시에 여러 요청을 보낼 수 있게 1초동안 멈추는 코드를 추가했다.
def write_notification(email: str, message: str):
sleep(1)
print(threading.get_ident())
email_sender.send(email, message)
@app.get("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
print(threading.get_ident())
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
위 코드를 실행해보면 send_notification 함수가 실행되는 스레드의 id는 8394252416, write_notification 함수가 실행되는 스레드는 6154924032, 6171750400, 6188576768 등으로 매번 달라지는 것을 확인할 수 있다. 즉, 동기함수의 경우 Background Task를 위한 별도의 스레드 풀에서 실행된다는 뜻이다.
이번에는 write_notification 함수를 async def로 바꿔보자.
async def write_notification(email: str, message: str):
sleep(1)
print(threading.get_ident())
email_sender.send(email, message)
@app.get("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
print(threading.get_ident())
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
이번에는 send_notification 함수와 write_notification 함수가 실행되는 thread의 id가 8394252416로 동일한 것을 확인할 수 있다. 이것은 비동기 서버에서 요청을 처리하는 이벤트 루프가 Background Task도 처리한다는 뜻이다. 따라서 sleep과 같이 오래걸리는 작업을 실행하는 경우 이벤트 루프가 새로운 요청을 받아들이지 못하게 된다. 실제로 여러 요청을 동시에 보내도 한 번에 한 개의 요청만 처리하는 것을 확인할 수 있다.
결론적으로, 위와 같이 내부적으로 비동기를 지원하지 않는 작업의 경우 def를 이용해 별도의 스레드에서 실행해야하고, 완전히 비동기를 지원하는 함수의 경우 별도의 스레드를 생성하지 않는 것이 효율적이기 때문에 async def를 이용하는 것이 바람직하다.
3줄 요약
- 빠르게 응답을 보내고 추가적인 작업을 해야하는 경우 Background Task를 사용할 수 있다
- Background Task는 응답을 보낸 후에 실행이 시작된다
- 동기와 비동기를 잘 구분해서 사용하지 않으면 비효율적으로 동작할 수 있다
'Python > FastAPI' 카테고리의 다른 글
[FastAPI] FastAPI 튜토리얼 (8) - Middleware (0) | 2023.08.12 |
---|---|
[FastAPI] FastAPI 튜토리얼 (7) - Exception (0) | 2023.08.05 |
[FastAPI] FastAPI 튜토리얼 (6) - 로그인 (0) | 2023.08.02 |
[FastAPI] FastAPI 튜토리얼 (5) - SQLAlchemy ORM (0) | 2023.07.30 |
[FastAPI] FastAPI 튜토리얼 (4) - SQLAlchemy Core (0) | 2023.07.22 |