Programming/Project Log

[가계부 만들기] DB 비동기 처리

minarae7 2023. 4. 23. 22:29
728x90
반응형

FastAPI + SQLAlchemy

프로그램을 구현하다 보면 좀 더 효율적이고 빠르게 동작하는 프로그램으로 계속 수정해나가야 한다.

이렇게 프로그램의 퍼포먼스를 증가시켜서 더 사용자들에게 프로그램이 진화하고 있다는 것을 보여주는 것이 중요하다.

프로그램을 개발하다 보면 보편적으로 가장 많은 리소스를 차지하는 것이 결국에는 IO 처리이다.

파일을 읽고 쓰고, 네트워크로 데이터를 주고 받고 하는 일련의 과정들이 결국에는 IO인 셈이다.

그럼 이렇게 리소스를 많이 잡아먹고 대기시간을 길게하는 IO 작업을 한없이 기다리게 할 수 없을 것이다.

그래서 일반적으로는 이런 처리를 비동기로 처리하도록 개발한다.

동기와 비동기에 대한 차이를 여기서는 자세히 기술하지 않을 것이다.

간단하게 설명하면 동기 방식으로 프로그램을 구현하면 네트워크에 질의를 하였을 때 응답이 올 때까지 대기하게 된다.

비동기 방식으로 구현하면 네트워크에 질의를 하고 응답이 올 때까지 기다리지 않고 다른 일을 먼저 처리하고 응답이 왔을 때 응답을 처리하도록 한다.

자세한 내용은 아래 블로그 포스팅을 참조하면 된다.

728x90
 

동기, 비동기 처리

데이터를 처리하는 방식인 동기, 비동기 처리에 대해 많은 글이 있지만 정확하게 와닿지가 않았다. 최대한 내가 이해한 방식대로 서술해 보려고 한다. 동기 (Synchronous)는 요청과 동시에 일어난다

velog.io

그럼 가계부 프로그램을 만들다가 갑자기 동기/비동기 얘기는 왜 하느냐.

지금까지 구현한 프로그램은 디비를 조회할 때 동기방식으로 처리된다. 즉, mysql에 질의를 던졌을 때 응답이 올 때까지 대기하게 되는 것이다.

이런 경우 규모가 작은 프로그램 경우 성능에 큰 차이가 없지만 추후 프로그램 규모가 커지고 사용자가 많아지면 성능이 떨어지게 된다.

그 때 프로그램을 수정하려면 많은 시간과 노력이 필요하기 때문에 처음 개발할 때 비동기로 개발을 시작하는 것이 유리하다.

여기서도 프로그램이 더 커지기 전에 동기 방식의 DB 질의를 비동기 방식으로 변경하고자 한다.

먼저 접속을 시도하는 models/connection.py 파일부터 수정하도록 한다.

 
from sqlalchemy.orm import sessionmaker
import json
import os

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.exc import SQLAlchemyError

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

SERECT_FILE = os.path.join(BASE_DIR, 'secrets.json')
serects = json.loads(open(SERECT_FILE).read())
DB = serects["DB"]

DB_URL = f"mysql+aiomysql://{DB['user']}:{DB['password']}@{DB['host']}/{DB['database']}"

engine = create_async_engine(DB_URL, echo=True, pool_pre_ping=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db() -> AsyncSession:
    async with async_session() as session:
        try:
            yield session
        except SQLAlchemyError as e:
            await session.rollback()
            raise e
        finally:
            await session.close()

sqlalchemy에서는 이미 비동기로 처리하기 위한 패키지를 포함하고 있다. 이와 관련된 패캐지들을 먼저 로드하도록 한다.

mysql에 접속하는 string도 mysql+pymysql에서 mysql+aiomysql로 변경하였다.

이전에는 engine을 생성할 때 create_engine을 사용하였지만 이제 create_async_engine함수를 사용한다.

그 밖에 get_db 함수 앞에 async를 붙여주고 반환되는 값의 타입도 Session에서 AsyncSession으로 변경되었다.

이제 다음으로 database/models.py 파일을 열어서 가장 마지막 줄에 table을 생성해주던 코드를 아래와 같이 주석처리한다.

#with engine.connect() as conn:
#    Base.metadata.create_all(conn)

 이 부분을 주석 처리하는 이유는 DB를 비동기로 처리하기로 하였기 때문에 함수를 호출할 때는 async를 붙여주어야 하는데 이건 함수에서만 사용할 수 있기 때문에 여기서 with 앞에는 async를 붙일 수 없기 때문이다.

그럼 이 역할을 하는 코드가 어딘가에 추가되어야 한다.

이 프로그램에서는 main.py을 열어서 다음 코드를 추가하는 것으로 해결하였다.

반응형
from .database.connection import engine

...
@app.on_event("startup")
async def startup():
    async with engine.connect() as conn:
        await conn.run_sync(models.Base.metadata.create_all)

@app.on_event("shutdown")
async def shutdown_event():
    await engine.dispose()

위의 코드에서 fastapi 시작될 때와 종료될 때 이벤트를 설정하였다. 프로세스가 시작될 때 테이블 리스트를 검사하고 없는 테이블을 생성하도록 하였고, 종료될 때 접속을 정리하도록 하였다.'

on_event로 프로그램이 시작할 때 처리할 코드와 종료될 때 처리할 코드를 정의한다. 현재 fastapi 공식 문서에서는 두 함수를 앞으로 사용하지 말라고 되어있지만 아직까지 사용하는데는 무리가 없다.

 

Lifespan Events - FastAPI

Lifespan Events Warning The current page still doesn't have a translation for this language. But you can help translating it: Contributing. You can define logic (code) that should be executed before the application starts up. This means that this code will

fastapi.tiangolo.com

이제 마지막으로 실제로 쿼리를 질의하여서 결과를 얻어야 하는 곳에서의 코드를 수정하여야 한다.

membes_service.py 파일을 수정할 것이다.

이 곳에서는 수정할 내용이 많기는 하지만 내용은 간단한다. 이 파일을 수정하는 패턴은 다음과 같다.

함수 파라미터로 정의한 db가 이전에는 Session 형이었지만 이제 AsyncSession으로 바뀌었다.

따라서 모든 함수의 db 파라미터는 Session 형에서 AsyncSession형으로 변경한다.

async def create_member(db: AsyncSession, member: schemas.MemberCreate):
    ...

처음 함수를 선언할 때 해당 함수들은 비동기 함수가 아니였음에도 모두 async를 붙여서 선언하였기 때문에 이 부분에 대해서는 수정할 것이 없다.

이제 db에 질의를 던지는 부분에 모두 await를 붙여주자.

원래는 이렇게 처리하지 않기 위해서 비동기로 처리하는 것인데, 해당 코드에서는 당장 비동기로 처리할 만큼 독립적인 코드가 없다.

따라서 DB에 질의하는 모든 코드 앞에 await를 붙여주어도 무방하다.

db.execute, db.commit, db.refresh 함수 앞에 await를 붙이면 된다.

반응형
async def create_member(db: AsyncSession, member: schemas.MemberCreate):
    # 아이디가 중복되는 계정이 있는지 확인
    stmt = select(models.Members.member_id).filter(models.Members.member_id == member.member_id)
    result = await db.execute(stmt)

    list = result.fetchall()
    if len(list) > 0:
        raise Exception("아이디가 이미 사용 중입니다")

    # 패스워드 해싱 처리
    password_hash = auth.get_password_hash(member.member_pw)

    # DB 저장
    db_member = models.Members(**member.dict(exclude={"member_pw"}), member_pw=password_hash)
    db.add(db_member)
    await db.commit()
    await db.refresh(db_member)

    return db_member

사용자를 추가하는 코드인데 위에서 설명한 것과 같이 관련 함수를 사용하는 곳에서 모두 await를 붙여서 결과가 올 때까지 기다리도록 하였다.

아래는 변경된 members_service.py 파일 전체 내용이다. 참고하시면 된다.

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql import func
from datetime import timedelta
from ..database import models, schemas
from ..libraries import auth

ACCESS_TOKEN_EXPIRE_MINUTES = 60
REFRESH_TOKEN_EXPIRE_HOURS = 24

async def create_member(db: AsyncSession, member: schemas.MemberCreate):
    # 아이디가 중복되는 계정이 있는지 확인
    stmt = select(models.Members.member_id).filter(models.Members.member_id == member.member_id)
    result = await db.execute(stmt)

    list = result.fetchall()
    if len(list) > 0:
        raise Exception("아이디가 이미 사용 중입니다")

    # 패스워드 해싱 처리
    password_hash = auth.get_password_hash(member.member_pw)

    # DB 저장
    db_member = models.Members(**member.dict(exclude={"member_pw"}), member_pw=password_hash)
    db.add(db_member)
    await db.commit()
    await db.refresh(db_member)

    return db_member

# login 처리
async def login_proc(db: AsyncSession, member_id: str, member_pw: str):
    # 해당 아이디가 있는지 찾는다/
    stmt = select(models.Members).filter(models.Members.member_id == member_id, models.Members.is_deleted == 'F')
    result = await db.execute(stmt)

    db_member = result.fetchone()
    if db_member is None:
        raise Exception("해당하는 아이디를 찾을 수 없습니다")

    if auth.verify_password(member_pw, db_member.Members.member_pw) == False:
        raise Exception("패스워드가 일치하지 않습니다.")

    return make_login_response(db_member)


# refresh token 처리
async def member_refresh(db: AsyncSession, refresh_token: schemas.Refresh):
    # refresh token 검사
    try:
        payload = auth.decode_refresh_token(refresh_token.refresh_token)
    except Exception:
        raise Exception

    # 해당 회원이 있는지 검사
    stmt = select(models.Members).filter(models.Members.member_no == payload['member_no'], models.Members.is_deleted == 'F')
    result = await db.execute(stmt)

    db_member = result.fetchone()
    if db_member is None:
        raise Exception("해당하는 아이디를 찾을 수 없습니다")

    return make_login_response(db_member)


def make_login_response(db_member):
    data = {
        "member_no": db_member.Members.member_no,
        "member_id": db_member.Members.member_id,
        "member_name": db_member.Members.member_name,
        "member_email": db_member.Members.member_email
    }
    data["access_token"] = auth.create_access_token(data, timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    data["refresh_token"] = auth.create_access_token({
        "member_no": data["member_no"],
        "member_id": data["member_id"]
    }, timedelta(hours=REFRESH_TOKEN_EXPIRE_HOURS))

    return data


async def member_modify(db: AsyncSession, member: schemas.MemberModify, payload: schemas.JWTPayload):
    # 회원이 존재하는 아이디인지 확인
    result = await db.execute(select(models.Members).filter_by(member_no = payload['member_no'], is_deleted = 'F'))
    db_member = result.scalars().first()

    if db_member is None:
        raise Exception("해당하는 회원 정보를 찾을 수 없습니다.")

    member_info =  member.dict()
    member = {k: v for k, v in member_info.items()}
    for key, value in member.items():
        if value is None:
            continue

        if key == 'member_pw':
            setattr(db_member, key, auth.get_password_hash(value))
        else:
            setattr(db_member, key, value)

    await db.commit()
    return db_member


async def member_delete(db: AsyncSession, payload: schemas.JWTPayload):
    # 회원이 존재하는 아이디인지 확인
    db_member = await db.query(models.Members).filter_by(member_no = payload['member_no'], is_deleted = 'F').first()

    if db_member is None:
        raise Exception("해당하는 회원 정보를 찾을 수 없습니다.")

    setattr(db_member, 'is_deleted', 'T')
    setattr(db_member, 'del_dt', func.now())

    await db.commit()
    return db_member

이제 프로그램이 정상 작동하는지만 확인하면 된다.

앞서서 설명했던 것과 같이 docs 페이지에서 각 기능이 잘 작동하는지만 확인해보면 된다. 여기서는 이미 검증했던 기능들이서 생략한다.

728x90
반응형