SQLAlchemy ORM
이전 글에서 SQLAlchemy를 사용해서 직접 쿼리를 작성하는 방법을 알아보았다. 이번 글에서는 SQLAlchemy ORM을 사용해서 좀 더 편리하게 데이터를 삽입, 조회, 수정, 삭제하는 방법을 알아보도록 하겠다.
Session
SQLAlchemy Core에서 Connection을 이용해서 데이터베이스에 연결한 것처럼, SQLAlchemy ORM은 Session 객체를 이용해 데이버베이스와 상호작용할 수 있다. 사용 방법도 Connection과 흡사하고, 실제로도 Session 내부적으로 Connection을 사용한다.
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
engine = create_async_engine("mysql+aiomysql://root:@localhost:3306/test", echo=True)
async_session = async_sessionmaker(bind=engine, class_=AsyncSession)
위 코드에서는 SQLAlchemy와 마찬가지로 engine을 만들어주고, session을 만들어주는 async_sessionmaker 객체를 생성한다. 참고로 async_sessionmaker는 snake case로 적혀있어서 함수라고 생각하기 쉽지만, 실제로는 AsyncSession class의 inner class이고, async_session은 async_sessionmaker의 객체이다.
Declarative Mapping
본격적으로 데이터를 다루기 전에, SQLAlchemy Core에서 MetaData를 설정해준 것처럼 ORM에서도 데이터베이스 테이블 MetaData를 알아야 한다. ORM에서는 Python 클래스를 선언하면서 데이터베이스 테이블 MetaData를 매핑해 줄 수 있는데, 이것을 Declarative Mapping이라고 한다.
먼저, 모든 클래스의 부모클래스가 되는 Base 클래스를 선언해줘야 한다. Base 클래스는 데이터베이스 테이블과의 매핑을 위한 것으로, DeclarativeBase 클래스를 상속받는 것 이외에는 어떠한 코드도 필요없다.
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
이제 지난 글에서 계속 사용했던 users 테이블을 그대로 옮겨보자.
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
username: Mapped[str] = mapped_column(String(30))
먼저, __tablename__은 데이터베이스의 테이블 이름을 설정한다. 위 예시에서 보듯이 클래스 이름과 달라도 상관없다. 각각의 속성은 Mapped라는 타입을 통해 테이블의 column이라는 것을 선언할 수 있다. mapped_column은 그 외에 primary key, 구체적인 타입, NOT NULL 등의 제약조건을 추가하는 데 사용된다.
Transaction
이제 실제로 쿼리를 날리는 코드를 살펴보자. Session은 connection과 비슷하게 직접 커밋을 할 수도 있고, begin 함수를 통해 트랜잭션을 자동으로 처리할 수도 있다.
@app.get("/")
async def read_root():
usernames = []
async with async_session() as session:
user = User(username="user1")
session.add(user)
users = await session.scalars(select(User).select_from(User))
for u in users:
usernames.append(u.username)
return {"usernames": usernames}
먼저, 위의 예시는 커밋을 실행하지 않았다. 실제로 실행해보면 여러번 실행해도 마지막에 롤백되어 username이 추가되지 않는 것을 확인할 수 있다. 아래처럼 명시적으로 커밋을 해주면 데이터가 추가된다.
@app.get("/")
async def read_root():
usernames = []
async with async_session() as session:
user = User(username="user1")
session.add(user)
await session.commit()
users = await session.scalars(select(User).select_from(User))
for u in users:
usernames.append(u.username)
return {"usernames": usernames}
이제 Connection.begin 함수처럼 Session.begin 함수로 자동으로 커밋되도록 해보자.
@app.get("/")
async def read_root():
usernames = []
async with async_session() as session:
async with session.begin():
user = User(username="user1")
session.add(user)
users = await session.scalars(select(User).select_from(User))
for u in users:
usernames.append(u.username)
return {"usernames": usernames}
위 코드에서는 명시적으로 커밋을 해주지 않았지만, async with 구문이 끝날 때 자동으로 커밋을 해준다. 만약 중간에 exception이 발생한다면 롤백을 하고, 세션을 정리해 주는 역할도 대신해준다.
insert
이제 SQLAlchemy ORM을 이용해 데이터를 조작하는 방법을 알아보자. 먼저, 위 예시에서 살펴본 것처럼 새로운 데이터를 삽입하는 방법이다. SQLAlchemy Core에서는 insert를 이용해 SQL 쿼리와 비슷한 형식을 사용했지만, ORM에서는 Session에 새로운 객체를 추가해주면 된다. Session은 트랜잭션 내에서 객체들의 변화를 감지해서 변경된 데이터를 데이터베이스에 반영한다.
Session.add_all을 사용하면 여러개의 객체를 한 번에 추가할 수도 있다.
@app.get("/")
async def read_root():
usernames = []
async with async_session() as session:
async with session.begin():
session.add_all([User(username="user1"), User(username="user2"), User(username="user3")])
users = await session.scalars(select(User).select_from(User))
for u in users:
usernames.append(u.username)
return {"usernames": usernames}
select
위의 예시들에서 계속 조회하는 코드가 나왔지만, SQLAlchemy Core의 Connection과는 조금 다르다. Connection.execute를 사용해 조회한 것과 달리 Session.scalars를 사용하고 있다. 두 함수의 차이점은 Connections.execute는 결과를 row로 반환하는 반면, Session.scarars는 User클래스의 객체로 반환한다는 것이다. 이것이 바로 ORM의 주요한 기능 중 하나이다.
@app.get("/")
async def read_root():
usernames = []
async with async_session() as session:
async with session.begin():
users = await session.scalars(select(User))
for u in users:
usernames.append(u.username)
return {"usernames": usernames}
물론, session도 execute를 사용해서 조회할 수 있지만, 결과를 반환하는 형식이 조금 다르다.
@app.get("/")
async def read_root():
usernames = []
async with async_session() as session:
async with session.begin():
result = await session.execute(select(User))
for row in result:
usernames.append(row[0].username)
return {"usernames": usernames}
마지막으로, 단일 객체를 조회할 때 사용할 수 있는 scalar도 있다.
@app.get("/")
async def read_root():
usernames = []
async with async_session() as session:
async with session.begin():
user = await session.scalar(select(User).where(User.id == 1))
usernames.append(user.username)
return {"usernames": usernames}
update
데이터를 수정하는 방법은 SQLAlchemy Core와는 다르다. 데이터베이스에 저장된 데이터를 조회해 Python 객체로 만든 후, 객체를 변경하기만 하면 Session이 변경을 감지해 자동으로 쿼리를 생성한다. 따라서 조회하는 코드에 한 줄만 추가하면 된다.
@app.get("/")
async def read_root():
usernames = []
async with async_session() as session:
async with session.begin():
user = await session.scalar(select(User).where(User.id == 1))
user.username = "newusername"
usernames.append(user.username)
return {"usernames": usernames}
그러나 이 코드에는 문제가 있다. user 객체의 username을 직접 변경했기 때문에 실제로 데이터베이스의 값이 변경되지 않아도 변경된 값을 반환할 것이다. 따라서 데이터베이스의 값을 변경한 후, 데이터베이스의 상태와 Python 객체의 상태를 맞추는 과정이 필요하다. 이것을 위해 username을 변경한 후 flush를 통해 변경사항을 데이터베이스에 반영하고, user 객체를 refresh 해서 상태를 맞춰주면 된다.
@app.get("/")
async def read_root():
usernames = []
async with async_session() as session:
async with session.begin():
user = await session.scalar(select(User).where(User.id == 1))
user.username = "newusername"
await session.flush()
await session.refresh(user)
usernames.append(user.username)
return {"usernames": usernames}
delete
마지막으로, 데이터를 삭제하는 것 역시 SQLAlchemy ORM의 Session을 통해서 가능하다. Session.delete를 이용해 데이터를 삭제한 후, 같은 데이터를 다시 조회하면 null이 나오는 것을 확인할 수 있다.
@app.get("/")
async def read_root():
async with async_session() as session:
async with session.begin():
user = await session.scalar(select(User).where(User.id == 1))
await session.delete(user)
await session.flush()
user = await session.scalar(select(User).where(User.id == 1))
return {"user": user}
relationship
만약 다른 테이블간의 관계를 정의하고 싶다면 relationship을 사용할 수 있다. 예를 들어, 한 User가 한 개의 Team에만 속할 수 있고, Team은 여러 User를 가질 수 있다면 아래와 같이 선언할 수 있다.
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
username: Mapped[str] = mapped_column(String(30))
team_id: Mapped[int] = mapped_column(ForeignKey("team.id"))
team: Mapped["Team"] = relationship(back_populates="members")
class Team(Base):
__tablename__ = "team"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
teamname: Mapped[str] = mapped_column(String(30))
members: Mapped[List["User"]] = relationship(back_populates="team")
team_id 속성은 mapped_column에 ForeignKey 제약조건을 추가해 테이블간의 관계를 맺어줬다. relationship은 해당 속성이 column이 아니라 다른 객체를 참조한다는 것을 나타내며, back_populates는 객체에서 서로를 참조할 때 사용할 수 있는 이름이다. 예를 들어, user.team을 통해 team 객체에 접근할 수 있다.
insert
먼저, 위와 같은 관계가 있으면 user와 table에 아래와 같이 데이터를 삽입할 수 있다. 먼저 team 객체를 삽입한 후, user 객체에 team_id를 넣어주면 된다.
@app.get("/")
async def read_root():
teamnames = []
usernames = []
async with async_session() as session:
async with session.begin():
team = Team(teamname="team1")
session.add(team)
await session.flush()
await session.refresh(team)
session.add_all([
User(username="user1", team_id=team.id),
User(username="user2", team_id=team.id),
User(username="user3", team_id=team.id)
])
users = await session.execute(select(User))
teams = await session.execute(select(Team))
for row in users:
usernames.append(row[0].username)
for row in teams:
teamnames.append(row[0].teamname)
return {"usernames": usernames, "teamnames": teamnames}
select
이제 User와 Team을 한 번에 조회하는 방법을 알아보자. 이렇게 relationship을 loading하는 전략은 여러가지가 있는데, 대표적으로 joined loading과 select in loading을 알아본다.
먼저, 그냥 user를 조회한 다음 user.team.teamname에 접근하면 어떻게 될까? 기본적으로 User 클래스를 선언할 때, 아무 설정도 하지 않으면 Team을 lazy loading으로 가져오게 된다.
@app.get("/")
async def read_root():
userinfo = []
async with async_session() as session:
async with session.begin():
users = await session.scalars(select(User))
for user in users:
userinfo.append({user.username: user.team.teamname})
return userinfo
그러나 실행해보면 아래와 같은 에러가 발생한다.
sqlalchemy.exc.StatementError: (sqlalchemy.exc.MissingGreenlet) greenlet_spawn has not been called; can't call await_only() here. Was IO attempted in an unexpected place?
IO가 예상하지 못한 곳에서 일어났다는 뜻이다. 이것은 Team 테이블 조회가 일어날 때 await가 되지 않았기 때문인 것 같다. iterable로 만들어서 await를 붙이는 방법으로 피할 수 있지만, lazy loading 자체를 권장하지 않는 것 같다.
다음으로, joined loading은 말 그대로 데이터베이스에서 join을 한 다음 가져오는 것이다.
@app.get("/")
async def read_root():
userinfo = []
async with async_session() as session:
async with session.begin():
users = await session.scalars(select(User).options(joinedload(User.team)))
for user in users:
userinfo.append({user.username: user.team.teamname})
return userinfo
이렇게 데이터를 가져오는 경우, 딱 한 번의 쿼리만 사용할 수 있다. 그러나 Team 데이터가 필요하지 않은 경우 불필요한 join과 네트워크 사용이 일어나게 될 것이다. 실제로 실행되는 SQL을 확인해보면 아래와 같다.
SELECT users.id, users.username, users.team_id, team_1.id AS id_1, team_1.teamname
FROM users LEFT OUTER JOIN team AS team_1 ON team_1.id = users.team_id
마지막으로, select in loading은 먼저 User 데이터를 조회한 뒤, 필요한 Team의 id만 사용해 조회하는 방법이다.
@app.get("/")
async def read_root():
userinfo = []
async with async_session() as session:
async with session.begin():
users = await session.scalars(select(User).options(selectinload(User.team)))
for user in users:
userinfo.append({user.username: user.team.teamname})
return userinfo
위 코드를 실행하면 아래와 같이 2개의 쿼리가 실행된다.
SELECT users.id, users.username, users.team_id
FROM users;
SELECT team.id AS team_id, team.teamname AS team_teamname
FROM team
WHERE team.id IN (1, 2, 3);
select in loading은 딱 2번의 SQL 쿼리를 사용하고, 필요한 id만 사용하기 때문에 조회 속도가 빠르다.
3줄 요약
- SQLAlchemy ORM은 데이터베이스 테이블과 Python 클래스를 매핑해준다
- 클래스를 선언할 때 데이터베이스의 metadata 정보를 추가해준다
- Session을 이용해 삽입, 조회, 수정, 삭제를 할 수 있다
'Python > FastAPI' 카테고리의 다른 글
[FastAPI] FastAPI 튜토리얼 (7) - Exception (0) | 2023.08.05 |
---|---|
[FastAPI] FastAPI 튜토리얼 (6) - 로그인 (0) | 2023.08.02 |
[FastAPI] FastAPI 튜토리얼 (4) - SQLAlchemy Core (0) | 2023.07.22 |
[FastAPI] FastAPI 튜토리얼 (3) - Dependency (0) | 2023.07.20 |
[FastAPI] FastAPI 튜토리얼 (2) - 매개변수 (0) | 2023.07.14 |