Source code for sirepo.runner.celery

# -*- coding: utf-8 -*-
u"""Run jobs

:copyright: Copyright (c) 2016-2018 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from __future__ import absolute_import, division, print_function
from pykern import pkcli
from pykern import pkconfig
from pykern.pkdebug import pkdc, pkdexc, pkdlog, pkdp, pkdpretty
from sirepo import celery_tasks
from sirepo import runner
from sirepo import simulation_db
import time


[docs]class CeleryJob(runner.JobBase): """Run job in Celery (prod)""" def __init__(self, *args, **kwargs): super(CeleryJob, self).__init__(*args, **kwargs) self.__async_result = None def _is_processing(self): """Job is either in the queue or running""" res = self.__async_result return res and not res.ready() def _kill(self): from celery.exceptions import TimeoutError if not self._is_processing(): return False res = self.__async_result tid = getattr(res, 'task_id', None) pkdlog('{}: kill SIGTERM tid={}', self.jid, tid) try: res.revoke(terminate=True, wait=True, timeout=runner.KILL_TIMEOUT_SECS, signal='SIGTERM') except TimeoutError as e: pkdlog('{}: kill SIGKILL tid={}', self.jid, tid) res.revoke(terminate=True, signal='SIGKILL') def _start(self): """Detach a process from the controlling terminal and run it in the background as a daemon. """ self.__celery_queue = simulation_db.celery_queue(self.data) self.__async_result = celery_tasks.start_simulation.apply_async( args=[self.cmd, str(self.run_dir)], queue=self.__celery_queue, ) pkdc( '{}: started tid={} dir={} queue={}', self.jid, self.__async_result.task_id, self.run_dir, self.__celery_queue, )
[docs]def init_class(app, uwsgi): """Verify celery & rabbit are running""" if pkconfig.channel_in('dev'): return CeleryJob for x in range(10): err = None try: if not celery_tasks.celery.control.ping(): err = 'You need to start Celery:\nsirepo service celery' except Exception: err = 'You need to start Rabbit:\nsirepo service rabbitmq' # Rabbit doesn't have a long timeout, but celery ping does time.sleep(.5) if not err: return CeleryJob #TODO(robnagler) really should be pkconfig.Error() or something else # but this prints a nice message. Don't call sys.exit, not nice pkcli.command_error(err)