freeleaps-service-hub/apps/content/scheduler/schedule_job_locker.py
2024-10-30 07:22:26 -07:00

40 lines
1.2 KiB
Python
Executable File

from datetime import datetime, timedelta, timezone
from scheduler.models import ScheduleJobLockerDoc
from scheduler.constants import ScheduleJobLocker
async def init_lock(locker: ScheduleJobLocker):
    """Ensure a lock document exists for the given scheduler job locker.

    Looks up the ``ScheduleJobLockerDoc`` whose ``name`` equals
    ``locker.value`` and inserts a new document with that name when none
    is found. An existing document is left untouched.

    Args:
        locker: Enum member identifying the lock; its ``value`` is used as
            the document name.
    """
    # Filter on the enum's value -- the same key the document is created
    # with below -- rather than on the enum member itself, so the lookup
    # matches what was stored and a second document is not inserted.
    existing = await ScheduleJobLockerDoc.find_one({"name": locker.value})
    if existing is None:
        await ScheduleJobLockerDoc(name=locker.value).insert()
async def acquire_lock(locker: ScheduleJobLocker, timeout: int) -> bool:
    """Try to take the named lock for ``timeout`` seconds.

    The lock document is fetched by ``locker.value``. The lock is granted
    when the document exists and is either not marked as locked or its
    ``expire_time`` is not later than the current UTC time. On success the
    document is marked locked, ``expire_time`` is set to now plus
    ``timeout`` seconds, ``lock_time`` is set to now, and the document is
    saved.

    Note that the document is read and then written in two separate
    operations.

    Args:
        locker: Enum member identifying the lock.
        timeout: Number of seconds after which the lock counts as expired.

    Returns:
        ``True`` if the lock was taken; ``False`` if no lock document
        exists or the lock is currently held and has not expired.
    """
    current_time = datetime.now(timezone.utc)

    doc = await ScheduleJobLockerDoc.find_one({"name": locker.value})
    if not doc:
        # No lock document with this name: nothing to acquire.
        return False

    if doc.locked:
        # The stored expiry is tagged as UTC before it is compared with
        # the timezone-aware current time.
        expires_at = doc.expire_time.replace(tzinfo=timezone.utc)
        if expires_at > current_time:
            return False

    # Lock is free or has expired: claim it.
    doc.locked = True
    doc.expire_time = current_time + timedelta(seconds=timeout)
    doc.lock_time = current_time
    await doc.save()
    return True
async def release_lock(locker: ScheduleJobLocker):
    """Mark the named lock as released.

    Fetches the lock document by ``locker.value``; when it exists, clears
    its ``locked`` flag, sets ``unlock_time`` to the current UTC time and
    saves the document. Does nothing when no such document is found.

    Args:
        locker: Enum member identifying the lock.
    """
    doc = await ScheduleJobLockerDoc.find_one({"name": locker.value})
    if not doc:
        return

    doc.locked = False
    doc.unlock_time = datetime.now(timezone.utc)
    await doc.save()