"""Common functionality that is shared between the server, supervisor, and driver.
:copyright: Copyright (c) 2019-2023 RadiaSoft LLC. All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from pykern import pkconfig
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdp, pkdc, pkdlog, pkdexc
import pykern.pkcompat
import pykern.pkconst
import pykern.pkdebug
import sirepo.const
import sirepo.feature_config
import sirepo.srdb
import sirepo.util
import re
#: error code for a response that is too large (presumably one exceeding
#: ``cfg().max_message_bytes`` -- confirm at the use site)
ERROR_CODE_RESPONSE_TOO_LARGE = "response_too_large"
#: op names (message types) shared by server, supervisor, and agent
OP_ANALYSIS = "analysis"
OP_CANCEL = "cancel"
OP_ERROR = "error"
OP_IO = "io"
OP_JOB_CMD_STDERR = "job_cmd_stderr"
OP_KILL = "kill"
OP_OK = "ok"
#: Agent indicates it is ready
OP_ALIVE = "alive"
OP_RUN = "run"
OP_RUN_STATUS = "run_status"
OP_RUN_STATUS_UPDATE = "run_status_update"
OP_SBATCH_AGENT_READY = "sbatch_agent_ready"
OP_SBATCH_LOGIN = "sbatch_login"
OP_BEGIN_SESSION = "begin_session"
#: Ops which don't need slot allocations, or which the supervisor does not send
OPS_WITHOUT_SLOTS = frozenset(
    (
        OP_ALIVE,
        OP_BEGIN_SESSION,
        OP_CANCEL,
        OP_ERROR,
        OP_KILL,
        OP_OK,
        OP_RUN_STATUS,
        OP_SBATCH_AGENT_READY,
        OP_SBATCH_LOGIN,
    )
)
#: Ops that require a CPU slot
CPU_SLOT_OPS = frozenset((OP_ANALYSIS, OP_RUN))
#: All ops that have slots: CPU_SLOT_OPS plus OP_IO
#: (see job_driver.DriverBase._slots_ready)
SLOT_OPS = frozenset().union(*[CPU_SLOT_OPS, (OP_IO,)])
#: state value (other states are implicit statuses)
STATE_OK = "ok"
#: canonical success reply; hand out copies via `ok_reply`, never this object
_OK_REPLY = PKDict(state=STATE_OK)
#: path supervisor registers to receive messages from agent
AGENT_URI = "/job-agent-websocket"
#: path supervisor registers to receive srtime adjustments from server
SERVER_SRTIME_URI = "/job-api-srtime"
#: path supervisor registers to receive requests from server
SERVER_URI = "/job-api-request"
#: path supervisor registers to receive pings from server
SERVER_PING_URI = "/job-api-ping"
#: path supervisor registers to receive requests from job_process for file PUTs
DATA_FILE_URI = "/job-cmd-data-file"
#: path supervisor registers to receive requests from job_process for global resources
GLOBAL_RESOURCES_URI = "/global-resources"
# POSIT: These are the same queues as in schema-common.common.enum.SbatchQueue
NERSC_QUEUES = frozenset(("debug", "premium", "realtime", "regular"))
#: where user data files come in (job_supervisor); set by `init_module` to
#: SUPERVISOR_SRV_ROOT joined with DATA_FILE_URI minus its leading "/"
DATA_FILE_ROOT = None
#: path supervisor registers to receive requests from job_process for simulation file GETs/PUTs
SIM_DB_FILE_URI = "/sim-db-file"
#: how jobs request files (relative to `srdb.root`)
SUPERVISOR_SRV_SUBDIR = "supervisor-srv"
#: how jobs request files (absolute); set by `init_module`
SUPERVISOR_SRV_ROOT = None
#: address the supervisor binds to
DEFAULT_IP = "127.0.0.1"
# NOTE(review): DEFAULT_SUPERVISOR_URI_DECL below uses
# sirepo.const.PORT_DEFAULTS.supervisor, not DEFAULT_PORT; confirm they agree.
#: port supervisor listens on
DEFAULT_PORT = 8001
#: pkconfig declaration ``(default, parser, help)`` for the drivers'
#: supervisor_uri setting
DEFAULT_SUPERVISOR_URI_DECL = (
    "http://{}:{}".format(DEFAULT_IP, sirepo.const.PORT_DEFAULTS.supervisor),
    str,
    "how to reach supervisor",
)
#: status values
CANCELED = "canceled"
COMPLETED = "completed"
ERROR = "error"
JOB_RUN_PURGED = "job_run_purged"
MISSING = "missing"
PENDING = "pending"
RUNNING = "running"
UNKNOWN = "unknown"
#: Queued or running
ACTIVE_STATUSES = frozenset((PENDING, RUNNING))
#: Statuses of a job that is no longer active
EXIT_STATUSES = frozenset((CANCELED, COMPLETED, ERROR, MISSING, JOB_RUN_PURGED))
#: Valid values for job status (exit + active; note UNKNOWN is not included)
STATUSES = EXIT_STATUSES.union(ACTIVE_STATUSES)
#: For communication between job_agent and job_cmd
JOB_CMD_STATE_SBATCH_RUN_STATUS_STOP = "sbatch_run_status_stop"
#: job_cmd states treated as terminal (presumably: the cmd is done) -- all
#: exit statuses plus the sbatch run-status stop state
JOB_CMD_STATE_EXITS = EXIT_STATUSES.union((JOB_CMD_STATE_SBATCH_RUN_STATUS_STOP,))
#: job_cmds
CMD_COMPUTE = "compute"
CMD_DOWNLOAD_RUN_FILE = "download_run_file"
CMD_SBATCH_RUN_STATUS = "sbatch_run_status"
#: jobRunMode and kinds; should come from schema
SEQUENTIAL = "sequential"
PARALLEL = "parallel"
SBATCH = "sbatch"
#: valid jobRunMode values
RUN_MODES = frozenset((SEQUENTIAL, PARALLEL, SBATCH))
#: categories of jobs (SBATCH is a run mode but not a kind)
KINDS = frozenset((SEQUENTIAL, PARALLEL))
# https://docs.nersc.gov/jobs/policy/
# https://docs.nersc.gov/performance/knl/getting-started/#knl-vs-haswell
# NOTE(review): NERSC_QUEUES includes "realtime" but there is no entry for it
# below; confirm whether lookups for realtime are expected to miss.
#: Max values for each nersc queue (wall-clock hours and core counts)
NERSC_QUEUE_MAX = PKDict(
    hours=PKDict(
        debug=0.5,
        premium=48,
        regular=48,
    ),
    cores=PKDict(
        debug=34816,
        premium=56704,
        regular=61824,
    ),
)
#: prefix that marks a sid as a quasi sid (see `quasi_jid`)
_QUASI_SID_PREFIX = "_1_"
#: Allow sids for different kinds of jobs (not simulations)
QUASI_SID_RE = re.compile(f"^{_QUASI_SID_PREFIX}")
#: Must match length of simulation_db._ID_LEN (presumably together with the
#: 3-char _QUASI_SID_PREFIX, so a quasi sid is as long as a real sid -- confirm)
_QUASI_SID_OP_KEY_LEN = 5
#: module config; set by `init_module` (read it via `cfg()`)
_cfg = None
#: use to separate components of job_id
_JOB_ID_SEP = "-"
[docs]
def agent_cmd_stdin_env(cmd, env, uid, cwd=".", source_bashrc=""):
    """Convert `cmd` with `env` to script and cmd

    Builds a bash script that runs `source_bashrc`, creates and changes to
    `cwd`, exports the environment, and finally ``exec``s `cmd`. The script
    is written to an anonymous temporary file, rewound, and returned for use
    as the subprocess's stdin.

    Uses tempfile so the file can be closed after the subprocess
    gets the handle. You have to close `stdin` after calling
    `tornado.process.Subprocess`, which calls `subprocess.Popen`
    inline, since it's not ``async``.

    Args:
        cmd (iter): list of words to be quoted
        env (str): empty or result of `agent_env`
        uid (str): which user should be logged in (used to build the
            environment with `agent_env` when `env` is empty)
        cwd (str): directory for the agent to run in (will be created if it doesn't exist)
        source_bashrc (str): shell text placed at the top of the script;
            ignored when ``trust_sh_env`` is set
    Returns:
        tuple: new cmd (tuple), stdin (file), env (PKDict or None)
    """
    import os
    import tempfile

    trust = sirepo.feature_config.cfg().trust_sh_env
    if trust:
        # Trust the local environment: inherit it (env None) and find bash on PATH
        source_bashrc = ""
        new_cmd = ("bash",)
        new_env = None
    else:
        # it's reasonable to hardwire this path, even though we don't
        # do that with others. We want to make sure the subprocess starts
        # with a clean environment (no $PATH). You have to pass HOME.
        new_cmd = ("/bin/bash", "-l")
        new_env = PKDict(HOME=os.environ["HOME"])
    # POSIT: we control all these values (they are single-quoted without escaping)
    script = """{}
set -e
mkdir -p '{}'
cd '{}'
{}
{}
""".format(
        source_bashrc,
        cwd,
        cwd,
        env or agent_env(uid=uid),
        "exec " + " ".join("'{}'".format(x) for x in cmd),
    )
    # Script is fully built before the file exists, so only write/seek can
    # fail while it is open; close it in that case instead of leaking the fd.
    stdin = tempfile.TemporaryFile()
    try:
        stdin.write(script.encode())
        stdin.seek(0)
    except BaseException:
        stdin.close()
        raise
    return new_cmd, stdin, new_env
[docs]
def agent_env(uid, env=None):
    """Convert to bash environment

    Adds defaults to `env` without overriding existing keys. It first adds
    the ``pykern.*`` and ``sirepo.feature_config.*`` config as environment
    variables, with secrets excluded. It then adds the job and logged-in-user
    settings. Unless ``trust_sh_env`` is set, PYTHONPATH and PYTHONSTARTUP
    also default to empty.

    Args:
        uid (str): which user is running this agent process
        env (PKDict): None/empty or base environment; a non-empty `env` is
            updated in place
    Returns:
        str: bash environment ``export`` commands, one per line
    Raises:
        AssertionError: if any key matches `pykern.pkdebug.SECRETS_RE`, or a
            value has a type that cannot be formatted
    """

    def _fmt(value):
        # POSIT: same as pkconfig.to_environ
        # TODO(robnagler) export from pkconfig
        if value is None:
            return ""
        if isinstance(value, (frozenset, list, set, tuple)):
            return pkconfig.TUPLE_SEP.join(value)
        if isinstance(
            value,
            (bool, complex, float, int, str, pykern.pkconst.PY_PATH_LOCAL_TYPE),
        ):
            # let format handle
            return value
        raise AssertionError(f"unknown type to format type={type(value)} value={value}")

    def _quote(value):
        # Bash single quotes allow no escapes, so an embedded ' is written as
        # '\'' (close quote, escaped quote, reopen quote). Values without a
        # quote are emitted exactly as before.
        return "'{}'".format("{}".format(_fmt(value)).replace("'", "'\\''"))

    # cfg() initializes the module if needed instead of failing on _cfg=None
    c = cfg()
    x = pkconfig.to_environ(
        (
            "pykern.*",
            "sirepo.feature_config.*",
        ),
        exclude_re=pykern.pkdebug.SECRETS_RE,
    )
    env = (
        (env or PKDict())
        .pksetdefault(**x)
        .pksetdefault(
            PYTHONUNBUFFERED="1",
            SIREPO_AUTH_LOGGED_IN_USER=uid,
            SIREPO_JOB_MAX_MESSAGE_BYTES=c.max_message_bytes,
            SIREPO_JOB_PING_INTERVAL_SECS=c.ping_interval_secs,
            SIREPO_JOB_PING_TIMEOUT_SECS=c.ping_timeout_secs,
            SIREPO_JOB_VERIFY_TLS=c.verify_tls,
            SIREPO_SIMULATION_DB_LOGGED_IN_USER=uid,
            # presumably pksetdefault calls this only when the key is absent
            # (so srdb.root() is evaluated lazily) -- confirm in pykern
            SIREPO_SRDB_ROOT=lambda: sirepo.srdb.root(),
        )
    )
    if not sirepo.feature_config.cfg().trust_sh_env:
        env.pksetdefault(
            PYTHONPATH="",
            PYTHONSTARTUP="",
        )
    for k in env:
        # explicit raise (not assert) so the check survives ``python -O``
        if pykern.pkdebug.SECRETS_RE.search(k):
            raise AssertionError(
                "unexpected secret key={} match={}".format(
                    k,
                    pykern.pkdebug.SECRETS_RE,
                )
            )
    return "\n".join("export {}={}".format(k, _quote(v)) for k, v in env.items())
[docs]
def cfg():
    """Return this module's config, running `init_module` on first use.

    Returns:
        object: the config object cached by `init_module`
    """
    if not _cfg:
        return init_module()
    return _cfg
[docs]
def init_module():
    """Parse this module's config once and derive the supervisor paths.

    Sets the module globals `_cfg`, `SUPERVISOR_SRV_ROOT`, and
    `DATA_FILE_ROOT`. `_cfg` is assigned last, so if deriving the paths
    fails (e.g. `sirepo.srdb.root` raises), a later call retries instead of
    returning early with the path globals still ``None``.

    Returns:
        object: configuration returned by `pkconfig.init` (cached in `_cfg`)
    """
    global _cfg, SUPERVISOR_SRV_ROOT, DATA_FILE_ROOT

    if _cfg:
        return _cfg
    c = pkconfig.init(
        max_message_bytes=(
            int(2e8),
            pkconfig.parse_bytes,
            "maximum message size throughout system",
        ),
        ping_interval_secs=(
            2 * 60,
            pkconfig.parse_seconds,
            "how long to wait between sending keep alive pings",
        ),
        ping_timeout_secs=(
            15,
            pkconfig.parse_seconds,
            "how long to wait for a ping response",
        ),
        server_secret=(
            "a very secret, secret",
            str,
            "shared secret between supervisor and server",
        ),
        verify_tls=(
            not pkconfig.channel_in("dev"),
            bool,
            "validate TLS certificates (set false to accept self-signed certs)",
        ),
    )
    SUPERVISOR_SRV_ROOT = sirepo.srdb.root().join(SUPERVISOR_SRV_SUBDIR)
    # DATA_FILE_URI is absolute ("/..."); drop the slash to use it as a subdir
    DATA_FILE_ROOT = SUPERVISOR_SRV_ROOT.join(DATA_FILE_URI[1:])
    _cfg = c
    return _cfg
[docs]
def is_ok_reply(value):
    """Whether `value` is a successful reply.

    Args:
        value (object): reply to check
    Returns:
        bool: True if `value` is a PKDict that equals the canonical ok reply
        or whose ``state`` is `COMPLETED`; False otherwise
    """
    return isinstance(value, PKDict) and (
        value == _OK_REPLY or value.get("state") == COMPLETED
    )
[docs]
def join_jid(uid, sid, compute_model):
    """Build a jid from its three parts.

    A Job is a tuple of user, sid, and compute_model. The jid joins them with
    `_JOB_ID_SEP`, so a jid is words and dashes. `split_jid` reverses this,
    provided no part contains the separator.

    Args:
        uid (str): user id
        sid (str): simulation id
        compute_model (str): model name
    Returns:
        str: unique name (treat opaquely)
    """
    parts = [uid, sid, compute_model]
    return _JOB_ID_SEP.join(parts)
[docs]
def ok_reply():
    """Return a fresh success reply.

    Returns:
        PKDict: new copy of the canonical ok reply, safe for callers to mutate
    """
    return PKDict(_OK_REPLY)
[docs]
def quasi_jid(uid, op_key, method):
    """Creates an id for a non-simulation job

    The sid part is `_QUASI_SID_PREFIX` + `op_key`, so it matches
    `QUASI_SID_RE`.

    Args:
        uid (str): user id
        op_key (str): "stful" or "stlss"
        method (str): statelessCompute or statefulCompute method
    Returns:
        str: jid built by `join_jid`
    Raises:
        AssertionError: if `op_key` is not `_QUASI_SID_OP_KEY_LEN` characters
    """
    # explicit raise (not assert) so the check survives ``python -O``
    if len(op_key) != _QUASI_SID_OP_KEY_LEN:
        raise AssertionError(
            f"invalid op_key={op_key} length must be {_QUASI_SID_OP_KEY_LEN}"
        )
    return join_jid(uid, _QUASI_SID_PREFIX + op_key, method)
[docs]
def sbatch_login_ok():
    """Build the success reply for the sbatchLogin API.

    Returns:
        PKDict: ``loginSuccess`` set to True
    """
    rv = PKDict()
    rv.loginSuccess = True
    return rv
[docs]
def split_jid(jid):
    """Break a jid back into the parts given to `join_jid`.

    Args:
        jid (str): properly formed job identifier
    Returns:
        PKDict: parts named uid, sid, compute_model
    Raises:
        whatever `pykern.pkcompat.zip_strict` raises when `jid` does not
        split into exactly three parts
    """
    names = ("uid", "sid", "compute_model")
    values = jid.split(_JOB_ID_SEP)
    return PKDict(pykern.pkcompat.zip_strict(names, values))
[docs]
def supervisor_file_uri(supervisor_uri, *args):
    """Append `args` as path segments to `supervisor_uri`.

    Args:
        supervisor_uri (str): base URI, used as-is (no separator is inserted
            between it and the first segment)
        args (str): path segments, joined with ``/``
    Returns:
        str: the URI, always ending in ``/``
    """
    # trailing slash necessary
    path = "/".join(args)
    return f"{supervisor_uri}{path}/"