Celery는 비동기 작업 큐 시스템으로, 주로 백그라운드 작업 처리에 사용된다. 이메일 발송, 이미지 처리, 데이터 분석 등과 같이 웹 애플리케이션에서 시간이 오래 걸리는 작업을 비동기적으로 처리하려면 Celery를 사용하는 것이 좋다. Celery는 메시지 브로커와 함께 작동하는데, 가장 많이 사용하는 메시지 브로커가 바로 Redis이다.
Redis - 비동기 작업 큐 및 캐싱
Redis는 인메모리 데이터베이스로, 비동기 작업 큐로 사용하거나 캐시를 저장하는 데 매우 유용하다. Celery는 기본적으로 Redis를 메시지 브로커로 사용하고, Redis는 작업 큐로서 메시지 전달을 맡는다. 또한, 캐시 시스템으로 활용되기도 한다.
(1) Celery 설치 및 기본 설정
먼저 Celery와 Redis 설치에 대해 이야기해보자.
pip install celery redis
설치가 끝난 후에는, Celery와 Redis를 연결하기 위해 Celery 설정 파일을 만들어 설정해야 한다. celery.py라는 파일을 만들고 Redis를 메시지 브로커로 설정할 수 있다.
예) celery.py 파일
from celery import Celery
# Redis를 메시지 브로커로 사용
app = Celery('tasks', broker='redis://localhost:6379/0')
# 작업을 비동기적으로 처리할 때 사용할 작업 정의
@app.task
def add(x, y):
return x + y
- Celery('tasks', broker='redis://localhost:6379/0'): localhost:6379에서 실행 중인 Redis를 메시지 브로커로 사용
- add(x, y): 두 숫자를 더하는 간단한 비동기 작업
(2) Celery 작업 실행하기
Celery 작업을 비동기적으로 실행하려면, 먼저 Celery 워커를 실행해야 한다. 워커는 백그라운드에서 대기하고 있다가 작업을 받아서 처리한다.
celery -A celery worker --loglevel=info
- -A celery: celery.py에서 Celery 앱을 불러옴
- worker: Celery 워커를 실행
- --loglevel=info: 로그 레벨을 설정
(3) 비동기 작업 호출
이제, Celery로 정의한 작업을 비동기적으로 호출할 수 있다. 예를 들어, add 작업을 비동기적으로 실행하려면 아래와 같은 코드를 실행할 수 있다.
from celery import Celery
from celery.result import AsyncResult
app = Celery('tasks', broker='redis://localhost:6379/0') # 비동기적으로 호출
result = add.apply_async((4, 6)) # 비동기 호출
# 작업 상태와 결과 확인
print(result.status)# 작업 상태 출력
print(result.result) # 결과 출력
- apply_async(): add 작업을 비동기적으로 호출
- result.status: 작업의 상태(예: "PENDING", "SUCCESS" 등) 확인
- result.result: 작업이 끝난 후 결과를 확인
(4) 작업 큐의 상태 확인 및 모니터링
Celery는 작업 큐와 작업 상태를 관리할 수 있다. Flower라는 모니터링 도구를 사용하면 Celery의 작업 상태를 실시간으로 확인할 수 있다.
pip install flower celery -A celery flower
- flower: Celery 워커 상태를 웹 UI로 확인할 수 있게 해줌
Redis를 이용한 캐싱
(1) Redis로 캐시 사용하기
Redis는 빠른 읽기/쓰기를 제공하기 때문에, 데이터를 캐시에 저장하면 데이터베이스 부하를 줄이고, 반복적인 데이터 조회 성능을 개선할 수 있다. FastAPI와 Redis를 사용하여 캐시를 관리하는 방법을 살펴보자.
pip install fastapi redis uvicorn
(2) FastAPI와 Redis를 이용한 캐싱 예시
import redis
from fastapi import FastAPI
app = FastAPI()
cache = redis.Redis(host='localhost', port=6379, db=0)
@app.get("/items/{item_id}")
async def get_item(item_id: int):
cached_item = cache.get(f"item:{item_id}")
if cached_item:
return {"item_id": item_id, "cached": True, "data": cached_item.decode()} # DB에서 아이템을 가져오는 로직 (생략) item = {"item_id": item_id, "name": "Example"} cache.setex(f"item:{item_id}", 60, str(item)) # 60초 동안 캐시 유지
return item
- cache.get(): 캐시에서 데이터를 조회
- cache.setex(): 데이터를 캐시에 저장하고, 만료 시간을 설정 (여기선 60초)
(3) 캐시 만료 및 갱신
데이터가 변경될 때마다 캐시를 갱신하거나 만료시키는 것이 중요하다. 예를 들어, 상품 정보를 업데이트할 때 캐시를 삭제하고 DB에서 새 데이터를 가져오도록 할 수 있다.
# 예시: 상품 정보 업데이트 후 캐시 삭제
cache.delete(f"item:{item_id}")
- delete(): 캐시된 데이터를 삭제하여 최신 정보를 DB에서 다시 조회할 수 있도록 함
Celery와 Redis를 함께 활용한 비동기 작업과 캐싱 결합
(1) Celery를 이용한 비동기 작업에서 캐싱 사용
비동기 작업이 시간이 오래 걸릴 때, 그 결과를 캐시로 저장하여 다음 요청에서 빠르게 처리할 수 있도록 할 수 있다. 예를 들어, 이미지 처리를 비동기적으로 하면서 결과를 캐시하는 방식이다.
from celery import Celery
import redis
# Celery 설정
app = Celery('tasks', broker='redis://localhost:6379/0')
# Redis 설정
cache = redis.Redis(host='localhost', port=6379, db=0)
@app.task
def process_image(image_id: str):
# 이미지 처리 (시간이 오래 걸리는 작업)
image = process_image_from_db(image_id)
# 처리 결과 캐시
cache.setex(f"image:{image_id}", 3600, str(image))
# 1시간 동안 캐시
return image
- process_image: 이미지를 비동기적으로 처리하는 작업
- cache.setex(): 이미지 처리 결과를 캐시로 저장
Celery + Redis를 활용한 백엔드 성능 최적화
Celery와 Redis는 백엔드 성능 최적화에서 핵심적인 역할을 한다. Celery를 통해 비동기 작업을 효율적으로 처리하고, Redis는 작업 큐의 메시지 브로커로 사용되며, 캐시 시스템으로도 활용될 수 있다. 이 두 기술을 결합하면, 서버 부하를 줄이고 백그라운드에서 긴 시간 작업을 비동기적으로 처리하면서도 캐시를 이용하여 응답 속도를 대폭 개선할 수 있다. 이렇게 Celery와 Redis를 적절히 활용하면, 고성능 백엔드 시스템을 구축할 수 있다. 마무리하면서 추가로 Redis 명령어에 대한 링크를 첨부한다.
참고)
'개념정리 > 개발' 카테고리의 다른 글
boto3 (0) | 2025.05.03 |
---|---|
FastAPI와 비동기 처리 (0) | 2025.02.09 |
MSA, Landing Zone (0) | 2025.01.31 |
Bastion Host (1) | 2025.01.27 |