Compare commits

..

No commits in common. "b12ab5c5057bf25653f82c1f4d57ec03e7b5c397" and "39321a330f1fa847abfc9648c2a38299a25a8285" have entirely different histories.

12 changed files with 81 additions and 695 deletions

View File

@ -1,46 +1,3 @@
# [1.15.0](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/compare/v1.14.2...v1.15.0) (2025-10-28)
### Bug Fixes
* change notification service model layer to not using Beanie ([2eb09c8](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/2eb09c834c147411867d32fc8b640cf784444083))
### Features
* make init_deployment api support compute_unit ([ad3b2ea](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/ad3b2ea938d73d210d8d9df7e0cd448537292c33))
## [1.14.2](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/compare/v1.14.1...v1.14.2) (2025-10-24)
### Bug Fixes
* change middleware to avoid Beanie init ([1823a33](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/1823a33d45abdf2076a2eee5d899e50d8c72993f))
* change the authentication service model layer from Beanie to direct MongoDB ([a936f89](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/a936f8942644d14b5af91bd5d2e372e12cbc2043))
* fix the id wrong type issue ([f57cfb2](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/f57cfb2d3accf7ee8817c8fcdc2bd4a1aa236aa1))
* fix the invalid id format issues ([ba63e1b](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/ba63e1b7a8835e9f898d503a01ec837da83e5072))
## [1.14.1](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/compare/v1.14.0...v1.14.1) (2025-10-21)
### Bug Fixes
* change name to more general ones ([2036c4b](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/2036c4b9ae5bd9d1d7b00daead0e047b6d9e4ebb))
* create apis for magicleaps password things avoiding calling code depot ([497e608](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/497e6080a3cb703621ae9b17f3ca2b0e1c4c3672))
* fix the small issue of user_id not found ([6a207c7](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/6a207c7e79abc9400724caa3ee873f1e579deeb0))
# [1.14.0](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/compare/v1.13.0...v1.14.0) (2025-10-14)
### Features
* **auth:** add extrnal auth introspect api interface ([7a83237](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/7a832371c7790c77d060344a4141c1dc5c435333))
* **env:** add env config ([6bbaaae](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/6bbaaae30f2649e787aeddd62779301447ae537b))
* **format:** modify the log format ([81f2a21](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/81f2a21f6bb8390f851316a05809041f60b3ea4d))
* **middleware:** add middleware for authentication ([80a6beb](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/80a6beb1ed36d94f3024a80045cbf82a1d0a8844))
* **middleware:** modify database to tolerate the tenant cache ([7844906](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/7844906cfe64b1b2f2d8ae29b064213385c6ce98))
* **model:** add usage_log_doc ([5f18212](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/5f18212343f93eebdec57498a56f60a8065b2501))
# [1.13.0](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/compare/v1.12.1...v1.13.0) (2025-09-30) # [1.13.0](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/compare/v1.12.1...v1.13.0) (2025-09-30)

View File

@ -1 +1 @@
1.15.0 1.13.0

View File

@ -150,7 +150,8 @@ class PermissionHandler:
query = {} query = {}
if permission_id: if permission_id:
try: try:
query["_id"] = ObjectId(permission_id) # Convert string to ObjectId for MongoDB ObjectId(permission_id) # Validate ObjectId format
query["_id"] = permission_id # Use MongoDB's _id field directly
except Exception: except Exception:
raise RequestValidationError("Invalid permission_id format. Must be a valid ObjectId.") raise RequestValidationError("Invalid permission_id format. Must be a valid ObjectId.")
if permission_key: if permission_key:

View File

@ -144,7 +144,8 @@ class RoleHandler:
query = {} query = {}
if role_id: if role_id:
try: try:
query["_id"] = ObjectId(role_id) # Convert string to ObjectId for MongoDB ObjectId(role_id) # Validate ObjectId format
query["_id"] = role_id # Use MongoDB's _id field directly
except Exception: except Exception:
raise RequestValidationError("Invalid role_id format. Must be a valid ObjectId.") raise RequestValidationError("Invalid role_id format. Must be a valid ObjectId.")
if role_key: if role_key:

View File

@ -25,7 +25,6 @@ class Deployment(Document):
deployment_git_url: str deployment_git_url: str
deployment_git_sha256: str deployment_git_sha256: str
deployment_reason: str deployment_reason: str
compute_unit: Optional[int] = None # None for old data
deployment_app_url: str = "" # URL to access the deployed application, keep it empty to be filled later deployment_app_url: str = "" # URL to access the deployed application, keep it empty to be filled later
deployed_by: str deployed_by: str
@ -47,7 +46,6 @@ class InitDeploymentRequest(BaseModel):
user_id: str user_id: str
reason: str = "not provided" reason: str = "not provided"
ttl_hours: int = -1 ttl_hours: int = -1
compute_unit: Optional[int] = 0
class CheckDeploymentStatusRequest(BaseModel): class CheckDeploymentStatusRequest(BaseModel):
product_id: str product_id: str
@ -87,5 +85,4 @@ class DevOpsReconcileRequest(BaseModel):
commit_sha256: Optional[str] = None commit_sha256: Optional[str] = None
target_env: Literal["alpha", "prod"] target_env: Literal["alpha", "prod"]
ttl_control: bool = False ttl_control: bool = False
ttl: int = 10800 ttl: int = 10800
compute_unit: Optional[int] = 0

View File

@ -58,7 +58,6 @@ class DeploymentService:
deployed_by = request.user_id, deployed_by = request.user_id,
created_at = datetime.now(), created_at = datetime.now(),
updated_at = datetime.now(), updated_at = datetime.now(),
compute_unit = request.compute_unit,
) )
await self._start_deployment(deployment) await self._start_deployment(deployment)
@ -183,7 +182,6 @@ class DeploymentService:
ttl_control=deployment.deployment_ttl_hours > 0, ttl_control=deployment.deployment_ttl_hours > 0,
ttl=10800 if deployment.deployment_ttl_hours < 0 else deployment.deployment_ttl_hours * 60 * 60, ttl=10800 if deployment.deployment_ttl_hours < 0 else deployment.deployment_ttl_hours * 60 * 60,
commit_sha256=deployment.deployment_git_sha256, commit_sha256=deployment.deployment_git_sha256,
compute_unit=deployment.compute_unit
) )
# send request to reoncile service # send request to reoncile service
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:

View File

@ -41,70 +41,6 @@ class StarRocksMetricsService:
AND product_id = %s AND product_id = %s
ORDER BY date ASC ORDER BY date ASC
""", """,
"dcr": """
SELECT
date,
product_id,
value,
updated_date
FROM dws_dcr
WHERE date >= %s
AND date < %s
AND product_id = %s
ORDER BY date ASC
""",
"mrar": """
SELECT
date,
product_id,
CASE
WHEN monthly_requests = 0 THEN 0.0
ELSE (monthly_accepted_requests * 1.0) / monthly_requests
END AS value,
updated_date
FROM dws_mrar
WHERE date >= %s
AND date < %s
AND product_id = %s
ORDER BY date ASC
""",
"trar": """
SELECT
product_id,
CASE
WHEN total_requests = 0 THEN 0.0
ELSE (total_accepted_requests * 1.0) / total_requests
END AS value,
updated_date
FROM dws_trar
WHERE product_id = %s
""",
"mrqr": """
SELECT
date,
product_id,
CASE
WHEN monthly_requests = 0 THEN 0.0
ELSE (monthly_quoted_requests * 1.0) / monthly_requests
END AS value,
updated_date
FROM dws_mrqr
WHERE date >= %s
AND date < %s
AND product_id = %s
ORDER BY date ASC
""",
"trqr": """
SELECT
product_id,
CASE
WHEN total_requests = 0 THEN 0.0
ELSE (total_quoted_requests * 1.0) / total_requests
END AS value,
updated_date
FROM dws_trqr
WHERE product_id = %s
""",
}, },
"magicleaps": { "magicleaps": {
@ -156,9 +92,9 @@ class StarRocksMetricsService:
self, self,
product_id: str, product_id: str,
metric_name: str, metric_name: str,
step: Optional[str], step: str,
start_date: Optional[Union[str, date]], start_date: Union[str, date],
end_date: Optional[Union[str, date]] end_date: Union[str, date]
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
""" """
Query metric data for a specific date range. Query metric data for a specific date range.
@ -204,17 +140,6 @@ class StarRocksMetricsService:
await self.module_logger.log_error(error_msg) await self.module_logger.log_error(error_msg)
raise HTTPException(status_code=404, detail=error_msg) raise HTTPException(status_code=404, detail=error_msg)
# Check if metric need time params
# Starting with "t" indicates a query for the total count since the very first record.
# NOTE: This determination logic is subject to future changes.
if (start_date is None) or (end_date is None) or (step is None):
if metric_name.startswith('t'):
return await self._query_metric_by_product_id(product_id, metric_name)
else:
error_msg = f"Metric '{metric_name}' should be queried by start date, end date and step."
await self.module_logger.log_error(error_msg)
raise HTTPException(status_code=404, detail=error_msg)
# Parse date strings if they are strings # Parse date strings if they are strings
if isinstance(start_date, str): if isinstance(start_date, str):
try: try:
@ -359,7 +284,7 @@ class StarRocksMetricsService:
result_dict[date_str] = { result_dict[date_str] = {
"date": date_str, "date": date_str,
"value": value if value is not None else 0, "value": int(value) if value is not None else 0,
"metric": metric_name, "metric": metric_name,
"labels": labels "labels": labels
} }
@ -448,67 +373,4 @@ class StarRocksMetricsService:
"metric_name": metric_name, "metric_name": metric_name,
"sql_query": self.METRIC_SQL_MAP[product_id][metric_name].strip(), "sql_query": self.METRIC_SQL_MAP[product_id][metric_name].strip(),
"description": f"{metric_name} count from StarRocks table dws_{metric_name}" "description": f"{metric_name} count from StarRocks table dws_{metric_name}"
} }
async def _query_metric_by_product_id(self, product_id: str, metric_name: str) -> List[Dict[str, Any]]:
"""
Query metric not suitable for date range (e.g. data related to calculating total records).
Args:
product_id: Product ID to identify which product's metrics to query
metric_name: Name of the metric to query
Returns:
List of dictionaries with 'product_id' key.
Raises:
Exception: If StarRocks query fails
Example:
result = await service.query_metric_by_time_range(
"freeleaps",
"total_request_quoted_rate",
)
# Returns: [{"date": "freeleaps", "value": 45, "labels": {...}},]
"""
# Get the SQL query for the metric
sql_query = self.METRIC_SQL_MAP[product_id][metric_name]
try:
await self.module_logger.log_info(
f"Querying metric '{metric_name}' from product '{product_id}'")
# Execute the query
result = await self.starrocks_client.execute_query(
query=sql_query,
params=(product_id)
)
# Parse the result and format it
for row in result:
# Get the value
value = row.get("value", 0)
if value is None:
value = 0
# Create labels dictionary
labels = {
"product_id": row.get("product_id", product_id),
"metric_type": metric_name,
}
result_dict = []
result_dict.append({
"date": None,
"value": value if value is not None else 0,
"metric": metric_name,
"labels": labels
})
await self.module_logger.log_info(
f"Successfully queried metric '{metric_name}'")
return result_dict
except Exception as e:
await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
raise

View File

@ -9,7 +9,7 @@ from backend.services.starrocks_metrics_service import StarRocksMetricsService
class MetricDataPoint(BaseModel): class MetricDataPoint(BaseModel):
"""Single data point in metric time series.""" """Single data point in metric time series."""
date: Optional[str] = Field(None, description="Date in YYYY-MM-DD format") date: str = Field(..., description="Date in YYYY-MM-DD format")
value: Union[int, float] = Field(..., description="Metric value") value: Union[int, float] = Field(..., description="Metric value")
labels: Dict[str, Any] = Field(default_factory=dict, description="Metric labels") labels: Dict[str, Any] = Field(default_factory=dict, description="Metric labels")
@ -19,16 +19,17 @@ class MetricTimeSeriesResponse(BaseModel):
metric_name: str = Field(..., description="Name of the queried metric") metric_name: str = Field(..., description="Name of the queried metric")
data_points: List[MetricDataPoint] = Field(..., description="List of data points") data_points: List[MetricDataPoint] = Field(..., description="List of data points")
total_points: int = Field(..., description="Total number of data points") total_points: int = Field(..., description="Total number of data points")
time_range: Dict[Optional[str], Optional[str]] = Field([None, None], description="Start and end date of the query") time_range: Dict[str, str] = Field(..., description="Start and end date of the query")
class MetricQueryRequest(BaseModel): class MetricQueryRequest(BaseModel):
"""Request model for metric query by time range.""" """Request model for metric query."""
product_id: str = Field(..., description="Product ID to identify which product's data to query") product_id: str = Field(..., description="Product ID to identify which product's data to query")
metric_name: str = Field(..., description="Name of the metric to query") metric_name: str = Field(..., description="Name of the metric to query")
step: Optional[str] = Field(None, description="Aggregation step, e.g., 1d or 1m") step: str = Field(..., description="Aggregation step, e.g., 1d or 1m")
start_date: Optional[str] = Field(None, description="Start date in YYYY-MM-DD HH:MM:SS format") start_date: str = Field(..., description="Start date in YYYY-MM-DD HH:MM:SS format")
end_date: Optional[str] = Field(None, description="End date in YYYY-MM-DD HH:MM:SS format") end_date: str = Field(..., description="End date in YYYY-MM-DD HH:MM:SS format")
router = APIRouter() router = APIRouter()
@ -71,11 +72,11 @@ async def metrics_query(
], ],
total_points=len(data_points), total_points=len(data_points),
time_range={ time_range={
"start": request.start_date if request.start_date else None, "start": request.start_date,
"end": request.end_date if request.end_date else None "end": request.end_date
} }
) )
await module_logger.log_info( await module_logger.log_info(
f"Successfully queried metric '{request.metric_name}' with {len(data_points)} data points") f"Successfully queried metric '{request.metric_name}' with {len(data_points)} data points")
return response return response

View File

@ -1,415 +0,0 @@
"""
BaseDoc - A custom document class that provides Beanie-like interface using direct MongoDB operations
"""
import asyncio
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any, Type, Union
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pydantic import BaseModel
from pydantic._internal._model_construction import ModelMetaclass
from common.config.app_settings import app_settings
class QueryExpression:
"""Query expression for field comparisons"""
def __init__(self, field_name: str):
self.field_name = field_name
def __eq__(self, other: Any) -> Dict[str, Any]:
"""Handle field == value comparisons"""
return {self.field_name: other}
def __ne__(self, other: Any) -> Dict[str, Any]:
"""Handle field != value comparisons"""
return {self.field_name: {"$ne": other}}
def __gt__(self, other: Any) -> Dict[str, Any]:
"""Handle field > value comparisons"""
return {self.field_name: {"$gt": other}}
def __lt__(self, other: Any) -> Dict[str, Any]:
"""Handle field < value comparisons"""
return {self.field_name: {"$lt": other}}
def __ge__(self, other: Any) -> Dict[str, Any]:
"""Handle field >= value comparisons"""
return {self.field_name: {"$gte": other}}
def __le__(self, other: Any) -> Dict[str, Any]:
"""Handle field <= value comparisons"""
return {self.field_name: {"$lte": other}}
class FieldDescriptor:
"""Descriptor for field access like Beanie's field == value pattern"""
def __init__(self, field_name: str, field_type: type):
self.field_name = field_name
self.field_type = field_type
def __get__(self, instance: Any, owner: type) -> Any:
"""
- Class access (instance is None): return QueryExpression for building queries
- Instance access (instance is not None): return the actual field value
"""
if instance is None:
return QueryExpression(self.field_name)
return instance.__dict__.get(self.field_name)
def __set__(self, instance: Any, value: Any) -> None:
"""Set instance field value with type validation (compatible with Pydantic validation)"""
if not isinstance(value, self.field_type):
raise TypeError(f"Field {self.field_name} must be {self.field_type}")
instance.__dict__[self.field_name] = value
class FieldCondition:
"""Represents a field condition for MongoDB queries"""
def __init__(self, field_name: str, value: Any, operator: str = "$eq"):
self.field_name = field_name
self.value = value
self.operator = operator
self.left = self # For compatibility with existing condition parsing
self.right = value
# Module-level variables for database connection
_db: Optional[AsyncIOMotorDatabase] = None
_client: Optional[AsyncIOMotorClient] = None
# Context variable for tenant database
import contextvars
_tenant_db_context: contextvars.ContextVar[Optional[AsyncIOMotorDatabase]] = contextvars.ContextVar('tenant_db', default=None)
class QueryModelMeta(ModelMetaclass):
"""Metaclass: automatically create FieldDescriptor for model fields"""
def __new__(cls, name: str, bases: tuple, namespace: dict):
# Get model field annotations (like name: str -> "name" and str)
annotations = namespace.get("__annotations__", {})
# Create the class first using Pydantic's metaclass
new_class = super().__new__(cls, name, bases, namespace)
# After Pydantic processes the fields, add the descriptors as class attributes
for field_name, field_type in annotations.items():
if field_name != 'id': # Skip the id field as it's handled specially
# Add the descriptor as a class attribute
setattr(new_class, field_name, FieldDescriptor(field_name, field_type))
return new_class
def __getattr__(cls, name: str):
"""Handle field access like Doc.field_name for query building"""
# Check if this is a field that exists in the model
if hasattr(cls, 'model_fields') and name in cls.model_fields:
return QueryExpression(name)
raise AttributeError(f"'{cls.__name__}' object has no attribute '{name}'")
class BaseDoc(BaseModel, metaclass=QueryModelMeta):
"""
Base document class that provides Beanie-like interface using direct MongoDB operations.
All model classes should inherit from this instead of Beanie's Document.
"""
id: Optional[str] = None # MongoDB _id field
def model_dump(self, **kwargs):
"""Override model_dump to exclude field descriptors"""
# Get the default model_dump result
result = super().model_dump(**kwargs)
# Remove any field descriptors that might have been included
filtered_result = {}
for key, value in result.items():
if not isinstance(value, FieldDescriptor):
filtered_result[key] = value
return filtered_result
@classmethod
def field(cls, field_name: str) -> QueryExpression:
"""Get a field expression for query building"""
return QueryExpression(field_name)
@classmethod
async def _get_database(cls) -> AsyncIOMotorDatabase:
"""Get database connection using pure AsyncIOMotorClient"""
# Try to get tenant database from context first
tenant_db = _tenant_db_context.get()
if tenant_db is not None:
return tenant_db
# Fallback to global database connection
global _db, _client
if _db is None:
_client = AsyncIOMotorClient(app_settings.MONGODB_URI)
_db = _client[app_settings.MONGODB_NAME]
return _db
@classmethod
def set_tenant_database(cls, db: AsyncIOMotorDatabase):
"""Set the tenant database for this context"""
_tenant_db_context.set(db)
@classmethod
def _get_collection_name(cls) -> str:
"""Get collection name from Settings or class name"""
if hasattr(cls, 'Settings') and hasattr(cls.Settings, 'name'):
return cls.Settings.name
else:
# Convert class name to snake_case for collection name
import re
name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', cls.__name__)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()
@classmethod
def find(cls, *conditions) -> 'QueryBuilder':
"""Find documents matching conditions - returns QueryBuilder for chaining"""
return QueryBuilder(cls, conditions)
@classmethod
async def find_one(cls, *conditions) -> Optional['BaseDoc']:
"""Find one document matching conditions"""
db = await cls._get_database()
collection_name = cls._get_collection_name()
collection = db[collection_name]
# Convert Beanie-style conditions to MongoDB query
query = cls._convert_conditions_to_query(conditions)
doc = await collection.find_one(query)
if doc:
# Extract MongoDB _id and convert to string
mongo_id = doc.pop('_id', None)
# Filter doc to only include fields defined in the model
model_fields = set(cls.model_fields.keys())
filtered_doc = {k: v for k, v in doc.items() if k in model_fields}
# Add the id field
if mongo_id:
filtered_doc['id'] = str(mongo_id)
return cls(**filtered_doc)
return None
@classmethod
async def get(cls, doc_id: str) -> Optional['BaseDoc']:
"""Get document by ID"""
from bson import ObjectId
try:
object_id = ObjectId(doc_id)
except:
return None
db = await cls._get_database()
collection_name = cls._get_collection_name()
collection = db[collection_name]
doc = await collection.find_one({"_id": object_id})
if doc:
# Extract MongoDB _id and convert to string
mongo_id = doc.pop('_id', None)
# Filter doc to only include fields defined in the model
model_fields = set(cls.model_fields.keys())
filtered_doc = {k: v for k, v in doc.items() if k in model_fields}
# Add the id field
if mongo_id:
filtered_doc['id'] = str(mongo_id)
return cls(**filtered_doc)
return None
@classmethod
def _convert_conditions_to_query(cls, conditions) -> Dict[str, Any]:
"""Convert Beanie-style conditions to MongoDB query"""
if not conditions:
return {}
query = {}
for condition in conditions:
if isinstance(condition, dict):
# Handle QueryExpression results (dictionaries) and direct dictionary queries
query.update(condition)
elif isinstance(condition, FieldCondition):
# Handle legacy FieldCondition objects
if condition.operator == "$eq":
query[condition.field_name] = condition.value
else:
query[condition.field_name] = {condition.operator: condition.value}
elif hasattr(condition, 'left') and hasattr(condition, 'right'):
# Handle field == value conditions
field_name = condition.left.name
value = condition.right
query[field_name] = value
elif hasattr(condition, '__dict__'):
# Handle complex conditions like FLID.identity == value
if hasattr(condition, 'left') and hasattr(condition, 'right'):
left = condition.left
if hasattr(left, 'name') and hasattr(left, 'left'):
# Nested field access like FLID.identity
field_name = f"{left.left.name}.{left.name}"
value = condition.right
query[field_name] = value
else:
field_name = left.name
value = condition.right
query[field_name] = value
return query
def _convert_decimals_to_float(self, obj):
"""Convert Decimal objects to float for MongoDB compatibility"""
from decimal import Decimal
if isinstance(obj, Decimal):
return float(obj)
elif isinstance(obj, dict):
return {key: self._convert_decimals_to_float(value) for key, value in obj.items()}
elif isinstance(obj, list):
return [self._convert_decimals_to_float(item) for item in obj]
else:
return obj
async def create(self) -> 'BaseDoc':
"""Create this document in the database"""
db = await self._get_database()
collection_name = self._get_collection_name()
collection = db[collection_name]
# Convert to dict and insert, excluding field descriptors
doc_dict = self.model_dump(exclude={'id'})
# Convert Decimal objects to float for MongoDB compatibility
doc_dict = self._convert_decimals_to_float(doc_dict)
result = await collection.insert_one(doc_dict)
# Set the id field from the inserted document
if result.inserted_id:
self.id = str(result.inserted_id)
# Return the created document
return self
async def save(self) -> 'BaseDoc':
"""Save this document to the database (update if exists, create if not)"""
db = await self._get_database()
collection_name = self._get_collection_name()
collection = db[collection_name]
# Convert to dict, excluding field descriptors
doc_dict = self.model_dump(exclude={'id'})
# Convert Decimal objects to float for MongoDB compatibility
doc_dict = self._convert_decimals_to_float(doc_dict)
# Try to find existing document by user_id or other unique fields
query = {}
if hasattr(self, 'user_id'):
query['user_id'] = self.user_id
elif hasattr(self, 'email'):
query['email'] = self.email
elif hasattr(self, 'mobile'):
query['mobile'] = self.mobile
elif hasattr(self, 'auth_code'):
query['auth_code'] = self.auth_code
if query:
# Update existing document
result = await collection.update_one(query, {"$set": doc_dict}, upsert=True)
# If it was an insert, set the id field
if result.upserted_id:
self.id = str(result.upserted_id)
else:
# Insert new document
result = await collection.insert_one(doc_dict)
if result.inserted_id:
self.id = str(result.inserted_id)
return self
async def delete(self) -> bool:
"""Delete this document from the database"""
db = await self._get_database()
collection_name = self._get_collection_name()
collection = db[collection_name]
# Try to find existing document by user_id or other unique fields
query = {}
if hasattr(self, 'user_id'):
query['user_id'] = self.user_id
elif hasattr(self, 'email'):
query['email'] = self.email
elif hasattr(self, 'mobile'):
query['mobile'] = self.mobile
elif hasattr(self, 'auth_code'):
query['auth_code'] = self.auth_code
if query:
result = await collection.delete_one(query)
return result.deleted_count > 0
return False
class QueryBuilder:
"""Query builder for chaining operations like Beanie's QueryBuilder"""
def __init__(self, model_class: Type[BaseDoc], conditions: tuple):
self.model_class = model_class
self.conditions = conditions
self._limit_value: Optional[int] = None
self._skip_value: Optional[int] = None
def limit(self, n: int) -> 'QueryBuilder':
"""Limit number of results"""
self._limit_value = n
return self
def skip(self, n: int) -> 'QueryBuilder':
"""Skip number of results"""
self._skip_value = n
return self
async def to_list(self) -> List[BaseDoc]:
"""Convert query to list of documents"""
db = await self.model_class._get_database()
collection_name = self.model_class._get_collection_name()
collection = db[collection_name]
# Convert conditions to MongoDB query
query = self.model_class._convert_conditions_to_query(self.conditions)
# Build cursor
cursor = collection.find(query)
if self._skip_value:
cursor = cursor.skip(self._skip_value)
if self._limit_value:
cursor = cursor.limit(self._limit_value)
# Execute query and convert to model instances
docs = await cursor.to_list(length=None)
results = []
for doc in docs:
# Extract MongoDB _id and convert to string
mongo_id = doc.pop('_id', None)
# Filter doc to only include fields defined in the model
model_fields = set(self.model_class.model_fields.keys())
filtered_doc = {k: v for k, v in doc.items() if k in model_fields}
# Add the id field
if mongo_id:
filtered_doc['id'] = str(mongo_id)
results.append(self.model_class(**filtered_doc))
return results
async def first_or_none(self) -> Optional[BaseDoc]:
"""Get first result or None"""
results = await self.limit(1).to_list()
return results[0] if results else None
async def count(self) -> int:
"""Count number of matching documents"""
db = await self.model_class._get_database()
collection_name = self.model_class._get_collection_name()
collection = db[collection_name]
query = self.model_class._convert_conditions_to_query(self.conditions)
return await collection.count_documents(query)

View File

@ -1,11 +1,11 @@
from beanie import Document
from datetime import datetime from datetime import datetime
from typing import Optional, List from typing import Optional, List
from common.constants.region import UserRegion from common.constants.region import UserRegion
from common.constants.email import EmailSendStatus, BounceType from common.constants.email import EmailSendStatus, BounceType
from backend.models.base_doc import BaseDoc
class MessageTemplateDoc(BaseDoc): class MessageTemplateDoc(Document):
template_id: str template_id: str
tenant_id: Optional[str] = None tenant_id: Optional[str] = None
region: UserRegion region: UserRegion
@ -19,20 +19,20 @@ class MessageTemplateDoc(BaseDoc):
name = "message_templates_doc" name = "message_templates_doc"
indexes = [ indexes = [
"template_id", "template_id",
"tenant_id", "tenant_id",
"region" "region"
] ]
class EmailSenderDoc(BaseDoc): class EmailSenderDoc(Document):
tenant_id: str tenant_id: str
email_sender: Optional[str] = None email_sender: Optional[str] = None
is_active: bool = True is_active: bool = True
class Settings: class Settings:
name = "email_sender_doc" name = "email_sender_doc"
indexes = ["tenant_id"] indexes = ["tenant_id"]
class EmailSendStatusDoc(BaseDoc): class EmailSendStatusDoc(Document):
email_id: str email_id: str
tenant_id: str tenant_id: str
email_sender: Optional[str] = None email_sender: Optional[str] = None
@ -57,7 +57,7 @@ class EmailSendStatusDoc(BaseDoc):
"tenant_id" "tenant_id"
] ]
class EmailTrackingDoc(BaseDoc): class EmailTrackingDoc(Document):
email_id: str email_id: str
tenant_id: str tenant_id: str
recipient_email: str recipient_email: str
@ -85,10 +85,10 @@ class EmailTrackingDoc(BaseDoc):
"tenant_id" "tenant_id"
] ]
class EmailBounceDoc(BaseDoc): class EmailBounceDoc(Document):
email: str email: str
tenant_id: str tenant_id: str
email_id: Optional[str] = None email_id: Optional[str] = None
template_id: Optional[str] = None template_id: Optional[str] = None
bounce_type: BounceType bounce_type: BounceType
reason: str reason: str
@ -105,8 +105,8 @@ class EmailBounceDoc(BaseDoc):
"email", "email",
"tenant_id" "tenant_id"
] ]
class UsageLogDoc(BaseDoc): class UsageLogDoc(Document):
timestamp: datetime = datetime.utcnow() # timestamp timestamp: datetime = datetime.utcnow() # timestamp
tenant_id: str # tenant id tenant_id: str # tenant id
operation: str # operation type operation: str # operation type
@ -117,7 +117,7 @@ class UsageLogDoc(BaseDoc):
bytes_out: int # output bytes bytes_out: int # output bytes
key_id: Optional[str] = None # API Key ID key_id: Optional[str] = None # API Key ID
extra: dict = {} # extra information extra: dict = {} # extra information
class Settings: class Settings:
name = "usage_log_doc" name = "usage_log_doc"
indexes = [ indexes = [

View File

@ -39,42 +39,41 @@ class DatabaseMiddleware:
return await response(scope, receive, send) return await response(scope, receive, send)
if not product_id: if not product_id:
# Compatibility / public routes: use main database with BaseDoc context set # Compatibility / public routes: use main database with tenant models initialized
await self.module_logger.log_info(f"No product_id - using main database for path: {request.url.path}") await self.module_logger.log_info(f"No product_id - using main database for path: {request.url.path}")
# Get main database with BaseDoc context set for all models # Get main database with Beanie initialized for tenant models
main_db_initialized = await tenant_cache.get_main_db_initialized() main_db_initialized = await tenant_cache.get_main_db_initialized()
request.state.db = main_db_initialized request.state.db = main_db_initialized
request.state.product_id = None request.state.product_id = None
await self.module_logger.log_info(f"Successfully set BaseDoc context for main database") await self.module_logger.log_info(f"Successfully initialized main database with tenant models")
return await self.app(scope, receive, send) return await self.app(scope, receive, send)
try: try:
# Get tenant-specific database with BaseDoc context set (cached) # Get tenant-specific database with Beanie already initialized (cached)
await self.module_logger.log_info(f"Attempting to get tenant database for product_id: {product_id}") await self.module_logger.log_info(f"Attempting to get tenant database for product_id: {product_id}")
tenant_db = await tenant_cache.get_initialized_db(product_id) tenant_db = await tenant_cache.get_initialized_db(product_id)
request.state.db = tenant_db request.state.db = tenant_db
request.state.product_id = product_id request.state.product_id = product_id
await self.module_logger.log_info(f"Successfully retrieved cached tenant database with BaseDoc context for product_id: {product_id}") await self.module_logger.log_info(f"Successfully retrieved cached tenant database with Beanie for product_id: {product_id}")
return await self.app(scope, receive, send)
except ValueError as e: except HTTPException as e:
# Handle tenant not found or inactive (ValueError from TenantDBCache) # Handle tenant not found or inactive (HTTPException from TenantDBCache)
await self.module_logger.log_error(f"Tenant error for {product_id}: {str(e)}") await self.module_logger.log_error(f"Tenant error for {product_id}: [{e.status_code}] {e.detail}")
response = JSONResponse( response = JSONResponse(
status_code=status.HTTP_404_NOT_FOUND, status_code=e.status_code,
content={"detail": str(e)} content={"detail": e.detail}
) )
return await response(scope, receive, send) return await response(scope, receive, send)
except Exception as e: except Exception as e:
await self.module_logger.log_error(f"Database error for tenant {product_id}: {str(e)}") await self.module_logger.log_error(f"Database error for tenant {product_id}: {str(e)}")
response = JSONResponse( response = JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"detail": "Database connection error"} content={"detail": "Database connection error"}
) )
return await response(scope, receive, send) return await response(scope, receive, send)
return await self.app(scope, receive, send) return await self.app(scope, receive, send)

View File

@ -1,4 +1,5 @@
from webapi.config.site_settings import site_settings from webapi.config.site_settings import site_settings
from beanie import init_beanie
from fastapi import HTTPException from fastapi import HTTPException
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc, UsageLogDoc from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc, UsageLogDoc
@ -14,7 +15,7 @@ import os
MAIN_CLIENT: Optional[AsyncIOMotorClient] = None MAIN_CLIENT: Optional[AsyncIOMotorClient] = None
TENANT_CACHE: Optional['TenantDBCache'] = None TENANT_CACHE: Optional['TenantDBCache'] = None
# Define document models # Define document models
document_models = [ document_models = [
MessageTemplateDoc, MessageTemplateDoc,
EmailSenderDoc, EmailSenderDoc,
@ -36,7 +37,7 @@ class TenantDBCache:
Uses main_db.tenant_doc to resolve mongodb_uri; caches clients with LRU. Uses main_db.tenant_doc to resolve mongodb_uri; caches clients with LRU.
Database instances are created fresh each time from cached clients. Database instances are created fresh each time from cached clients.
""" """
def __init__(self, main_db: AsyncIOMotorDatabase, max_size: int = 64): def __init__(self, main_db: AsyncIOMotorDatabase, max_size: int = 64):
self.main_db = main_db self.main_db = main_db
self.max_size = max_size self.max_size = max_size
@ -46,22 +47,20 @@ class TenantDBCache:
self.module_logger = ModuleLogger(sender_id="TenantDBCache") self.module_logger = ModuleLogger(sender_id="TenantDBCache")
async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase: async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase:
"""Get tenant database with BaseDoc context set""" """Get tenant database with Beanie already initialized"""
# fast-path: check if client is cached # fast-path: check if client is cached
cached_client = self._cache.get(product_id) cached_client = self._cache.get(product_id)
if cached_client: if cached_client:
await self.module_logger.log_info(f"Found cached client for {product_id}") await self.module_logger.log_info(f"Found cached client for {product_id}")
self._cache.move_to_end(product_id) self._cache.move_to_end(product_id)
# Get fresh database instance from cached client # Get fresh database instance from cached client
db = cached_client.get_default_database() db = cached_client.get_default_database()
if db is not None: if db is not None:
# Set tenant database context for BaseDoc # Initialize Beanie for this fresh database instance
MessageTemplateDoc.set_tenant_database(db) await init_beanie(database=db, document_models=tenant_document_models)
EmailSenderDoc.set_tenant_database(db) await self.module_logger.log_info(f"Beanie initialization completed for {product_id} using cached client")
EmailSendStatusDoc.set_tenant_database(db)
await self.module_logger.log_info(f"BaseDoc tenant context set for {product_id} using cached client")
return db return db
else: else:
await self.module_logger.log_error(f"No default database found for cached client {product_id}") await self.module_logger.log_error(f"No default database found for cached client {product_id}")
@ -75,15 +74,13 @@ class TenantDBCache:
if cached_client: if cached_client:
await self.module_logger.log_info(f"Double-check found cached client for {product_id}") await self.module_logger.log_info(f"Double-check found cached client for {product_id}")
self._cache.move_to_end(product_id) self._cache.move_to_end(product_id)
# Get fresh database instance from cached client # Get fresh database instance from cached client
db = cached_client.get_default_database() db = cached_client.get_default_database()
if db is not None: if db is not None:
# Set tenant database context for BaseDoc # Initialize Beanie for this fresh database instance
MessageTemplateDoc.set_tenant_database(db) await init_beanie(database=db, document_models=tenant_document_models)
EmailSenderDoc.set_tenant_database(db) await self.module_logger.log_info(f"Beanie initialization completed for {product_id} using cached client (double-check)")
EmailSendStatusDoc.set_tenant_database(db)
await self.module_logger.log_info(f"BaseDoc tenant context set for {product_id} using cached client (double-check)")
return db return db
else: else:
await self.module_logger.log_error(f"No default database found for cached client {product_id}") await self.module_logger.log_error(f"No default database found for cached client {product_id}")
@ -131,12 +128,10 @@ class TenantDBCache:
detail=f"No default database found for tenant {product_id}", detail=f"No default database found for tenant {product_id}",
headers={"X-Error-Message": f"No default database found for tenant {product_id}"} headers={"X-Error-Message": f"No default database found for tenant {product_id}"}
) )
# Set tenant database context for BaseDoc # Initialize Beanie for this tenant database
MessageTemplateDoc.set_tenant_database(db) await init_beanie(database=db, document_models=tenant_document_models)
EmailSenderDoc.set_tenant_database(db) await self.module_logger.log_info(f"Beanie initialization completed for new tenant database {product_id}")
EmailSendStatusDoc.set_tenant_database(db)
await self.module_logger.log_info(f"BaseDoc tenant context set for new tenant database {product_id}")
# Cache only the client # Cache only the client
await self._lru_put(product_id, client) await self._lru_put(product_id, client)
@ -144,15 +139,10 @@ class TenantDBCache:
return db return db
async def get_main_db_initialized(self) -> AsyncIOMotorDatabase: async def get_main_db_initialized(self) -> AsyncIOMotorDatabase:
"""Get main database with BaseDoc context set for all models""" """Get main database with Beanie initialized for tenant models"""
# Set main database context for all BaseDoc models # Re-initialize Beanie for main database with business models
MessageTemplateDoc.set_tenant_database(self.main_db) await init_beanie(database=self.main_db, document_models=document_models)
EmailSenderDoc.set_tenant_database(self.main_db) await self.module_logger.log_info("Beanie initialization completed for main database")
EmailSendStatusDoc.set_tenant_database(self.main_db)
EmailTrackingDoc.set_tenant_database(self.main_db)
EmailBounceDoc.set_tenant_database(self.main_db)
UsageLogDoc.set_tenant_database(self.main_db)
await self.module_logger.log_info("BaseDoc context set for main database")
return self.main_db return self.main_db
async def _lru_put(self, key: str, client: AsyncIOMotorClient): async def _lru_put(self, key: str, client: AsyncIOMotorClient):
@ -190,7 +180,7 @@ def register(app):
@app.on_event("startup") @app.on_event("startup")
async def start_database(): async def start_database():
await initiate_database(app) await initiate_database(app)
@app.on_event("shutdown") @app.on_event("shutdown")
async def shutdown_database(): async def shutdown_database():
await cleanup_database() await cleanup_database()
@ -199,20 +189,15 @@ def register(app):
async def initiate_database(app): async def initiate_database(app):
"""Initialize main database and tenant cache""" """Initialize main database and tenant cache"""
global MAIN_CLIENT, TENANT_CACHE global MAIN_CLIENT, TENANT_CACHE
module_logger = ModuleLogger(sender_id="DatabaseInit") module_logger = ModuleLogger(sender_id="DatabaseInit")
# 1) Create main/catalog client + DB # 1) Create main/catalog client + DB
MAIN_CLIENT = AsyncIOMotorClient(app_settings.MONGODB_URI) MAIN_CLIENT = AsyncIOMotorClient(app_settings.MONGODB_URI)
main_db = MAIN_CLIENT[app_settings.MONGODB_NAME] main_db = MAIN_CLIENT[app_settings.MONGODB_NAME]
# 2) Set BaseDoc context for main database # 2) Initialize Beanie for main DB with business document models
MessageTemplateDoc.set_tenant_database(main_db) await init_beanie(database=main_db, document_models=document_models)
EmailSenderDoc.set_tenant_database(main_db)
EmailSendStatusDoc.set_tenant_database(main_db)
EmailTrackingDoc.set_tenant_database(main_db)
EmailBounceDoc.set_tenant_database(main_db)
UsageLogDoc.set_tenant_database(main_db)
# 3) Create tenant cache that uses main_db lookups to resolve product_id -> tenant db # 3) Create tenant cache that uses main_db lookups to resolve product_id -> tenant db
max_cache_size = getattr(app_settings, 'TENANT_CACHE_MAX', 64) max_cache_size = getattr(app_settings, 'TENANT_CACHE_MAX', 64)
@ -221,20 +206,20 @@ async def initiate_database(app):
# 4) Store on app state for middleware to access # 4) Store on app state for middleware to access
app.state.main_db = main_db app.state.main_db = main_db
app.state.tenant_cache = TENANT_CACHE app.state.tenant_cache = TENANT_CACHE
await module_logger.log_info("Database and tenant cache initialized successfully") await module_logger.log_info("Database and tenant cache initialized successfully")
async def cleanup_database(): async def cleanup_database():
"""Cleanup database connections and cache""" """Cleanup database connections and cache"""
global MAIN_CLIENT, TENANT_CACHE global MAIN_CLIENT, TENANT_CACHE
module_logger = ModuleLogger(sender_id="DatabaseCleanup") module_logger = ModuleLogger(sender_id="DatabaseCleanup")
if TENANT_CACHE: if TENANT_CACHE:
await TENANT_CACHE.aclose() await TENANT_CACHE.aclose()
if MAIN_CLIENT: if MAIN_CLIENT:
MAIN_CLIENT.close() MAIN_CLIENT.close()
await module_logger.log_info("Database connections closed successfully") await module_logger.log_info("Database connections closed successfully")