FastAPI Background Tasks and Job Queues: Beyond the Basics
Your API endpoint sends an email, generates a PDF, and processes an image — all before returning a response. The user waits 12 seconds. Most of that work does not need to happen before the response. Move it to background tasks.
FastAPI Built-in BackgroundTasks
For simple, fire-and-forget work:
from fastapi import BackgroundTasks

async def send_welcome_email(email: str, name: str):
    """Runs after the response is sent."""
    await email_service.send(
        to=email,
        subject="Welcome",
        body=f"Hello {name}, welcome aboard.",
    )
@router.post("/users")
async def create_user(
    user: UserCreate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    new_user = crud.create_user(db, user)
    # This runs AFTER the response is returned
    background_tasks.add_task(send_welcome_email, user.email, user.name)
    return {"id": new_user.id, "message": "User created"}
The response returns immediately; the email is sent afterwards, in the same process that serves requests.
Limitations of BackgroundTasks
BackgroundTasks runs in the same process as your API. This means:
- If the process crashes, the task is lost
- CPU-intensive tasks block other requests
- No retry logic
- No task monitoring or tracking
- Does not survive server restarts
For anything critical, use a proper job queue.
Celery + Redis
Celery is the de facto standard task queue for Python. Tasks run in separate worker processes:
# tasks.py
from celery import Celery

app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",  # result backend, required for status tracking below
)

@app.task(bind=True, max_retries=3)
def process_image(self, image_id: str):
    try:
        image = download_image(image_id)
        thumbnail = resize(image, 300, 300)
        upload(thumbnail, f"thumbs/{image_id}")
    except Exception as exc:
        # retry() raises; re-raising makes the control flow explicit
        raise self.retry(countdown=60, exc=exc)

@app.task
def generate_report(user_id: str, report_type: str):
    data = fetch_report_data(user_id, report_type)
    pdf = render_pdf(data)
    store_report(user_id, pdf)
    notify_user(user_id, "Your report is ready")
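Tasks only execute once a worker process is running. Assuming the module above is saved as tasks.py, a worker can be started with:

```shell
# Start a Celery worker consuming from the default queue
celery -A tasks worker --loglevel=info
```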
Trigger from FastAPI:
from celery.result import AsyncResult

from tasks import app as celery_app, process_image, generate_report

@router.post("/images/{image_id}/process")
async def trigger_processing(image_id: str):
    task = process_image.delay(image_id)
    return {"task_id": task.id, "status": "processing"}

@router.get("/tasks/{task_id}")
async def check_task(task_id: str):
    result = AsyncResult(task_id, app=celery_app)
    return {
        "task_id": task_id,
        "status": result.status,
        "result": result.result if result.ready() else None,
    }
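Clients typically poll the status endpoint above until the task reaches a terminal state. A minimal sketch of that loop — the `fetch_status` callable is an assumption standing in for a wrapper around GET /tasks/{task_id}:

```python
import time

TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}

def poll_task(fetch_status, interval=1.0, timeout=60.0):
    """Call fetch_status() until it returns a terminal Celery state.

    fetch_status is any callable returning a state string, e.g. one
    that hits GET /tasks/{task_id} and reads the "status" field.
    Raises TimeoutError if the task does not settle in time.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = fetch_status()
        if state in TERMINAL_STATES:
            return state
        time.sleep(interval)
    raise TimeoutError("task did not finish in time")

# Simulated status source: PENDING, then STARTED, then SUCCESS
states = iter(["PENDING", "STARTED", "SUCCESS"])
print(poll_task(lambda: next(states), interval=0.01))  # SUCCESS
```

In production you would add a backoff to the interval rather than polling at a fixed rate.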
Key features:
- Retries: Failed tasks retry up to 3 times with a 60-second delay
- Separate processes: Worker processes do not block your API
- Persistence: Tasks survive API restarts (they are in Redis)
- Monitoring: Track task status via task IDs
Task Priorities
# High priority — user-facing
send_notification.apply_async(priority=0)
# Normal priority — business logic
generate_report.apply_async(priority=5)
# Low priority — maintenance
cleanup_old_files.apply_async(priority=9)
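With the Redis broker used here, lower numbers mean higher priority, and priorities only take effect once the transport knows which priority levels exist. A configuration sketch (the step values are illustrative, not required):

```python
# Redis emulates priorities with one queue per priority step
app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),   # allow priorities 0 through 9
    "queue_order_strategy": "priority",  # drain higher-priority queues first
}
```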
Periodic Tasks with Celery Beat
from celery.schedules import crontab

app.conf.beat_schedule = {
    "cleanup-expired-tokens": {
        "task": "tasks.cleanup_expired_tokens",
        "schedule": crontab(hour=3, minute=0),  # Daily at 3 AM
    },
    "generate-daily-stats": {
        "task": "tasks.generate_daily_stats",
        "schedule": crontab(hour=0, minute=30),  # Daily at 12:30 AM
    },
    "health-check": {
        "task": "tasks.health_check",
        "schedule": 300.0,  # Every 5 minutes
    },
}
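Beat is a separate scheduler process: it only enqueues tasks on schedule, so a worker must also be running to execute them. Assuming the tasks.py module above:

```shell
# Run the scheduler (alongside, not instead of, a worker)
celery -A tasks beat --loglevel=info
```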
When to Use What
| Scenario | Solution |
|---|---|
| Send email after signup | BackgroundTasks |
| Process uploaded image | Celery |
| Generate PDF report | Celery |
| Daily database cleanup | Celery Beat |
| Real-time notification | BackgroundTasks |
| Long-running ML inference | Celery with dedicated queue |
Takeaways
BackgroundTasks is perfect for lightweight, non-critical work. Celery handles everything else — retries, monitoring, priorities, scheduling, and crash recovery. Start with BackgroundTasks and graduate to Celery when you need reliability guarantees.