from datetime import datetime, timedelta from typing import Literal, List, Optional from dataclasses import dataclass from enum import Enum from beanie import Document from bson import ObjectId from pydantic import Field, field_validator from pydantic import BaseModel from pymongo import IndexModel class Deployment(Document): deployment_id: str deployment_stage: str deployment_status: Literal["started", "failed", "succeeded", "aborted"] deployment_target_env: Literal["alpha", "prod"] deployment_ttl_hours: int = 2 deployment_project_id: str deployment_project_name: str deployment_product_id: str deployment_product_name: str deployment_git_url: str deployment_git_sha256: str deployment_reason: str deployed_by: str created_at: datetime = datetime.now() updated_at: datetime = datetime.now() class Settings: name = "deployment" indexes = [ IndexModel([("deployment_product_id", 1), ("created_at", 1)]), IndexModel([("deployment_id", 1), ("deployment_status", 1)]), IndexModel([("deployment_id", 1), ("deployment_stage", 1)], unique=True) ] class InitDeploymentRequest(BaseModel): product_id: str sha256: str target_env: str user_id: str reason: str = "not provided" ttl_hours: int = 3 class CheckDeploymentStatusRequest(BaseModel): product_id: str target_env: str user_id: str class CheckApplicationLogsRequest(BaseModel): product_id: str target_env: Literal["alpha", "prod"] = "alpha" user_id: str = '' log_level: List[Literal["info", "error", "debug"]] = Field(default_factory=lambda: ["info"]) start_time: datetime = datetime.now() - timedelta(minutes=5) end_time: datetime = datetime.now() limit: int = 1000 class CheckApplicationLogsResponse(BaseModel): product_id: str target_env: Literal["alpha", "prod"] user_id: str = '' log_level: List[Literal["info", "error", "debug"]] start_time: datetime end_time: datetime limit: int logs: list[str] class DevOpsReconcileOperationType(Enum): START = "start" TERMINATE = "terminate" RESTART = "restart" @dataclass class DevOpsReconcileRequest(BaseModel): operation: DevOpsReconcileOperationType id: str devops_proj_id: str triggered_user_id: str causes: str commit_sha256: Optional[str] = None target_env: Literal["alpha", "prod"] ttl_controled: bool = False ttl: int = 10800