DB 연결
어플리케이션에서 데이터를 저장하려면 데이터 저장소가 필요하다. 이미지나 직렬화된 데이터를 저장하는 오브젝트 스토리지, JSON 파일을 저장하는 도큐먼트 스토리지, key-value를 저장하는 스토리지 등등 다양한 종류가 있지만, 가장 흔하게 사용하는 것은 RDB(관계형 데이터베이스)일 것이다.
이번 글에서는 FastAPI를 사용해 RDB에 연결하고, 데이터를 저장, 조회, 수정, 삭제하는 방법을 알아볼 것이다. RDB로는 MySQL 8버전을 사용할 것이고, FastAPI 공식 튜토리얼에서도 사용하는 SQLAlchemy를 사용할 것이다.
SQLAlchemy
SQLAlchemy는 Python에서 SQL을 사용할 수 있게 해주는 라이브러리이며, ORM 기능도 제공한다. 쉽게 말해, SQLAlchemy를 사용하면 SQL을 사용해서 RDB에 연결할 수 있고, SQL을 사용해 데이터를 생성, 조회, 수정, 삭제할 수 있으며, 트랜잭션과 커넥션 풀을 관리하는 등 RDB를 사용해서 할 수 있는 다양한 기능을 활용할 수 있다는 뜻이다.
SQLAlchemy는 크게 SQLAlchemy Core와 SQLAlchemy ORM으로 구성된다. SQLAlchemy Core는 Python 코드를 분석해 SQL 쿼리를 생성하는 등의 저수준의 동작을 담당하는 핵심적인 부분이다. SQLAlchemy ORM은 Object-Relational Mapping이라는 이름 그대로 Python 객체와 RDB의 테이블간의 매핑을 자동화해주는 기능을 담당한다. 따라서 SQLAlchemy를 사용한다고 ORM을 반드시 사용해야하는 것은 아니다.
Engine
Engine은 Python의 DBAPI 공식 명세(PEP 249)를 사용할 수 있게 해주는 SQLAlchemy의 시작점이라고 할 수 있다. Engine이 생성될 때 커넥션을 관리하고 생성해주는 Pool 객체, 데이터베이스 종류에 맞게 SQL을 만들어주는 Dialect 객체가 생성된다. 따라서 Engine 객체는 여러 번 생성할 필요 없이 한 번 생성해서 Engine.connect 함수를 통해 커넥션을 획득할 수 있다. Pool 객체는 Engine이 생성될 때 커넥션을 바로 생성하지 않고, Engine.connect 함수가 호출될 때 lazy하게 커넥션을 생성한다.
이제 코드를 통해 직접 연결해보자. 필요한 패키지를 설치해야한다. SQLAlchemy와 함께 mysql을 비동기로 사용할 수 있게 해주는 aiomysql과 aiomysql을 사용하는 데 필요한 greenlet을 설치해준다.
pip install sqlalchemy aiomysql greenlet
MySQL의 username은 root이고, password는 설정하지 않았다. 데이터베이스 이름은 test로 만들어두었다. echo 옵션은 실행되는 SQL을 로그로 확인할 수 있게 해주는 옵션이다.
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("mysql+aiomysql://root:@localhost:3306/test", echo=True)
데이터베이스 연결이 정상적으로 되었다면, 쿼리를 실행해보자.
from sqlalchemy import text
@app.get("/")
async def read_root():
usernames = []
async with engine.connect() as connection:
result = await connection.execute(text("select username from users"))
for row in result:
usernames.append(row.username)
return {"usernames": usernames}
Transaction
위의 코드를 실행하고 로그를 확인해보면 롤백이 되는 것을 확인할 수 있다. 이 말은 쿼리를 실행하고 직접 커밋을 해야한다는 뜻이다. 아래와 같이 users 테이블에 데이터를 추가하는 코드를 보자.
@app.get("/")
async def read_root():
usernames = []
async with engine.connect() as connection:
await connection.execute(text("insert into users (username) values ('user1')"))
result = await connection.execute(text("select username from users"))
for row in result:
usernames.append(row.username)
return {"usernames": usernames}
분명 insert를 실행하지만, 여러번 실행해도 username 개수가 늘어나지 않는다. 롤백이 되고있다는 뜻이다. 이제 한 줄을 추가해서 커밋을 해보자.
@app.get("/")
async def read_root():
usernames = []
async with engine.connect() as connection:
await connection.execute(text("insert into users (username) values ('user5')"))
await connection.commit()
result = await connection.execute(text("select username from users"))
for row in result:
usernames.append(row.username)
return {"usernames": usernames}
이제 커밋이 되어 데이터베이스에 반영이 되는 것을 확인할 수 있다. 그러나 직접 커밋을 호출하는 것은 귀찮고 실수가 일어나기 쉽다. 따라서 Engine은 begin이라는 함수를 통해 트랜잭션을 자동으로 관리해준다.
@app.get("/")
async def read_root():
usernames = []
async with engine.begin() as connection:
await connection.execute(text("insert into users (username) values ('user5')"))
result = await connection.execute(text("select username from users"))
for row in result:
usernames.append(row.username)
return {"usernames": usernames}
SQLAlchemy Core
이제 SQLAlchemy를 이용해 데이터를 조작하는 방법을 알아보자. 먼저, ORM을 사용하지 않는 방법을 알아본다.
text
먼저, 위에서 살펴본 것처럼 text를 이용해 직접 쿼리를 작성하는 방법이다. 위에서 select와 insert를 모두 살펴봤으니, 이번에는 파라미터를 추가해보자.
@app.get("/")
async def read_root():
usernames = []
async with engine.begin() as connection:
await connection.execute(
text("insert into users (username) values (:username)"),
[{"username": "user1"}, {"username": "user2"}]
)
result = await connection.execute(text("select username from users"))
for row in result:
usernames.append(row.username)
return {"usernames": usernames}
text의 쿼리에 :username과 같이 파라미터 이름을 추가하고, 딕셔너리를 이용해 값을 넣어줄 수 있다. 마찬가지로, select에서도 동일한 방식으로 파라미터를 넣어줄 수 있다.
@app.get("/")
async def read_root():
usernames = []
async with engine.begin() as connection:
result = await connection.execute(
text("select username from users where id < :id"),
[{"id": 10}]
)
for row in result:
usernames.append(row.username)
return {"usernames": usernames}
이렇게 직접 쿼리를 작성하는 것은 다른 세팅이 필요없다는 장점이 있지만, 오타가 났을 때 디버깅하기 어려울 뿐만 아니라 가독성의 문제도 있을 수 있다.
MetaData
SQLAlchemy에서 제공하는 다른 방법을 알아보기 전에, 간단한 세팅이 필요하다. 관계형 데이터베이스는 테이블의 스키마가 정해져있고, 그것에 맞춰서 데이터를 변경해야한다. 따라서 SQLAlchemy가 데이터베이스 테이블의 정보를 알 수 있도록 MetaData를 설정해줘야 한다.
from sqlalchemy import Table, MetaData, Column, BigInteger, String
metadata_obj = MetaData()
user_table = Table(
"users",
metadata_obj,
Column("id", BigInteger, primary_key=True),
Column("username", String(30))
)
insert
이제 text를 사용하지 않고 데이터베이스의 데이터를 조작하는 방법을 알아보자. 기본적으로 SQL의 순서와 거의 같지만, Python 함수를 사용한다고 생각하면 쉽다.
@app.get("/")
async def read_root():
usernames = []
async with engine.begin() as connection:
await connection.execute(
insert(user_table).values(id=30, username="user30")
)
result = await connection.execute(text("select username from users"))
for row in result:
usernames.append(row.username)
return {"usernames": usernames}
위의 코드는 아래와 같은 쿼리를 실행할 것이다.
INSERT INTO users (id, username) VALUES (30, 'user30')
만약 한 번에 여러 데이터를 삽입하고 싶다면 values 대신 list를 이용해 파라미터를 전달하면 된다.
@app.get("/")
async def read_root():
usernames = []
async with engine.begin() as connection:
await connection.execute(
insert(user_table),
[{"id": 50, "username": "user50"}, {"id": 51, "username": "user51"}]
)
result = await connection.execute(text("select username from users"))
for row in result:
usernames.append(row.username)
return {"usernames": usernames}
select
select도 마찬가지로, 일반적인 SQL을 생각하고 코드를 작성하면 된다.
@app.get("/")
async def read_root():
usernames = []
async with engine.begin() as connection:
result = await connection.execute(
select(user_table).where(user_table.c.id < 10)
)
for row in result:
usernames.append(row.username)
return {"usernames": usernames}
여기에 order by같은 기능도 당연히 추가할 수 있다.
@app.get("/")
async def read_root():
usernames = []
async with engine.begin() as connection:
result = await connection.execute(
select(user_table)
.where(user_table.c.id < 10)
.order_by(desc(user_table.c.id))
)
for row in result:
usernames.append(row.username)
return {"usernames": usernames}
update
이번에는 update를 해보자. 마찬가지로 update가 필요한 테이블을 지정하고, row를 선택한 뒤, 새로운 값을 넣어주면 된다.
@app.get("/")
async def read_root():
usernames = []
async with engine.begin() as connection:
await connection.execute(
update(user_table)
.where(user_table.c.username == "user1")
.values(username="newname")
)
result = await connection.execute(select(user_table))
for row in result:
usernames.append(row.username)
return {"usernames": usernames}
한 번에 여러 row를 수정하는 것도 가능하다.
@app.get("/")
async def read_root():
usernames = []
async with engine.begin() as connection:
await connection.execute(
update(user_table)
.where(user_table.c.username.like("user%"))
.values(username="newname")
)
result = await connection.execute(select(user_table))
for row in result:
usernames.append(row.username)
return {"usernames": usernames}
delete
마지막으로 delete하는 방법도 똑같다.
@app.get("/")
async def read_root():
usernames = []
async with engine.begin() as connection:
await connection.execute(
delete(user_table)
.where(user_table.c.username.like("new%"))
)
result = await connection.execute(select(user_table))
for row in result:
usernames.append(row.username)
return {"usernames": usernames}
3줄 요약
- FastAPI 어플리케이션에서 DB를 연결할 수 있다
- SQLAlchemy는 Python에서 DBAPI를 사용할 수 있는 대표적인 라이브러리다
- SQLAlchemy를 사용하면 SQL을 사용하는 것과 매우 비슷하게 쿼리를 실행할 수 있다
'Python > FastAPI' 카테고리의 다른 글
[FastAPI] FastAPI 튜토리얼 (6) - 로그인 (0) | 2023.08.02 |
---|---|
[FastAPI] FastAPI 튜토리얼 (5) - SQLAlchemy ORM (0) | 2023.07.30 |
[FastAPI] FastAPI 튜토리얼 (3) - Dependency (0) | 2023.07.20 |
[FastAPI] FastAPI 튜토리얼 (2) - 매개변수 (0) | 2023.07.14 |
[FastAPI] FastAPI 튜토리얼 (1) - FastAPI 소개 (0) | 2023.07.12 |