Compare commits

...

17 Commits

Author SHA1 Message Date
semantic-release-bot
b12ab5c505 chore(release): bump version to 1.15.0 and upload released assets [ci skip] 2025-10-28 02:45:48 +00:00
a9ff8aed0b Merge pull request 'merge dev to master' (#95) from dev into master
Reviewed-on: freeleaps/freeleaps-service-hub#95
2025-10-28 02:44:08 +00:00
512e418670 Merge pull request 'feature/icecheng/compute_unit' (#96) from feature/icecheng/compute_unit into dev
Reviewed-on: freeleaps/freeleaps-service-hub#96
2025-10-28 02:11:50 +00:00
icecheng
ad3b2ea938 feat: make init_deployment api support compute_unit 2025-10-28 10:10:53 +08:00
d90df6bf83 Merge pull request 'fix: change notification service model layer to not using Beanie' (#94) from fix/auth-model into dev
Reviewed-on: freeleaps/freeleaps-service-hub#94
Reviewed-by: jingyao1991 <jingyao1991@noreply.gitea.freeleaps.mathmast.com>
2025-10-27 01:52:52 +00:00
haolou
2eb09c834c fix: change notification service model layer to not using Beanie 2025-10-24 14:02:36 +08:00
6b9bdfb205 Merge pull request 'feature/add-request-metrics' (#90) from feature/add-request-metrics into master
Reviewed-on: freeleaps/freeleaps-service-hub#90
Reviewed-by: jingyao1991 <jingyao1991@noreply.gitea.freeleaps.mathmast.com>
2025-10-24 02:06:27 +00:00
semantic-release-bot
f0ce6936f8 chore(release): bump version to 1.14.2 and upload released assets [ci skip] 2025-10-24 01:20:30 +00:00
977d5bef45 Merge pull request 'merge dev to master' (#93) from dev into master
Reviewed-on: freeleaps/freeleaps-service-hub#93
2025-10-24 01:18:56 +00:00
bf4b64cdad Merge pull request 'fix: fix the id wrong type issue' (#92) from fix/auth-model into dev
Reviewed-on: freeleaps/freeleaps-service-hub#92
2025-10-23 08:51:03 +00:00
haolou
f57cfb2d3a fix: fix the id wrong type issue 2025-10-23 16:49:40 +08:00
hazelzhu
b1ec14a984 simplify api routes 2025-10-23 13:48:35 +08:00
hazelzhu
d48f72544d add request metric queries 2025-10-23 12:54:03 +08:00
semantic-release-bot
561bf20a34 chore(release): bump version to 1.14.1 and upload released assets [ci skip] 2025-10-21 03:40:07 +00:00
fbafb19de6 Merge pull request 'merge dev to master' (#86) from dev into master
Reviewed-on: freeleaps/freeleaps-service-hub#86
2025-10-21 03:38:28 +00:00
semantic-release-bot
00da25e93b chore(release): bump version to 1.14.0 and upload released assets [ci skip] 2025-10-14 04:14:26 +00:00
10cc4940f7 Merge pull request 'merge dev to master' (#84) from dev into master
Reviewed-on: freeleaps/freeleaps-service-hub#84
2025-10-14 04:12:39 +00:00
12 changed files with 695 additions and 81 deletions

View File

@@ -1,3 +1,46 @@
# [1.15.0](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/compare/v1.14.2...v1.15.0) (2025-10-28)
### Bug Fixes
* change notification service model layer to not using Beanie ([2eb09c8](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/2eb09c834c147411867d32fc8b640cf784444083))
### Features
* make init_deployment api support compute_unit ([ad3b2ea](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/ad3b2ea938d73d210d8d9df7e0cd448537292c33))
## [1.14.2](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/compare/v1.14.1...v1.14.2) (2025-10-24)
### Bug Fixes
* change middleware to avoid Beanie init ([1823a33](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/1823a33d45abdf2076a2eee5d899e50d8c72993f))
* change the authentication service model layer from Beanie to direct MongoDB ([a936f89](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/a936f8942644d14b5af91bd5d2e372e12cbc2043))
* fix the id wrong type issue ([f57cfb2](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/f57cfb2d3accf7ee8817c8fcdc2bd4a1aa236aa1))
* fix the invalid id format issues ([ba63e1b](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/ba63e1b7a8835e9f898d503a01ec837da83e5072))
## [1.14.1](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/compare/v1.14.0...v1.14.1) (2025-10-21)
### Bug Fixes
* change name to more general ones ([2036c4b](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/2036c4b9ae5bd9d1d7b00daead0e047b6d9e4ebb))
* create apis for magicleaps password things avoiding calling code depot ([497e608](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/497e6080a3cb703621ae9b17f3ca2b0e1c4c3672))
* fix the small issue of user_id not found ([6a207c7](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/6a207c7e79abc9400724caa3ee873f1e579deeb0))
# [1.14.0](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/compare/v1.13.0...v1.14.0) (2025-10-14)
### Features
* **auth:** add extrnal auth introspect api interface ([7a83237](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/7a832371c7790c77d060344a4141c1dc5c435333))
* **env:** add env config ([6bbaaae](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/6bbaaae30f2649e787aeddd62779301447ae537b))
* **format:** modify the log format ([81f2a21](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/81f2a21f6bb8390f851316a05809041f60b3ea4d))
* **middleware:** add middleware for authentication ([80a6beb](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/80a6beb1ed36d94f3024a80045cbf82a1d0a8844))
* **middleware:** modify database to tolerate the tenant cache ([7844906](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/7844906cfe64b1b2f2d8ae29b064213385c6ce98))
* **model:** add usage_log_doc ([5f18212](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/5f18212343f93eebdec57498a56f60a8065b2501))
# [1.13.0](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/compare/v1.12.1...v1.13.0) (2025-09-30)

View File

@@ -1 +1 @@
1.13.0
1.15.0

View File

@@ -150,8 +150,7 @@ class PermissionHandler:
query = {}
if permission_id:
try:
ObjectId(permission_id) # Validate ObjectId format
query["_id"] = permission_id # Use MongoDB's _id field directly
query["_id"] = ObjectId(permission_id) # Convert string to ObjectId for MongoDB
except Exception:
raise RequestValidationError("Invalid permission_id format. Must be a valid ObjectId.")
if permission_key:

View File

@@ -144,8 +144,7 @@ class RoleHandler:
query = {}
if role_id:
try:
ObjectId(role_id) # Validate ObjectId format
query["_id"] = role_id # Use MongoDB's _id field directly
query["_id"] = ObjectId(role_id) # Convert string to ObjectId for MongoDB
except Exception:
raise RequestValidationError("Invalid role_id format. Must be a valid ObjectId.")
if role_key:
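Both hunks above make the same correction: validating the id but then querying with the raw string means the `_id` filter never matches, because MongoDB compares `_id` values as BSON ObjectIds, not as hex strings. A minimal sketch of the validate-then-convert pattern, using a stand-in 24-hex-digit check instead of `bson.ObjectId` so it carries no driver dependency (`to_query_id` is illustrative, not part of the codebase):

```python
import re

# An ObjectId string is exactly 24 hexadecimal characters
OBJECT_ID_RE = re.compile(r"\A[0-9a-fA-F]{24}\Z")

def to_query_id(raw_id: str) -> dict:
    """Reject malformed ids, then put the converted id (not the raw,
    unconverted string) into the Mongo filter."""
    if not OBJECT_ID_RE.match(raw_id):
        raise ValueError("Invalid id format. Must be a valid ObjectId.")
    # With bson installed this would be {"_id": ObjectId(raw_id)};
    # a raw string here would silently match nothing.
    return {"_id": raw_id}
```

With `bson` available, the returned filter would be `{"_id": ObjectId(raw_id)}`, which is exactly what the fixed handlers now build.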

View File

@@ -25,6 +25,7 @@ class Deployment(Document):
deployment_git_url: str
deployment_git_sha256: str
deployment_reason: str
compute_unit: Optional[int] = None # None for old data
deployment_app_url: str = "" # URL to access the deployed application, keep it empty to be filled later
deployed_by: str
@@ -46,6 +47,7 @@ class InitDeploymentRequest(BaseModel):
user_id: str
reason: str = "not provided"
ttl_hours: int = -1
compute_unit: Optional[int] = 0
class CheckDeploymentStatusRequest(BaseModel):
product_id: str
@@ -85,4 +87,5 @@ class DevOpsReconcileRequest(BaseModel):
commit_sha256: Optional[str] = None
target_env: Literal["alpha", "prod"]
ttl_control: bool = False
ttl: int = 10800
ttl: int = 10800
compute_unit: Optional[int] = 0
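The diff uses two different defaults deliberately: the stored `Deployment` document declares `compute_unit: Optional[int] = None` so that pre-existing records stay distinguishable, while the request models default to `0`. A hedged sketch of how a reader of old documents might normalize the two (the `effective_compute_unit` helper is hypothetical, not part of the diff):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class StoredDeployment:
    # None marks a record that predates the compute_unit field
    compute_unit: Optional[int] = None

def effective_compute_unit(doc: StoredDeployment, fallback: int = 0) -> int:
    """Old documents carry None; treat them as the request-side default."""
    return doc.compute_unit if doc.compute_unit is not None else fallback
```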

View File

@@ -58,6 +58,7 @@ class DeploymentService:
deployed_by = request.user_id,
created_at = datetime.now(),
updated_at = datetime.now(),
compute_unit = request.compute_unit,
)
await self._start_deployment(deployment)
@@ -182,6 +183,7 @@ class DeploymentService:
ttl_control=deployment.deployment_ttl_hours > 0,
ttl=10800 if deployment.deployment_ttl_hours < 0 else deployment.deployment_ttl_hours * 60 * 60,
commit_sha256=deployment.deployment_git_sha256,
compute_unit=deployment.compute_unit
)
# send request to reconcile service
async with httpx.AsyncClient() as client:
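The `ttl=` argument above compresses the fallback rule into one expression. Restated as a standalone helper (a sketch mirroring the inline expression, not a function in the repo):

```python
DEFAULT_TTL_SECONDS = 10800  # 3 hours, the fallback used above

def ttl_seconds(ttl_hours: int) -> int:
    """Negative ttl_hours means no TTL was requested: fall back to the
    default; otherwise convert hours to seconds."""
    return DEFAULT_TTL_SECONDS if ttl_hours < 0 else ttl_hours * 60 * 60
```

Note that `ttl_hours == 0` yields `ttl_control=False` together with a TTL of 0 seconds, exactly as the inline expression does.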

View File

@@ -41,6 +41,70 @@ class StarRocksMetricsService:
AND product_id = %s
ORDER BY date ASC
""",
"dcr": """
SELECT
date,
product_id,
value,
updated_date
FROM dws_dcr
WHERE date >= %s
AND date < %s
AND product_id = %s
ORDER BY date ASC
""",
"mrar": """
SELECT
date,
product_id,
CASE
WHEN monthly_requests = 0 THEN 0.0
ELSE (monthly_accepted_requests * 1.0) / monthly_requests
END AS value,
updated_date
FROM dws_mrar
WHERE date >= %s
AND date < %s
AND product_id = %s
ORDER BY date ASC
""",
"trar": """
SELECT
product_id,
CASE
WHEN total_requests = 0 THEN 0.0
ELSE (total_accepted_requests * 1.0) / total_requests
END AS value,
updated_date
FROM dws_trar
WHERE product_id = %s
""",
"mrqr": """
SELECT
date,
product_id,
CASE
WHEN monthly_requests = 0 THEN 0.0
ELSE (monthly_quoted_requests * 1.0) / monthly_requests
END AS value,
updated_date
FROM dws_mrqr
WHERE date >= %s
AND date < %s
AND product_id = %s
ORDER BY date ASC
""",
"trqr": """
SELECT
product_id,
CASE
WHEN total_requests = 0 THEN 0.0
ELSE (total_quoted_requests * 1.0) / total_requests
END AS value,
updated_date
FROM dws_trqr
WHERE product_id = %s
""",
},
"magicleaps": {
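Every rate metric added above guards against a zero denominator with `CASE WHEN ... THEN 0.0`. The same guard as a one-line Python equivalent (illustrative only; the service computes these rates in SQL):

```python
def safe_rate(numerator: int, denominator: int) -> float:
    """Mirror of the SQL 'CASE WHEN total = 0 THEN 0.0
    ELSE numerator * 1.0 / total END' pattern used in the queries above."""
    return 0.0 if denominator == 0 else numerator / denominator
```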
@@ -92,9 +156,9 @@ class StarRocksMetricsService:
self,
product_id: str,
metric_name: str,
step: str,
start_date: Union[str, date],
end_date: Union[str, date]
step: Optional[str],
start_date: Optional[Union[str, date]],
end_date: Optional[Union[str, date]]
) -> List[Dict[str, Any]]:
"""
Query metric data for a specific date range.
@@ -140,6 +204,17 @@
await self.module_logger.log_error(error_msg)
raise HTTPException(status_code=404, detail=error_msg)
# Check if the metric needs time params
# Starting with "t" indicates a query for the total count since the very first record.
# NOTE: This determination logic is subject to future changes.
if (start_date is None) or (end_date is None) or (step is None):
if metric_name.startswith('t'):
return await self._query_metric_by_product_id(product_id, metric_name)
else:
error_msg = f"Metric '{metric_name}' should be queried by start date, end date and step."
await self.module_logger.log_error(error_msg)
raise HTTPException(status_code=404, detail=error_msg)
# Parse date strings if they are strings
if isinstance(start_date, str):
try:
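The new branch encodes a naming convention: a leading `t` marks a running total (`trar`, `trqr`) that can be queried without a time window, while everything else requires `start_date`, `end_date` and `step`. A hedged restatement of that dispatch (function names here are hypothetical):

```python
from typing import Optional

def needs_time_range(metric_name: str) -> bool:
    # Per the note above: a leading "t" marks a total since the first
    # record; all other metrics require a date window.
    return not metric_name.startswith("t")

def pick_query_mode(metric_name: str, start: Optional[str],
                    end: Optional[str], step: Optional[str]) -> str:
    """Route a request to the total query or the time-range query."""
    if start is None or end is None or step is None:
        if needs_time_range(metric_name):
            raise ValueError(
                f"Metric '{metric_name}' must be queried with "
                "start date, end date and step.")
        return "by_product_id"
    return "by_time_range"
```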
@@ -284,7 +359,7 @@
result_dict[date_str] = {
"date": date_str,
"value": int(value) if value is not None else 0,
"value": value if value is not None else 0,
"metric": metric_name,
"labels": labels
}
@@ -373,4 +448,67 @@
"metric_name": metric_name,
"sql_query": self.METRIC_SQL_MAP[product_id][metric_name].strip(),
"description": f"{metric_name} count from StarRocks table dws_{metric_name}"
}
}
async def _query_metric_by_product_id(self, product_id: str, metric_name: str) -> List[Dict[str, Any]]:
"""
Query a metric that is not tied to a date range (e.g. running totals computed over all records).
Args:
product_id: Product ID to identify which product's metrics to query
metric_name: Name of the metric to query
Returns:
List of dictionaries with 'date', 'value', 'metric' and 'labels' keys ('date' is None for totals).
Raises:
Exception: If StarRocks query fails
Example:
result = await service._query_metric_by_product_id(
"freeleaps",
"trqr",
)
# Returns: [{"date": None, "value": 0.45, "metric": "trqr", "labels": {...}}]
"""
# Get the SQL query for the metric
sql_query = self.METRIC_SQL_MAP[product_id][metric_name]
try:
await self.module_logger.log_info(
f"Querying metric '{metric_name}' from product '{product_id}'")
# Execute the query
result = await self.starrocks_client.execute_query(
query=sql_query,
params=(product_id,)  # one-element tuple; (product_id) alone is just a string
)
# Parse the result and format it
result_dict = []
for row in result:
# Get the value
value = row.get("value", 0)
if value is None:
value = 0
# Create labels dictionary
labels = {
"product_id": row.get("product_id", product_id),
"metric_type": metric_name,
}
result_dict.append({
"date": None,
"value": value,
"metric": metric_name,
"labels": labels
})
await self.module_logger.log_info(
f"Successfully queried metric '{metric_name}'")
return result_dict
except Exception as e:
await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
raise

View File

@@ -9,7 +9,7 @@ from backend.services.starrocks_metrics_service import StarRocksMetricsService
class MetricDataPoint(BaseModel):
"""Single data point in metric time series."""
date: str = Field(..., description="Date in YYYY-MM-DD format")
date: Optional[str] = Field(None, description="Date in YYYY-MM-DD format")
value: Union[int, float] = Field(..., description="Metric value")
labels: Dict[str, Any] = Field(default_factory=dict, description="Metric labels")
@@ -19,17 +19,16 @@ class MetricTimeSeriesResponse(BaseModel):
metric_name: str = Field(..., description="Name of the queried metric")
data_points: List[MetricDataPoint] = Field(..., description="List of data points")
total_points: int = Field(..., description="Total number of data points")
time_range: Dict[str, str] = Field(..., description="Start and end date of the query")
time_range: Dict[str, Optional[str]] = Field(default_factory=dict, description="Start and end date of the query (values may be None)")
class MetricQueryRequest(BaseModel):
"""Request model for metric query."""
"""Request model for metric query by time range."""
product_id: str = Field(..., description="Product ID to identify which product's data to query")
metric_name: str = Field(..., description="Name of the metric to query")
step: str = Field(..., description="Aggregation step, e.g., 1d or 1m")
start_date: str = Field(..., description="Start date in YYYY-MM-DD HH:MM:SS format")
end_date: str = Field(..., description="End date in YYYY-MM-DD HH:MM:SS format")
step: Optional[str] = Field(None, description="Aggregation step, e.g., 1d or 1m")
start_date: Optional[str] = Field(None, description="Start date in YYYY-MM-DD HH:MM:SS format")
end_date: Optional[str] = Field(None, description="End date in YYYY-MM-DD HH:MM:SS format")
router = APIRouter()
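With `step`, `start_date` and `end_date` now optional, the same request model accepts two payload shapes. Two hypothetical example payloads, using the field names defined above:

```python
# Time-series query: all three time parameters present
windowed = {
    "product_id": "freeleaps",
    "metric_name": "dau",
    "step": "1d",
    "start_date": "2025-10-01 00:00:00",
    "end_date": "2025-10-28 00:00:00",
}

# Totals query: the optional time parameters are omitted,
# valid only for metrics whose names start with "t"
total = {
    "product_id": "freeleaps",
    "metric_name": "trqr",
}
```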
@@ -72,11 +71,11 @@ async def metrics_query(
],
total_points=len(data_points),
time_range={
"start": request.start_date,
"end": request.end_date
"start": request.start_date if request.start_date else None,
"end": request.end_date if request.end_date else None
}
)
await module_logger.log_info(
f"Successfully queried metric '{request.metric_name}' with {len(data_points)} data points")
return response
return response

View File

@@ -0,0 +1,415 @@
"""
BaseDoc - A custom document class that provides Beanie-like interface using direct MongoDB operations
"""
import asyncio
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any, Type, Union
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pydantic import BaseModel
from pydantic._internal._model_construction import ModelMetaclass
from common.config.app_settings import app_settings
class QueryExpression:
"""Query expression for field comparisons"""
def __init__(self, field_name: str):
self.field_name = field_name
def __eq__(self, other: Any) -> Dict[str, Any]:
"""Handle field == value comparisons"""
return {self.field_name: other}
def __ne__(self, other: Any) -> Dict[str, Any]:
"""Handle field != value comparisons"""
return {self.field_name: {"$ne": other}}
def __gt__(self, other: Any) -> Dict[str, Any]:
"""Handle field > value comparisons"""
return {self.field_name: {"$gt": other}}
def __lt__(self, other: Any) -> Dict[str, Any]:
"""Handle field < value comparisons"""
return {self.field_name: {"$lt": other}}
def __ge__(self, other: Any) -> Dict[str, Any]:
"""Handle field >= value comparisons"""
return {self.field_name: {"$gte": other}}
def __le__(self, other: Any) -> Dict[str, Any]:
"""Handle field <= value comparisons"""
return {self.field_name: {"$lte": other}}
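Because the comparison dunders return dictionaries, a class-level expression such as `Doc.age > 18` evaluates to a ready-made MongoDB filter instead of a boolean. A condensed, self-contained copy of the class above, enough to demonstrate the behaviour:

```python
from typing import Any, Dict

class QueryExpression:
    # Condensed copy of the class above, for demonstration only
    def __init__(self, field_name: str):
        self.field_name = field_name

    def __eq__(self, other: Any) -> Dict[str, Any]:
        return {self.field_name: other}

    def __gt__(self, other: Any) -> Dict[str, Any]:
        return {self.field_name: {"$gt": other}}

age = QueryExpression("age")
assert (age == 21) == {"age": 21}          # equality builds a filter dict
assert (age > 18) == {"age": {"$gt": 18}}  # so do the rich comparisons
```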
class FieldDescriptor:
"""Descriptor for field access like Beanie's field == value pattern"""
def __init__(self, field_name: str, field_type: type):
self.field_name = field_name
self.field_type = field_type
def __get__(self, instance: Any, owner: type) -> Any:
"""
- Class access (instance is None): return QueryExpression for building queries
- Instance access (instance is not None): return the actual field value
"""
if instance is None:
return QueryExpression(self.field_name)
return instance.__dict__.get(self.field_name)
def __set__(self, instance: Any, value: Any) -> None:
"""Set instance field value with type validation (compatible with Pydantic validation)"""
# isinstance() rejects subscripted generics such as Optional[str],
# so only validate when field_type is a plain class
if isinstance(self.field_type, type) and not isinstance(value, self.field_type):
raise TypeError(f"Field {self.field_name} must be {self.field_type}")
instance.__dict__[self.field_name] = value
class FieldCondition:
"""Represents a field condition for MongoDB queries"""
def __init__(self, field_name: str, value: Any, operator: str = "$eq"):
self.field_name = field_name
self.value = value
self.operator = operator
self.left = self # For compatibility with existing condition parsing
self.right = value
# Module-level variables for database connection
_db: Optional[AsyncIOMotorDatabase] = None
_client: Optional[AsyncIOMotorClient] = None
# Context variable for tenant database
import contextvars
_tenant_db_context: contextvars.ContextVar[Optional[AsyncIOMotorDatabase]] = contextvars.ContextVar('tenant_db', default=None)
class QueryModelMeta(ModelMetaclass):
"""Metaclass: automatically create FieldDescriptor for model fields"""
def __new__(cls, name: str, bases: tuple, namespace: dict):
# Get model field annotations (like name: str -> "name" and str)
annotations = namespace.get("__annotations__", {})
# Create the class first using Pydantic's metaclass
new_class = super().__new__(cls, name, bases, namespace)
# After Pydantic processes the fields, add the descriptors as class attributes
for field_name, field_type in annotations.items():
if field_name != 'id': # Skip the id field as it's handled specially
# Add the descriptor as a class attribute
setattr(new_class, field_name, FieldDescriptor(field_name, field_type))
return new_class
def __getattr__(cls, name: str):
"""Handle field access like Doc.field_name for query building"""
# Check if this is a field that exists in the model
if hasattr(cls, 'model_fields') and name in cls.model_fields:
return QueryExpression(name)
raise AttributeError(f"'{cls.__name__}' object has no attribute '{name}'")
class BaseDoc(BaseModel, metaclass=QueryModelMeta):
"""
Base document class that provides Beanie-like interface using direct MongoDB operations.
All model classes should inherit from this instead of Beanie's Document.
"""
id: Optional[str] = None # MongoDB _id field
def model_dump(self, **kwargs):
"""Override model_dump to exclude field descriptors"""
# Get the default model_dump result
result = super().model_dump(**kwargs)
# Remove any field descriptors that might have been included
filtered_result = {}
for key, value in result.items():
if not isinstance(value, FieldDescriptor):
filtered_result[key] = value
return filtered_result
@classmethod
def field(cls, field_name: str) -> QueryExpression:
"""Get a field expression for query building"""
return QueryExpression(field_name)
@classmethod
async def _get_database(cls) -> AsyncIOMotorDatabase:
"""Get database connection using pure AsyncIOMotorClient"""
# Try to get tenant database from context first
tenant_db = _tenant_db_context.get()
if tenant_db is not None:
return tenant_db
# Fallback to global database connection
global _db, _client
if _db is None:
_client = AsyncIOMotorClient(app_settings.MONGODB_URI)
_db = _client[app_settings.MONGODB_NAME]
return _db
@classmethod
def set_tenant_database(cls, db: AsyncIOMotorDatabase):
"""Set the tenant database for this context"""
_tenant_db_context.set(db)
@classmethod
def _get_collection_name(cls) -> str:
"""Get collection name from Settings or class name"""
if hasattr(cls, 'Settings') and hasattr(cls.Settings, 'name'):
return cls.Settings.name
else:
# Convert class name to snake_case for collection name
import re
name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', cls.__name__)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()
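The two `re.sub` calls are the usual CamelCase to snake_case conversion. Extracted as a standalone helper (the same substitutions as in `_get_collection_name` above):

```python
import re

def to_snake_case(name: str) -> str:
    """CamelCase -> snake_case, e.g. 'EmailSendStatusDoc' -> 'email_send_status_doc'."""
    # Insert an underscore before an uppercase run followed by lowercase
    name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Then split remaining lower/digit-to-upper boundaries and lowercase
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()
```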
@classmethod
def find(cls, *conditions) -> 'QueryBuilder':
"""Find documents matching conditions - returns QueryBuilder for chaining"""
return QueryBuilder(cls, conditions)
@classmethod
async def find_one(cls, *conditions) -> Optional['BaseDoc']:
"""Find one document matching conditions"""
db = await cls._get_database()
collection_name = cls._get_collection_name()
collection = db[collection_name]
# Convert Beanie-style conditions to MongoDB query
query = cls._convert_conditions_to_query(conditions)
doc = await collection.find_one(query)
if doc:
# Extract MongoDB _id and convert to string
mongo_id = doc.pop('_id', None)
# Filter doc to only include fields defined in the model
model_fields = set(cls.model_fields.keys())
filtered_doc = {k: v for k, v in doc.items() if k in model_fields}
# Add the id field
if mongo_id:
filtered_doc['id'] = str(mongo_id)
return cls(**filtered_doc)
return None
@classmethod
async def get(cls, doc_id: str) -> Optional['BaseDoc']:
"""Get document by ID"""
from bson import ObjectId
try:
object_id = ObjectId(doc_id)
except Exception:  # not a valid ObjectId string
return None
db = await cls._get_database()
collection_name = cls._get_collection_name()
collection = db[collection_name]
doc = await collection.find_one({"_id": object_id})
if doc:
# Extract MongoDB _id and convert to string
mongo_id = doc.pop('_id', None)
# Filter doc to only include fields defined in the model
model_fields = set(cls.model_fields.keys())
filtered_doc = {k: v for k, v in doc.items() if k in model_fields}
# Add the id field
if mongo_id:
filtered_doc['id'] = str(mongo_id)
return cls(**filtered_doc)
return None
@classmethod
def _convert_conditions_to_query(cls, conditions) -> Dict[str, Any]:
"""Convert Beanie-style conditions to MongoDB query"""
if not conditions:
return {}
query = {}
for condition in conditions:
if isinstance(condition, dict):
# Handle QueryExpression results (dictionaries) and direct dictionary queries
query.update(condition)
elif isinstance(condition, FieldCondition):
# Handle legacy FieldCondition objects
if condition.operator == "$eq":
query[condition.field_name] = condition.value
else:
query[condition.field_name] = {condition.operator: condition.value}
elif hasattr(condition, 'left') and hasattr(condition, 'right'):
# Handle field == value conditions
field_name = condition.left.name
value = condition.right
query[field_name] = value
elif hasattr(condition, '__dict__'):
# Handle complex conditions like FLID.identity == value
if hasattr(condition, 'left') and hasattr(condition, 'right'):
left = condition.left
if hasattr(left, 'name') and hasattr(left, 'left'):
# Nested field access like FLID.identity
field_name = f"{left.left.name}.{left.name}"
value = condition.right
query[field_name] = value
else:
field_name = left.name
value = condition.right
query[field_name] = value
return query
def _convert_decimals_to_float(self, obj):
"""Convert Decimal objects to float for MongoDB compatibility"""
from decimal import Decimal
if isinstance(obj, Decimal):
return float(obj)
elif isinstance(obj, dict):
return {key: self._convert_decimals_to_float(value) for key, value in obj.items()}
elif isinstance(obj, list):
return [self._convert_decimals_to_float(item) for item in obj]
else:
return obj
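The Mongo driver does not encode `decimal.Decimal` out of the box (BSON's native type is `Decimal128`), so the method above recursively downgrades Decimals to `float` before insertion. A standalone copy for illustration:

```python
from decimal import Decimal

def convert_decimals_to_float(obj):
    """Recursive Decimal -> float conversion, as in BaseDoc above."""
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, dict):
        return {k: convert_decimals_to_float(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert_decimals_to_float(x) for x in obj]
    return obj
```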
async def create(self) -> 'BaseDoc':
"""Create this document in the database"""
db = await self._get_database()
collection_name = self._get_collection_name()
collection = db[collection_name]
# Convert to dict and insert, excluding field descriptors
doc_dict = self.model_dump(exclude={'id'})
# Convert Decimal objects to float for MongoDB compatibility
doc_dict = self._convert_decimals_to_float(doc_dict)
result = await collection.insert_one(doc_dict)
# Set the id field from the inserted document
if result.inserted_id:
self.id = str(result.inserted_id)
# Return the created document
return self
async def save(self) -> 'BaseDoc':
"""Save this document to the database (update if exists, create if not)"""
db = await self._get_database()
collection_name = self._get_collection_name()
collection = db[collection_name]
# Convert to dict, excluding field descriptors
doc_dict = self.model_dump(exclude={'id'})
# Convert Decimal objects to float for MongoDB compatibility
doc_dict = self._convert_decimals_to_float(doc_dict)
# Try to find existing document by user_id or other unique fields
query = {}
if hasattr(self, 'user_id'):
query['user_id'] = self.user_id
elif hasattr(self, 'email'):
query['email'] = self.email
elif hasattr(self, 'mobile'):
query['mobile'] = self.mobile
elif hasattr(self, 'auth_code'):
query['auth_code'] = self.auth_code
if query:
# Update existing document
result = await collection.update_one(query, {"$set": doc_dict}, upsert=True)
# If it was an insert, set the id field
if result.upserted_id:
self.id = str(result.upserted_id)
else:
# Insert new document
result = await collection.insert_one(doc_dict)
if result.inserted_id:
self.id = str(result.inserted_id)
return self
async def delete(self) -> bool:
"""Delete this document from the database"""
db = await self._get_database()
collection_name = self._get_collection_name()
collection = db[collection_name]
# Try to find existing document by user_id or other unique fields
query = {}
if hasattr(self, 'user_id'):
query['user_id'] = self.user_id
elif hasattr(self, 'email'):
query['email'] = self.email
elif hasattr(self, 'mobile'):
query['mobile'] = self.mobile
elif hasattr(self, 'auth_code'):
query['auth_code'] = self.auth_code
if query:
result = await collection.delete_one(query)
return result.deleted_count > 0
return False
class QueryBuilder:
"""Query builder for chaining operations like Beanie's QueryBuilder"""
def __init__(self, model_class: Type[BaseDoc], conditions: tuple):
self.model_class = model_class
self.conditions = conditions
self._limit_value: Optional[int] = None
self._skip_value: Optional[int] = None
def limit(self, n: int) -> 'QueryBuilder':
"""Limit number of results"""
self._limit_value = n
return self
def skip(self, n: int) -> 'QueryBuilder':
"""Skip number of results"""
self._skip_value = n
return self
async def to_list(self) -> List[BaseDoc]:
"""Convert query to list of documents"""
db = await self.model_class._get_database()
collection_name = self.model_class._get_collection_name()
collection = db[collection_name]
# Convert conditions to MongoDB query
query = self.model_class._convert_conditions_to_query(self.conditions)
# Build cursor
cursor = collection.find(query)
if self._skip_value:
cursor = cursor.skip(self._skip_value)
if self._limit_value:
cursor = cursor.limit(self._limit_value)
# Execute query and convert to model instances
docs = await cursor.to_list(length=None)
results = []
for doc in docs:
# Extract MongoDB _id and convert to string
mongo_id = doc.pop('_id', None)
# Filter doc to only include fields defined in the model
model_fields = set(self.model_class.model_fields.keys())
filtered_doc = {k: v for k, v in doc.items() if k in model_fields}
# Add the id field
if mongo_id:
filtered_doc['id'] = str(mongo_id)
results.append(self.model_class(**filtered_doc))
return results
async def first_or_none(self) -> Optional[BaseDoc]:
"""Get first result or None"""
results = await self.limit(1).to_list()
return results[0] if results else None
async def count(self) -> int:
"""Count number of matching documents"""
db = await self.model_class._get_database()
collection_name = self.model_class._get_collection_name()
collection = db[collection_name]
query = self.model_class._convert_conditions_to_query(self.conditions)
return await collection.count_documents(query)

View File

@@ -1,11 +1,11 @@
from beanie import Document
from datetime import datetime
from typing import Optional, List
from common.constants.region import UserRegion
from common.constants.email import EmailSendStatus, BounceType
from backend.models.base_doc import BaseDoc
class MessageTemplateDoc(Document):
class MessageTemplateDoc(BaseDoc):
template_id: str
tenant_id: Optional[str] = None
region: UserRegion
@@ -19,20 +19,20 @@ class MessageTemplateDoc(Document):
name = "message_templates_doc"
indexes = [
"template_id",
"tenant_id",
"tenant_id",
"region"
]
class EmailSenderDoc(Document):
class EmailSenderDoc(BaseDoc):
tenant_id: str
email_sender: Optional[str] = None
email_sender: Optional[str] = None
is_active: bool = True
class Settings:
name = "email_sender_doc"
indexes = ["tenant_id"]
class EmailSendStatusDoc(Document):
class EmailSendStatusDoc(BaseDoc):
email_id: str
tenant_id: str
email_sender: Optional[str] = None
@@ -57,7 +57,7 @@ class EmailSendStatusDoc(Document):
"tenant_id"
]
class EmailTrackingDoc(Document):
class EmailTrackingDoc(BaseDoc):
email_id: str
tenant_id: str
recipient_email: str
@@ -85,10 +85,10 @@ class EmailTrackingDoc(Document):
"tenant_id"
]
class EmailBounceDoc(Document):
class EmailBounceDoc(BaseDoc):
email: str
tenant_id: str
email_id: Optional[str] = None
email_id: Optional[str] = None
template_id: Optional[str] = None
bounce_type: BounceType
reason: str
@@ -105,8 +105,8 @@
"email",
"tenant_id"
]
class UsageLogDoc(Document):
class UsageLogDoc(BaseDoc):
timestamp: datetime = datetime.utcnow() # timestamp
tenant_id: str # tenant id
operation: str # operation type
@@ -117,7 +117,7 @@
bytes_out: int # output bytes
key_id: Optional[str] = None # API Key ID
extra: dict = {} # extra information
class Settings:
name = "usage_log_doc"
indexes = [

View File

@@ -39,41 +39,42 @@ class DatabaseMiddleware:
return await response(scope, receive, send)
if not product_id:
# Compatibility / public routes: use main database with tenant models initialized
# Compatibility / public routes: use main database with BaseDoc context set
await self.module_logger.log_info(f"No product_id - using main database for path: {request.url.path}")
# Get main database with Beanie initialized for tenant models
# Get main database with BaseDoc context set for all models
main_db_initialized = await tenant_cache.get_main_db_initialized()
request.state.db = main_db_initialized
request.state.product_id = None
await self.module_logger.log_info(f"Successfully initialized main database with tenant models")
await self.module_logger.log_info(f"Successfully set BaseDoc context for main database")
return await self.app(scope, receive, send)
try:
# Get tenant-specific database with Beanie already initialized (cached)
# Get tenant-specific database with BaseDoc context set (cached)
await self.module_logger.log_info(f"Attempting to get tenant database for product_id: {product_id}")
tenant_db = await tenant_cache.get_initialized_db(product_id)
request.state.db = tenant_db
request.state.product_id = product_id
await self.module_logger.log_info(f"Successfully retrieved cached tenant database with Beanie for product_id: {product_id}")
await self.module_logger.log_info(f"Successfully retrieved cached tenant database with BaseDoc context for product_id: {product_id}")
return await self.app(scope, receive, send)
except HTTPException as e:
# Handle tenant not found or inactive (HTTPException from TenantDBCache)
await self.module_logger.log_error(f"Tenant error for {product_id}: [{e.status_code}] {e.detail}")
except ValueError as e:
# Handle tenant not found or inactive (ValueError from TenantDBCache)
await self.module_logger.log_error(f"Tenant error for {product_id}: {str(e)}")
response = JSONResponse(
status_code=e.status_code,
content={"detail": e.detail}
status_code=status.HTTP_404_NOT_FOUND,
content={"detail": str(e)}
)
return await response(scope, receive, send)
except Exception as e:
await self.module_logger.log_error(f"Database error for tenant {product_id}: {str(e)}")
response = JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"detail": "Database connection error"}
)
return await response(scope, receive, send)
return await self.app(scope, receive, send)

View File

@@ -1,5 +1,4 @@
from webapi.config.site_settings import site_settings
- from beanie import init_beanie
from fastapi import HTTPException
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc, UsageLogDoc
@@ -15,7 +14,7 @@ import os
MAIN_CLIENT: Optional[AsyncIOMotorClient] = None
TENANT_CACHE: Optional['TenantDBCache'] = None
# Define document models
document_models = [
MessageTemplateDoc,
EmailSenderDoc,
@@ -37,7 +36,7 @@ class TenantDBCache:
Uses main_db.tenant_doc to resolve mongodb_uri; caches clients with LRU.
Database instances are created fresh each time from cached clients.
"""
def __init__(self, main_db: AsyncIOMotorDatabase, max_size: int = 64):
self.main_db = main_db
self.max_size = max_size
@@ -47,20 +46,22 @@ class TenantDBCache:
self.module_logger = ModuleLogger(sender_id="TenantDBCache")
async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase:
- """Get tenant database with Beanie already initialized"""
+ """Get tenant database with BaseDoc context set"""
# fast-path: check if client is cached
cached_client = self._cache.get(product_id)
if cached_client:
await self.module_logger.log_info(f"Found cached client for {product_id}")
self._cache.move_to_end(product_id)
# Get fresh database instance from cached client
db = cached_client.get_default_database()
if db is not None:
- # Initialize Beanie for this fresh database instance
- await init_beanie(database=db, document_models=tenant_document_models)
- await self.module_logger.log_info(f"Beanie initialization completed for {product_id} using cached client")
+ # Set tenant database context for BaseDoc
+ MessageTemplateDoc.set_tenant_database(db)
+ EmailSenderDoc.set_tenant_database(db)
+ EmailSendStatusDoc.set_tenant_database(db)
+ await self.module_logger.log_info(f"BaseDoc tenant context set for {product_id} using cached client")
return db
else:
await self.module_logger.log_error(f"No default database found for cached client {product_id}")
@@ -74,13 +75,15 @@ class TenantDBCache:
if cached_client:
await self.module_logger.log_info(f"Double-check found cached client for {product_id}")
self._cache.move_to_end(product_id)
# Get fresh database instance from cached client
db = cached_client.get_default_database()
if db is not None:
- # Initialize Beanie for this fresh database instance
- await init_beanie(database=db, document_models=tenant_document_models)
- await self.module_logger.log_info(f"Beanie initialization completed for {product_id} using cached client (double-check)")
+ # Set tenant database context for BaseDoc
+ MessageTemplateDoc.set_tenant_database(db)
+ EmailSenderDoc.set_tenant_database(db)
+ EmailSendStatusDoc.set_tenant_database(db)
+ await self.module_logger.log_info(f"BaseDoc tenant context set for {product_id} using cached client (double-check)")
return db
else:
await self.module_logger.log_error(f"No default database found for cached client {product_id}")
@@ -128,10 +131,12 @@ class TenantDBCache:
detail=f"No default database found for tenant {product_id}",
headers={"X-Error-Message": f"No default database found for tenant {product_id}"}
)
- # Initialize Beanie for this tenant database
- await init_beanie(database=db, document_models=tenant_document_models)
- await self.module_logger.log_info(f"Beanie initialization completed for new tenant database {product_id}")
+ # Set tenant database context for BaseDoc
+ MessageTemplateDoc.set_tenant_database(db)
+ EmailSenderDoc.set_tenant_database(db)
+ EmailSendStatusDoc.set_tenant_database(db)
+ await self.module_logger.log_info(f"BaseDoc tenant context set for new tenant database {product_id}")
# Cache only the client
await self._lru_put(product_id, client)
@@ -139,10 +144,15 @@ class TenantDBCache:
return db
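The fast-path lookup and the double-checked re-read above hinge on an LRU of Motor clients keyed by `product_id`, with `move_to_end` marking recent use and the oldest client closed on overflow. A minimal sketch of that eviction policy, assuming `collections.OrderedDict` plus an `asyncio.Lock` as in the real class (the clients here are simplified stand-ins for `AsyncIOMotorClient`):

```python
import asyncio
from collections import OrderedDict


class LRUClientCache:
    """Keep at most max_size clients; close the least recently used on overflow."""

    def __init__(self, max_size=64):
        self.max_size = max_size
        self._cache = OrderedDict()
        self._lock = asyncio.Lock()

    async def get(self, key):
        client = self._cache.get(key)
        if client is not None:
            self._cache.move_to_end(key)  # mark as most recently used
        return client

    async def put(self, key, client):
        async with self._lock:
            self._cache[key] = client
            self._cache.move_to_end(key)
            while len(self._cache) > self.max_size:
                _, evicted = self._cache.popitem(last=False)  # oldest entry
                evicted.close()  # AsyncIOMotorClient.close() is synchronous
```

Caching only the client (and calling `get_default_database()` per request) keeps the cache small while letting each request get a fresh database handle, which matches the comments in the hunks above.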
async def get_main_db_initialized(self) -> AsyncIOMotorDatabase:
- """Get main database with Beanie initialized for tenant models"""
- # Re-initialize Beanie for main database with business models
- await init_beanie(database=self.main_db, document_models=document_models)
- await self.module_logger.log_info("Beanie initialization completed for main database")
+ """Get main database with BaseDoc context set for all models"""
+ # Set main database context for all BaseDoc models
+ MessageTemplateDoc.set_tenant_database(self.main_db)
+ EmailSenderDoc.set_tenant_database(self.main_db)
+ EmailSendStatusDoc.set_tenant_database(self.main_db)
+ EmailTrackingDoc.set_tenant_database(self.main_db)
+ EmailBounceDoc.set_tenant_database(self.main_db)
+ UsageLogDoc.set_tenant_database(self.main_db)
+ await self.module_logger.log_info("BaseDoc context set for main database")
return self.main_db
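The repeated `set_tenant_database(...)` calls in this diff suggest each document class stores its database handle at class level; the `BaseDoc` base itself is not part of the diff, so the following is only a hypothetical sketch of how such a base could work (the attribute and method names besides `set_tenant_database` are invented for illustration):

```python
class BaseDoc:
    """Hypothetical base: each subclass carries its own tenant database handle."""

    collection_name = None  # set by each concrete document class
    _db = None

    @classmethod
    def set_tenant_database(cls, db):
        # Stored on the subclass it is called on, so MessageTemplateDoc and
        # EmailSenderDoc can point at different databases independently.
        cls._db = db

    @classmethod
    def get_collection(cls):
        if cls._db is None:
            raise RuntimeError(f"{cls.__name__}: tenant database not set")
        return cls._db[cls.collection_name]


class MessageTemplateDoc(BaseDoc):
    collection_name = "message_templates"
```

Under this reading, switching a request between tenants is just a matter of re-pointing the class-level handle, which is exactly what the middleware hunks do per request.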
async def _lru_put(self, key: str, client: AsyncIOMotorClient):
@@ -180,7 +190,7 @@ def register(app):
@app.on_event("startup")
async def start_database():
await initiate_database(app)
@app.on_event("shutdown")
async def shutdown_database():
await cleanup_database()
@@ -189,15 +199,20 @@ def register(app):
async def initiate_database(app):
"""Initialize main database and tenant cache"""
global MAIN_CLIENT, TENANT_CACHE
module_logger = ModuleLogger(sender_id="DatabaseInit")
# 1) Create main/catalog client + DB
MAIN_CLIENT = AsyncIOMotorClient(app_settings.MONGODB_URI)
main_db = MAIN_CLIENT[app_settings.MONGODB_NAME]
- # 2) Initialize Beanie for main DB with business document models
- await init_beanie(database=main_db, document_models=document_models)
+ # 2) Set BaseDoc context for main database
+ MessageTemplateDoc.set_tenant_database(main_db)
+ EmailSenderDoc.set_tenant_database(main_db)
+ EmailSendStatusDoc.set_tenant_database(main_db)
+ EmailTrackingDoc.set_tenant_database(main_db)
+ EmailBounceDoc.set_tenant_database(main_db)
+ UsageLogDoc.set_tenant_database(main_db)
# 3) Create tenant cache that uses main_db lookups to resolve product_id -> tenant db
max_cache_size = getattr(app_settings, 'TENANT_CACHE_MAX', 64)
@@ -206,20 +221,20 @@ async def initiate_database(app):
# 4) Store on app state for middleware to access
app.state.main_db = main_db
app.state.tenant_cache = TENANT_CACHE
await module_logger.log_info("Database and tenant cache initialized successfully")
async def cleanup_database():
"""Cleanup database connections and cache"""
global MAIN_CLIENT, TENANT_CACHE
module_logger = ModuleLogger(sender_id="DatabaseCleanup")
if TENANT_CACHE:
await TENANT_CACHE.aclose()
if MAIN_CLIENT:
MAIN_CLIENT.close()
await module_logger.log_info("Database connections closed successfully")
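`cleanup_database` deliberately closes the tenant cache (which still holds Motor clients) before closing the main client. A stripped-down sketch of that teardown ordering, with stand-in objects in place of `TenantDBCache` and `AsyncIOMotorClient`:

```python
import asyncio


class FakeCache:
    """Stand-in for TenantDBCache: aclose() is async, like the real method."""

    def __init__(self):
        self.closed = False

    async def aclose(self):
        self.closed = True


class FakeClient:
    """Stand-in for AsyncIOMotorClient: close() is synchronous."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


async def cleanup(client, cache):
    # Mirrors cleanup_database: cache first (it may still reference
    # clients), main client last; both guarded against being unset.
    if cache:
        await cache.aclose()
    if client:
        client.close()
```

The `if` guards matter because the globals start as `None`; a startup failure must not turn shutdown into an `AttributeError`.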