# Source code for sirepo.cron
"""periodic callbacks
:copyright: Copyright (c) 2025 RadiaSoft LLC. All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdlog, pkdp
import asyncio
import inspect
import sirepo.srtime
# NOTE(review): no code visible in this module reads this module-level name;
# CronTask keeps its own class-level ``_loop`` -- confirm whether it is still needed.
_loop = None
# Smallest ``period`` (seconds) that CronTask.__init__ accepts
_MIN_PERIOD = 60
# [docs]
class CronTask:
    """Creates a task that runs a coroutine function periodically

    ``coro_func`` is called at startup and then periodically.
    ``coro_func`` must `yield_to_event_loop` to release processor

    For global tasks, not associated with `quest`.

    Instances created before `init_class` is called are remembered and
    started when `init_class` supplies a loop.

    Args:
        period (int or float): how often to run job in seconds, must be at least 60
        coro_func (func): async function
        params (object): passed verbatim to coro_func

    Raises:
        AssertionError: if ``period`` is too small or ``coro_func`` is not
            a coroutine function
    """

    # Tri-state: None means `init_class` has not been called yet, False means
    # initialized without a loop (tasks do nothing), otherwise the loop object.
    _loop = None

    # Tasks that are running or waiting for `init_class` to provide a loop
    _instances = set()

    def __init__(self, period, coro_func, params):
        if period < _MIN_PERIOD:
            raise AssertionError(f"too frequent period={period} min={_MIN_PERIOD}")
        if not inspect.iscoroutinefunction(coro_func):
            raise AssertionError(f"not coroutine function={coro_func}")
        self._period = float(period)
        self._coro_func = coro_func
        self._params = params
        self._destroyed = False
        self._start()

    def destroy(self):
        """Stop the polling process

        May be called more than once and on tasks that were never
        registered (no loop configured).
        """
        if self._destroyed:
            return
        self._destroyed = True
        # discard, not remove: the task is absent from the set when it was
        # created after init_class(None) or dropped by init_class(None).
        self._instances.discard(self)

    @classmethod
    def init_class(cls, loop):
        """Initialized by service

        If ``loop`` is None, then CronTasks do nothing (multi-server case).

        Args:
            loop (asyncio.EventLoop): event loop or None; must provide
                ``add_callback`` (presumably a tornado IOLoop -- TODO confirm)

        Raises:
            AssertionError: if called a second time
        """
        if cls._loop is not None:
            raise AssertionError("already initialized")
        cls._loop = loop or False
        if not cls._loop:
            # No loop in this process: pending tasks will never run
            cls._instances.clear()
            return
        # Iterate a snapshot because _start() adds to the same set
        for s in tuple(cls._instances):
            s._start()

    async def _poll(self):
        """Call ``coro_func`` repeatedly until `destroy` is called"""
        while not self._destroyed:
            t = sirepo.srtime.utc_now_as_float()
            await self._coro_func(self._params)
            if self._destroyed:
                break
            # Sleep the remainder of the period; no sleep if the call overran it
            await asyncio.sleep(
                max(0.0, self._period - (sirepo.srtime.utc_now_as_float() - t)),
            )

    def _start(self):
        """Register this task and schedule `_poll` if the loop is known"""
        if self._destroyed or self._loop is False:
            return
        self._instances.add(self)
        if self._loop is None:
            # init_class has not run yet; it will call _start again
            return
        self._loop.add_callback(self._poll)