# freeleaps-service-hub/apps/content/webapi/providers/cache.py
#
# 67 lines
# 2.0 KiB
# Python

import redis.asyncio as redis
from redis.asyncio import Redis
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
# Module-level handle to the shared Redis client. It stays ``None`` until the
# startup handler installed by ``register`` has connected successfully, and it
# is reset to ``None`` once the shutdown handler has closed the client.
redis_instance = None


def register(app, *, host="localhost", port=6379, db=0):
    """Attach Redis-backed cache startup/shutdown handlers to ``app``.

    Two handlers are registered through ``app.on_event``:

    * ``startup``: creates a Redis client, pings the server, and passes the
      client to ``FastAPICache.init`` wrapped in a ``RedisBackend`` under the
      ``"fastapi-cache"`` key prefix.
    * ``shutdown``: closes the client created at startup, if there is one.

    Args:
        app: Object exposing an ``on_event(name)`` decorator factory.
        host: Redis host to connect to.
        port: Redis port to connect to.
        db: Redis database number.

    Raises:
        Nothing at registration time. The startup handler re-raises whatever
        exception the Redis ping raised, after printing it.
    """

    @app.on_event("startup")
    async def init_cache():
        """Connect to Redis, verify the connection, and initialise the cache."""
        # Without this declaration the assignment below would create a local
        # variable and the shutdown handler would never see the client.
        global redis_instance

        client = Redis(
            host=host,
            port=port,
            db=db,
            decode_responses=True,
        )

        # Fail fast: a cache backend that cannot reach Redis is reported and
        # the error is propagated to the caller of the startup handler.
        try:
            await client.ping()
            print("Redis connection established successfully!")
        except Exception as e:
            print(f"Failed to connect to Redis: {e}")
            raise

        FastAPICache.init(RedisBackend(client), prefix="fastapi-cache")

        # Publish the client only once it is known to be usable.
        redis_instance = client

    @app.on_event("shutdown")
    async def close_redis():
        """Close the Redis client created at startup; no-op if none exists."""
        global redis_instance

        # Detach the handle first so a repeated shutdown call cannot close the
        # same client twice.
        client, redis_instance = redis_instance, None
        if client is not None:
            await client.close()
            print("Redis connection closed gracefully.")