로그인
일부 무료 어플리케이션을 제외하고 대부분의 어플리케이션은 요청을 보낸 사용자가 누구인지 알아야 하는 상황이 있다. 이전에 썼던 글을 조회한다거나, 글을 쓴 사람이 누구인지, 또 자신의 닉네임을 확인하고 변경하는 등의 상황에서 만약 어플리케이션이 해당 유저가 누구인지 확인할 수 없다면 다른 사람의 정보가 노출될 것이다. 따라서 로그인을 통해 해당 사용자가 누구인지, 또 특정한 요청을 보낼 수 있는 자격이 있는지를 확인할 필요가 있다. 이렇게 사용자가 누구인지를 확인하는 것을 인증(Authentication), 자격이 있는지 확인하는 것을 인가(Authorization)라고 한다.
FastAPI에서는 이러한 인증, 인가 프로세스를 쉽게 구현할 수 있게 여러가지 도구를 제공한다. 이번 글에서는 FastAPI를 이용해 로그인을 구현하는 방법을 알아본다. 공식 튜토리얼에 있는 OAuth2 Password flow를 이용하는 방법과 추가적으로 Google ID로 로그인을 하는 Authorization Code flow를 사용하는 방법도 살펴보도록 하겠다.
OAuth2
FastAPI에서 로그인을 구현하기 전에, OAuth2 로그인 과정이 어떻게 진행되는지 간단하게 알아볼 필요가 있다. OAuth2는 Open Authorization 2.0의 줄임말로, Google, 네이버, 카카오 등의 서드파티 계정을 이용해서 간단하게 로그인을 할 수 있도록 해주는 프로토콜이다. 요즘 많은 서비스들이 OAuth2를 지원하기 때문에 Google, 네이버, 카카오 등의 계정만 있으면 각 서비스마다 따로 가입하고 ID를 기억해야하는 불편함이 없어졌다.
이렇게 실제 이용하려는 서비스와 인증, 인가를 담당하는 서버가 따로있다는 것이 OAuth2의 가장 큰 특징이라고 할 수 있다. 그러나 이러한 특징 때문에 인증과정에 참여하는 구성 요소도 많고, 과정도 복잡하다. 먼저, OAuth2의 구성 요소를 알아보자.
OAuth2 구성 요소
- 클라이언트(Client) - 사용자의 데이터에 접근하는 어플리케이션
- 리소스 소유자(Resource Owner) - 데이터를 소유하고 있는 사용자
- 리소스 서버(Resource Server) - 데이터를 관리하고 있는 서버
- 인가 서버(Authorization Server) - 사용자의 인증과 권한 검증을 담당하고, 액세스 토큰을 발급하는 서버
나름 쉽게 풀어쓴 것이지만, 처음 접하는 사람들은 여전히 이해하기 어려울 것이다. 이해하기 쉽게 우리가 만들고 싶은 서비스에 대응시키면, 로그인을 하려는 사용자가 리소스 소유자, 우리가 FastAPI를 이용해 만드는 어플리케이션이 클라이언트, Google, 네이버, 카카오 같은 서드파티가 제공하는 서버가 리소스 서버, 인가 서버라고 보면 된다.
그렇다면 리소스 서버와 인가 서버가 따로 존재하는 이유는 무엇일까? 단순히 로그인을 하기 위해서 OAuth2를 사용한다면 인가서버만 있어도 충분하다. 그러나 OAuth2를 이용하면 사용자가 가지고 있는 리소스에 접근할 수 있는 권한을 위임할 수 있다. 즉, 클라이언트 서버는 자신의 서비스를 실행하기 위해서 리소스 서버에서 제공하는 특정한 리소스에 접근할 수 있는 권한을 요청할 수 있고, 사용자가 로그인을 하면서 권한을 위임하는 데 동의하면 클라이언트 서버에서 Google Drive나 Google 유저 정보 등의 리소스에 접근할 수 있게 된다.
Password Flow
다시 우리의 서비스로 돌아와서, FastAPI를 이용해 로그인을 하는 예제를 만들어보자. FastAPI 튜토리얼에서는 OAuth2의 여러가지 인증 방법 중 Password Flow를 사용한다. Password Flow는 사용자가 클라이언트 서버에 ID와 password를 제공하여 직접 권한을 받을 수 있는 방법이다. 사용자가 ID와 password를 입력해야 하기 때문에 ID와 password가 유출되면 Google 계정을 해킹당할 수 있다. 따라서 일반적으로 Password Flow는 보안에 취약하다는 문제점이 있다.
FastAPI 튜토리얼에서는 로그인 기능 이외에 다른 리소스를 사용하는 것이 아니기 때문에 Google과 같은 서드파티를 사용하는 것이 아니라, FastAPI 서버 자체가 인가 서버의 역할도 수행하도록 변형시켜 직접 ID와 password를 관리하도록 했다. 또한, JWT 토큰을 사용해 변조가 불가능하고, 토큰 만료시간을 설정해 안전하게 사용할 수 있도록 만들었다.
실습
1단계 - FastAPI Security 적용
이제 코드를 통해 FastAPI의 인증 과정을 살펴보자. FastAPI에서 제공하는 인증 관련 코드는 Security 모듈에 존재한다. 우리는 Password Flow를 사용하기 때문에 OAuth2PasswordBearer 클래스를 사용할 것이다. 일단, 단순히 Security를 적용해서 username과 password를 입력받고, API에 인증을 적용시키는 것만 만들어서 전체적인 흐름을 이해해보자.
먼저, OAuth2PasswordRequestForm을 사용하기 위해서는 python-multipart 패키지 설치가 필요하다.
pip install python-multipart
from typing import Annotated
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@app.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
return {"access_token": form_data.username, "token_type": "bearer"}
@app.get("/token/detail")
async def my_token(token: Annotated[str, Depends(oauth2_scheme)]):
return {"token": token}
먼저, OAuth2PasswordBearer 인스턴스를 생성해주었다. 이 때, tokenUrl은 로그인이 되어있지 않은 경우, 로그인을 해서 access token을 받을 수 있는 URL을 의미한다. 위 예시에서는 /token이라는 URL에 POST 요청을 보내 토큰을 받을 수 있도록 만들었다.
/token 요청을 처리하는 login 함수에서는 OAuth2PasswordRequestForm을 파라미터로 받는데, 이것은 form data 형태로 username과 password를 입력받을 수 있다. 그 외에 권한을 설정하는 scope와 client_id, client_secret 등을 함께 입력받을 수 있다. 아래에서 점점 추가해 나갈 것이지만, 이렇게 입력받은 데이터를 이용해 ID와 password를 검증하고, 확인이 완료되면 access token을 만들어 반환한다. 반환값은 access_token과 token_type을 필수로 반환해야 하는데, 여기서는 Bearer 토큰을 사용하고 access token은 임시로 username을 반환하도록 만들었다.
이제 이렇게 얻은 토큰을 이용해 /token/detail에 GET 요청을 보낼 수 있다. 아까 만든 oauth2_scheme로 Dependency Injection을 받으면 HTTP 요청의 Authorization 헤더에서 Bearer 토큰을 바로 꺼낼 수 있다. 만약 이렇게 oauth2_scheme을 DI받는 함수에 Authorization 헤더가 없거나, 적절하지 않은 토큰이 있는 경우, 에러를 발생시킬 수 있다.
지금까지의 코드를 실행해보고 싶다면, http://127.0.0.1:8000/docs에서 간단히 실행해볼 수 있다. 자물쇠 모양과 함께 Authorize 버튼이 생긴 것을 확인할 수 있고, 클릭해서 username과 password를 입력해서 로그인할 수 있다. 로그인이 되어있는 상태에서 /token/detail API를 호출하면 호출이 잘 되지만, 로그인이 되어있지 않으면 401 Unauthorized 에러가 발생하는 것을 확인할 수 있다.
2단계 - username, password 체크
이제 입력받은 username과 password를 이용해 DB의 정보와 비교해서 인증하는 과정을 추가해보자. DB에 password를 저장할 때는 원문을 그대로 저장하지 않고, hash값을 저장하는 것이 안전하다. 만약 DB의 데이터가 유출되는 경우에도 원래의 값을 추측할 수 없어야 하기 때문이다. 따라서 이렇게 hash를 계산하기 위해 passlib 패키지를 설치해야 한다.
pip install "passlib[bcrypt]"
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
이제 사용자의 정보를 DB에서 조회해 form data와 비교해야 한다. 여기서는 실제 DB를 사용하지는 않고, mock 데이터를 사용한다. 사용자의 정보를 담을 클래스는 password를 포함하는 UserInDB 클래스와 password가 없는 User 클래스를 사용한다. 만약 User 데이터를 반환하는 경우에는 User 클래스를 반환하고, password가 필요한 경우에는 UserInDB를 사용할 수 있다.
fake_users_db = {
"user1": {
"username": "user1",
"email": "user1@gmail.com",
"hashed_password": "$2b$12$FjpA6dwtJYrA47x..0B1uOWExmL7vHt7R.f7ZznqjHLxbpjSSH48i",
}
}
class User(BaseModel):
username: str
email: str | None = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
이제, login 함수에 authenticate_user 함수를 적용해보자. 만약 입력한 username이 DB에 없거나, password가 다르면 user가 조회되지 않을 것이므로 401 에러를 발생시키면 된다.
@app.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return {"access_token": form_data.username, "token_type": "bearer"}
다시 /docs를 열어서 테스트해보면 username을 user1, password를 password라고 써야만 로그인을 할 수 있는 것을 확인할 수 있다.
3단계 - JWT 토큰
이제 access token에 사용자의 정보를 담아서 로그인 상태를 유지할 수 있도록 변경시켜보자. 먼저, JWT를 사용하기 위해서 secret key를 생성해야한다. 아래와 같이 32바이트 데이터를 랜덤하게 생성해 16진수로 출력할 수 있다.
openssl rand -hex 32
생성된 secret key를 이용해서 JWT 토큰을 만드는 함수를 추가한다. JWT 토큰을 만드는 것은 python-jose 패키지를 이용한다.
pip install "python-jose[cryptography]"
from jose import jwt
from datetime import timedelta, datetime
SECRET_KEY = "025bd359b9af4af6ab459f049d8f34226948b680b6cdd4657b3050b4d9b2888e"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
이제 login 함수에서 JWT 토큰을 생성해 반환하도록 적용시켜보자.
@app.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires)
return {"access_token": access_token, "token_type": "bearer"}
이렇게 반환된 토큰을 이용해 요청을 보내면 JWT 토큰을 디코딩해서 사용자의 정보를 가져올 수 있다. 이번에는 JWT 토큰을 받아 사용자 정보를 조회하는 로직을 추가해보자.
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
user = get_user(fake_users_db, username=username)
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return user
마지막으로, 로그인된 사용자의 정보가 제대로 조회되고 있는지 API를 만들어서 확인해보자.
@app.get("/users/me", response_model=User)
async def my_info(current_user: Annotated[User, Depends(get_current_user)]):
return current_user
4단계 - scope
마지막 단계는 scope를 이용해 권한을 관리하는 것이다. 먼저, OAuth2PasswordBearer에 scope를 정의해준다.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token",
scopes={"me": "get my info", "items": "read items"})
me는 /users/me에서 자신의 정보에 접근할 수 있는 권한이고, items는 자신의 상품을 조회할 수 있는 권한을 설정했다. 이제 JWT 토큰을 만들 때 scope를 추가하도록 login 함수를 변경해보자.
@app.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires)
return {"access_token": access_token, "token_type": "bearer"}
이제 /users/me 요청에 me scope를 적용해보자. Depends에 scope를 추가한 Security를 사용해서 scope를 적용할 수 있다.
@app.get("/users/me", response_model=User)
async def my_info(current_user: Annotated[User, Security(get_current_user, scopes=["me"])]):
return current_user
get_current_user 함수에서는 scopes 파라미터로 받은 me를 SecurityScopes로 받을 수 있다. SecurityScopes 안의 scope를 이용해 권한을 체크하고, JWT 토큰에 해당 scope가 없는 경우 에러를 발생시키면 된다.
async def get_current_user(security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
scopes = payload.get("sub", [])
for scope in security_scopes.scopes:
if scope not in scopes:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
user = get_user(fake_users_db, username=username)
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return user
전체 코드
from datetime import timedelta, datetime
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm, SecurityScopes
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel
from starlette import status
SECRET_KEY = "025bd359b9af4af6ab459f049d8f34226948b680b6cdd4657b3050b4d9b2888e"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
app = FastAPI()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", scopes={"me": "get my info", "items": "read items"})
fake_users_db = {
"user1": {
"username": "user1",
"email": "user1@gmail.com",
"hashed_password": "$2b$12$TU1.aTptRYcaVhj6DSsLzeeGaG2vWR6r.B5be8RNoEXoYNG7rEiAy",
}
}
class User(BaseModel):
username: str
email: str | None = None
class UserInDB(User):
hashed_password: str
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
scopes = payload.get("sub", [])
for scope in security_scopes.scopes:
if scope not in scopes:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
user = get_user(fake_users_db, username=username)
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return user
@app.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/token/detail")
async def my_token(token: Annotated[str, Depends(oauth2_scheme)]):
return {"token": token}
@app.get("/users/me", response_model=User)
async def my_info(current_user: Annotated[User, Security(get_current_user, scopes=["me"])]):
return current_user
Google ID 로그인
지금까지는 OAuth2를 약간 변형하여 직접 ID와 password를 관리하는 방법을 살펴보았다. FastAPI는 서드파티를 통해 로그인을 할 수 있도록 다양한 OAuth2 flow를 제공한다. 이번에는 Authorization Code Flow를 사용해 Google ID로 로그인하는 방법을 알아보자.
Authorization Code Flow
Authorization Code Flow는 Password Flow와 다르게 사용자가 클라이언트 서버에 ID와 password를 제공할 필요가 없고, 직접 서드파티에 로그인을 하기만 하면 된다. 그러면 클라이언트에 권한을 부여하는 Authorization Code가 전달되고, 이것을 이용해 사용자의 데이터에 접근할 수 있기 때문이다. 따라서 Password Flow에 비해 안전하기 때문에 실제로 많이 사용되는 방법이다.
실습
Authorization Code Flow를 사용할 때는 OAuth2PasswordBearer 대신 OAuth2AuthorizationCodeBearer를 사용하면 된다. 여기서는 tokenUrl과 함께 authorizationUrl을 추가해줘야 한다. 이것은 Google이나 네이버, 카카오 등의 인증 서버에 따라 달라진다. Google을 사용한다면 아래와 같이 사용할 수 있다.
from typing import Annotated
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2AuthorizationCodeBearer
app = FastAPI()
oauth2_scheme = OAuth2AuthorizationCodeBearer(
authorizationUrl="https://accounts.google.com/o/oauth2/v2/auth",
tokenUrl="https://oauth2.googleapis.com/token",
scopes={"email": "email", "profile": "profile", "openid": "openid"}
)
@app.get('/test')
async def test(token: Annotated[str, Depends(oauth2_scheme)]):
return {"token": token}
/docs에서 Authorize를 누르면 client_id와 client_secret을 입력하는 칸이 생긴 것을 확인할 수 있다.
이 client_id와 client_secret은 Google Cloud에서 어플리케이션을 등록해야만 사용할 수 있다. 먼저 Google Cloud에서 프로젝트를 생성하고, API의 사용자 인증 정보에서 OAuth 클라이언트 ID를 생성하면 된다. 이제 client_id와 client_secret를 가져와 로그인을 하면 아래와 같은 화면이 나온다.
이것은 Authorization Code Flow에서 Redirect를 하는 과정에서 발생한 문제다. 오류 세부정보를 눌러보면 redirect uri를 등록하라는 안내문구가 나온다. 다시 Google Cloud의 API 페이지로 가서 '승인된 리디렉션 URI'에 /docs/oauth2-redirect를 추가해주면 정상적으로 로그인이 되는 것을 확인할 수 있다.
3줄 요약
- OAuth2에는 다양한 Flow가 존재하고, Password Flow를 변형시켜 직접 ID와 password를 관리할 수 있다
- Authorization Code Flow는 비교적 안전하고 많이 사용되는 방법이다
- FastAPI는 로그인과 권한을 체크할 수 있는 다양한 도구를 제공한다
'Python > FastAPI' 카테고리의 다른 글
[FastAPI] FastAPI 튜토리얼 (8) - Middleware (0) | 2023.08.12 |
---|---|
[FastAPI] FastAPI 튜토리얼 (7) - Exception (0) | 2023.08.05 |
[FastAPI] FastAPI 튜토리얼 (5) - SQLAlchemy ORM (0) | 2023.07.30 |
[FastAPI] FastAPI 튜토리얼 (4) - SQLAlchemy Core (0) | 2023.07.22 |
[FastAPI] FastAPI 튜토리얼 (3) - Dependency (0) | 2023.07.20 |