"""Type-based simulation operations
:copyright: Copyright (c) 2019 RadiaSoft LLC. All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from pykern import pkcompat
from pykern import pkconfig
from pykern import pkconst
from pykern import pkinspect
from pykern import pkio
from pykern import pkjson
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdp, pkdexc, pkdc, pkdformat
import hashlib
import os.path
import re
import sirepo.const
import sirepo.feature_config
import sirepo.job
import sirepo.resource
import sirepo.srdb
import sirepo.util
import subprocess
import urllib
import uuid
_cfg = None
#: default compute_model
_ANIMATION_NAME = "animation"
#: models which should not get persisted
_CLIENT_ONLY_MODELS = frozenset(
(
"completeRegistration",
"emailLogin",
"ldapLogin",
"moderationRequest",
"moveItem",
"renameItem",
"simFolder",
)
)
_MODEL_RE = re.compile(r"^[\w-]+$")
_IS_PARALLEL_RE = re.compile("animation", re.IGNORECASE)
#: separates values in frame id
_FRAME_ID_SEP = "*"
#: common keys to frame id followed by code-specific values
_FRAME_ID_KEYS = (
"frameIndex",
# computeModel when passed from persistent/parallel
# analysisModel when passe from transient/sequential
# sim_data.compute_model() is idempotent to this.
"frameReport",
"simulationId",
"simulationType",
"computeJobHash",
"computeJobSerial",
)
_TEMPLATE_RESOURCE_DIR = "template"
#: Absolute path of rsmanifest file
_RSMANIFEST_PATH = pkio.py_path("/rsmanifest" + sirepo.const.JSON_SUFFIX)
[docs]
def audit_proprietary_lib_files(qcall, force=False, sim_types=None, uid=None):
"""Add/removes proprietary files based on a user's roles
For example, add the Flash tarball if user has the flash role.
Args:
qcall (quest.API): logged in user
force (bool): Overwrite existing lib files with the same name as new ones
sim_types (set): Set of sim_types to audit (proprietary_sim_types if None)
"""
from sirepo import simulation_db, sim_run
def _add(proprietary_code_dir, sim_type, cls):
p = proprietary_code_dir.join(cls.proprietary_code_tarball())
with sim_run.tmp_dir(chdir=True, qcall=qcall) as t:
d = t.join(p.basename)
d.mksymlinkto(p, absolute=False)
subprocess.check_output(
[
"tar",
"--extract",
"--gunzip",
f"--file={d}",
],
stderr=subprocess.STDOUT,
)
# lib_dir may not exist: git.radiasoft.org/ops/issues/645
l = pkio.mkdir_parent(
simulation_db.simulation_lib_dir(sim_type, qcall=qcall),
)
e = [f.basename for f in pkio.sorted_glob(l.join("*"))]
for f in cls.proprietary_code_lib_file_basenames():
if force or f not in e:
t.join(f).rename(l.join(f))
def _iter(stypes, uid):
for t in stypes:
c = get_class(t)
if not c.proprietary_code_tarball():
continue
d = sirepo.srdb.proprietary_code_dir(t)
assert d.exists(), f"{d} proprietary_code_dir must exist" + (
"; run: sirepo setup_dev" if pkconfig.in_dev_mode() else ""
)
r = qcall.auth_db.model("UserRole").has_active_role(
role=sirepo.auth_role.for_sim_type(t),
uid=uid,
)
if r:
_add(d, t, c)
continue
# SECURITY: User no longer has access so remove all artifacts
pkio.unchecked_remove(simulation_db.simulation_dir(t, qcall=qcall))
def _sim_types():
rv = sirepo.feature_config.proprietary_sim_types()
if not sim_types:
return rv
if not sim_types.issubset(rv):
raise AssertionError(
f"sim_types={sim_types} not a subset of proprietary_sim_types={rv}"
)
return sim_types
s = _sim_types()
if uid:
with qcall.auth.logged_in_user_set(uid):
_iter(s, uid)
else:
_iter(s, qcall.auth.logged_in_user(check_path=False))
[docs]
def get_class(type_or_data):
"""Simulation data class
Args:
type_or_data (str or dict): simulation type or description
Returns:
type: simulation data operation class
"""
return sirepo.util.import_submodule("sim_data", type_or_data).SimData
[docs]
def resource_path(filename):
"""Path to common (not specific to sim type) resource file"""
return sirepo.resource.file_path(_TEMPLATE_RESOURCE_DIR, filename)
[docs]
def template_globals(sim_type=None):
"""Initializer for templates
Usage::
_SIM_DATA, SIM_TYPE, _SCHEMA = sirepo.sim_data.template_globals()
Args:
sim_type (str): simulation type [calling module's basename]
Returns:
(class, str, object): SimData class, simulation type, and schema
"""
c = get_class(sim_type or pkinspect.module_basename(pkinspect.caller_module()))
return c, c.sim_type(), c.schema()
[docs]
def parse_frame_id(frame_id):
"""Parse the frame_id and return it along with self
Args:
frame_id (str): values separated by "*"
Returns:
PKDict: frame_args
SimDataBase: sim_data object for this simulationType
"""
v = frame_id.split(_FRAME_ID_SEP)
res = PKDict(zip(_FRAME_ID_KEYS, v[: len(_FRAME_ID_KEYS)]))
res.frameIndex = int(res.frameIndex)
res.computeJobSerial = int(res.computeJobSerial)
s = get_class(res.simulationType)
s.frameReport = s.parse_model(res)
s.simulationId = s.parse_sid(res)
# TODO(robnagler) validate these
res.update(
zip(
s._frame_id_fields(res),
[SimDataBase._frame_param_to_field(x) for x in v[len(_FRAME_ID_KEYS) :]],
)
)
return res, s
[docs]
class SimDataBase(object):
ANALYSIS_ONLY_FIELDS = frozenset()
WATCHPOINT_REPORT = "watchpointReport"
WATCHPOINT_REPORT_RE = re.compile(r"^{}(\d+)$".format(WATCHPOINT_REPORT))
_EXAMPLE_RESOURCE_DIR = "examples"
_EXE_PERMISSIONS = 0o700
LIB_DIR = sirepo.const.LIB_DIR
[docs]
@classmethod
def compute_job_hash(cls, data, qcall):
"""Hash fields related to data and set computeJobHash
Only needs to be unique relative to the report, not globally unique
so MD5 is adequate. Long and cryptographic hashes make the
cache checks slower.
Args:
data (dict): simulation data
changed (callable): called when value changed
Returns:
bytes: hash value
"""
cls._assert_server_side()
c = cls.compute_model(data)
if data.get("forceRun") or cls.is_parallel(c):
return "HashIsUnused"
m = data["models"]
res = hashlib.md5()
fields = sirepo.sim_data.get_class(data.simulationType)._compute_job_fields(
data, data.report, c
)
# values may be string or PKDict
fields.sort(key=lambda x: str(x))
for f in fields:
# assert isinstance(f, pkconfig.STRING_TYPES), \
# 'value={} not a string_type'.format(f)
# TODO(pjm): work-around for now
if isinstance(f, pkconfig.STRING_TYPES):
x = f.split(".")
value = m[x[0]][x[1]] if len(x) > 1 else m[x[0]]
else:
value = f
res.update(
pkjson.dump_bytes(
value,
sort_keys=True,
allow_nan=False,
)
)
res.update(
"".join(
(
str(cls.lib_file_abspath(b, data=data, qcall=qcall).mtime())
for b in sorted(cls.lib_file_basenames(data))
),
).encode()
)
return res.hexdigest()
[docs]
@classmethod
def compute_model(cls, model_or_data):
"""Compute model for this model_or_data
Args:
model_or_data (): analysis model
Returns:
str: name of compute model for report
"""
m = cls.parse_model(model_or_data)
d = model_or_data if isinstance(model_or_data, dict) else None
# TODO(robnagler) is this necesary since m is parsed?
return cls.parse_model(cls._compute_model(m, d))
[docs]
@classmethod
def does_api_reply_with_file(cls, api, method):
"""Identify which job_api calls expect files
Args:
api (str): job_api method, e.g. api_statelessCompute
method (str): template sub-method of `api`, e.g. sample_preview
Returns:
bool: True if `method` can return a file
"""
return False
[docs]
@classmethod
def example_paths(cls):
return sirepo.resource.glob_paths(
_TEMPLATE_RESOURCE_DIR,
cls.sim_type(),
cls._EXAMPLE_RESOURCE_DIR,
f"*{sirepo.const.JSON_SUFFIX}",
)
[docs]
@classmethod
def fixup_old_data(cls, data, qcall, **kwargs):
"""Update model data to latest schema
Modifies `data` in place.
Args:
data (dict): simulation
"""
raise NotImplementedError()
[docs]
@classmethod
def frame_id(cls, data, response, model, index):
"""Generate a frame_id from values (unit testing)
Args:
data (PKDict): model data
response (PKDict): JSON response
model (str): animation name
index (int): index of frame
Returns:
str: combined frame id
"""
assert response.frameCount > index, pkdformat(
"response={} does not contain enough frames for index={}", response, index
)
frame_args = response.copy()
frame_args.frameReport = model
m = data.models[model]
return _FRAME_ID_SEP.join(
[
# POSIT: same order as _FRAME_ID_KEYS
str(index),
model,
data.models.simulation.simulationId,
data.simulationType,
response.computeJobHash,
str(response.computeJobSerial),
]
+ [pkjson.dump_str(m.get(k)) for k in cls._frame_id_fields(frame_args)],
)
[docs]
@classmethod
def is_parallel(cls, data_or_model):
"""Is this report a parallel (long) simulation?
Args:
data_or_model (dict): sim data or compute_model
Returns:
bool: True if parallel job
"""
return bool(
_IS_PARALLEL_RE.search(
cls.compute_model(data_or_model)
if isinstance(data_or_model, dict)
else data_or_model
),
)
[docs]
@classmethod
def is_run_mpi(cls):
raise NotImplementedError()
[docs]
@classmethod
def is_watchpoint(cls, name):
return cls.WATCHPOINT_REPORT in name
[docs]
@classmethod
def lib_file_abspath(cls, basename, data=None, qcall=None):
"""Returns full, unique paths of simulation files
Args:
basename (str): lib file basename
data: DEPRECATED
Returns:
object: py.path.local to files (duplicates removed) OR py.path.local
"""
p = cls._lib_file_abspath_or_exists(basename, qcall=qcall)
if p:
return p
raise sirepo.util.UserAlert(
'Simulation library file "{}" does not exist'.format(basename),
"basename={} not in lib or resource directories",
basename,
)
[docs]
@classmethod
def lib_file_basenames(cls, data):
"""List files used by the simulation
Args:
data (dict): sim db
Returns:
set: list of str, sorted
"""
# _lib_file_basenames may return duplicates
return sorted(set(cls._lib_file_basenames(data)))
[docs]
@classmethod
def lib_file_exists(cls, basename, qcall=None):
"""Does `basename` exist in library
Args:
basename (str): to test for existence
qcall (quest.API): quest state
Returns:
bool: True if it exists
"""
return cls._lib_file_abspath_or_exists(basename, qcall=qcall, exists_only=True)
[docs]
@classmethod
def lib_file_is_zip(cls, basename):
"""Is this lib file a zip file?
Args:
basename (str): to search
Returns:
bool: True if is a zip file
"""
return basename.endswith(".zip")
[docs]
@classmethod
def lib_file_in_use(cls, data, basename):
"""Check if file in use by simulation
Args:
data (dict): simulation
basename (str): to check
Returns:
bool: True if `basename` in use by `data`
"""
return any(
f
for f in cls.lib_file_basenames(data)
if cls.lib_file_name_without_type(f)
== cls.lib_file_name_without_type(basename)
)
[docs]
@classmethod
def lib_file_names_for_type(cls, file_type, qcall=None):
"""Return sorted list of files which match `file_type`
Args:
file_type (str): in the format of ``model-field``
Returns:
list: sorted list of file names stripped of file_type
"""
return sorted(
(
cls.lib_file_name_without_type(f.basename)
for f in cls._lib_file_list("{}.*".format(file_type), qcall=qcall)
)
)
[docs]
@classmethod
def lib_file_name_with_model_field(cls, model_name, field, filename):
return "{}-{}.{}".format(model_name, field, filename)
[docs]
@classmethod
def lib_file_name_with_type(cls, filename, file_type):
return "{}.{}".format(file_type, filename)
[docs]
@classmethod
def lib_file_name_without_type(cls, basename):
"""Strip the file type prefix
See `lib_file_name` which prefixes with ``model-field.``
Args:
basename: lib file name with type
Returns:
str: basename without type prefix
"""
return re.sub(r"^.*?-.*?\.(.+)$", r"\1", basename)
[docs]
@classmethod
def lib_file_resource_path(cls, basename):
"""Location of lib file in source distribution
Args:
basename (str): complete name of lib file
Returns:
py.path: Absolute path to file in source distribution
"""
return sirepo.resource.file_path(
_TEMPLATE_RESOURCE_DIR,
cls.sim_type(),
cls.LIB_DIR,
basename,
)
[docs]
@classmethod
def lib_file_read_binary(cls, basename, qcall=None):
"""Get contents of `basename` from lib as bytes
Args:
basename (str): full name including suffix
qcall (quest.API): logged in user
Returns:
bytes: contents of file
"""
if cls._is_agent_side():
return cls.sim_db_client().get(cls.LIB_DIR, basename)
return cls._lib_file_abspath(basename, qcall=qcall).read_binary()
[docs]
@classmethod
def lib_file_read_text(cls, *args, **kwargs):
"""Get contents of `basename` from lib as str
Args:
basename (str): full name including suffix
qcall (quest.API): logged in user
Returns:
str: contents of file
"""
return pkcompat.from_bytes(cls.lib_file_read_binary(*args, **kwargs))
[docs]
@classmethod
def lib_file_save_from_url(cls, url, model_name, field):
"""Fetch `url` and save to lib
Path to save to is `lib_file_name_with_model_field` is called
with the basename of `url`.
Args:
url (str): web address
model_name (str): model name
field (str): field of the model
"""
c = cls.sim_db_client()
c.save_from_url(
url,
c.uri(
cls.LIB_DIR,
cls.lib_file_name_with_model_field(
model_name,
field,
os.path.basename(urllib.parse.urlparse(url).path),
),
),
)
[docs]
@classmethod
def lib_file_size(cls, basename, qcall=None):
"""Get size of `basename` from lib
Args:
basename (str): full name including suffix
qcall (quest.API): logged in user
Returns:
int: size in bytes
"""
if cls._is_agent_side():
return cls.sim_db_client().size(cls.LIB_DIR, basename)
return cls._lib_file_abspath(basename, qcall=qcall).size()
[docs]
@classmethod
def lib_file_write(cls, basename, path_or_content, qcall=None):
"""Save `content` to `basename` in lib
Args:
basename (str): full name including suffix
path_or_content (str|bytes|py.path): what to save, may be text or binary
qcall (quest.API): logged in user
"""
def _target():
return (
cls._simulation_db()
.simulation_lib_dir(cls.sim_type(), qcall=qcall)
.join(basename)
)
if cls._is_agent_side():
cls.sim_db_client().put(cls.LIB_DIR, basename, path_or_content)
return
if isinstance(path_or_content, pkconst.PY_PATH_LOCAL_TYPE):
path_or_content.copy(_target())
else:
_target().write_binary(pkcompat.to_bytes(path_or_content))
[docs]
@classmethod
def lib_file_write_path(cls, basename, qcall=None):
"""DEPRECATED: Use `lib_file_write`"""
return (
cls._simulation_db()
.simulation_lib_dir(cls.sim_type(), qcall=qcall)
.join(basename)
)
[docs]
@classmethod
def lib_files_for_export(cls, data, qcall=None):
cls._assert_server_side()
res = []
for b in cls.lib_file_basenames(data):
f = cls.lib_file_abspath(b, data=data, qcall=qcall)
if f.exists():
res.append(f)
return res
[docs]
@classmethod
def lib_files_from_other_user(cls, data, other_lib_dir, qcall):
"""Copy auxiliary files to other user
Does not copy resource files. Only works locally.
Args:
data (dict): simulation db
other_lib_dir (py.path): source directory
"""
t = cls._simulation_db().simulation_lib_dir(cls.sim_type(), qcall=qcall)
for f in cls._lib_file_basenames(data):
s = other_lib_dir.join(f)
if s.exists():
s.copy(t.join(f))
[docs]
@classmethod
def lib_files_to_run_dir(cls, data, run_dir):
"""Copy auxiliary files to run_dir
Args:
data (dict): simulation db
run_dir (py.path): where to copy to
"""
for b in cls.lib_file_basenames(data):
t = run_dir.join(b)
s = cls.lib_file_abspath(b, data=data)
if t != s:
t.mksymlinkto(s, absolute=False)
[docs]
@classmethod
def model_defaults(cls, name):
"""Returns a set of default model values from the schema.
Some special cases:
if the data type is "UUID" and the default value is empty, set the
value to a new UUID string
if the data type is "RandomId" and the default value is empty, set the
value to a new Base62 string
if the data type has the form "model.zzz", set the value to the default
value of model "zzz"
Args:
name (str): model name
"""
import copy
res = PKDict()
for f, d in cls.schema().model[name].items():
if len(d) >= 3 and d[2] is not None:
m = d[1].split(".")
if len(m) > 1 and m[0] == "model" and m[1] in cls.schema().model:
res[f] = cls.model_defaults(m[1])
for ff, dd in d[2].items():
res[f][ff] = copy.deepcopy(d[2][ff])
continue
res[f] = copy.deepcopy(d[2])
if d[1] == "UUID" and not res[f]:
res[f] = str(uuid.uuid4())
if d[1] == "RandomId" and not res[f]:
res[f] = sirepo.util.random_base62(length=16)
return res
[docs]
@classmethod
def parse_jid(cls, data, uid):
"""A Job is a tuple of user, sid, and compute_model.
A jid is words and dashes.
Args:
data (dict): extract sid and compute_model
uid (str): user id
Returns:
str: unique name (treat opaquely)
"""
return sirepo.job.join_jid(uid, cls.parse_sid(data), cls.compute_model(data))
[docs]
@classmethod
def parse_model(cls, obj):
"""Find the model in the arg
Looks for `frameReport`, `report`, and `modelName`. Might be a compute or
analysis model.
Args:
obj (str or dict): simulation type or description
Returns:
str: target of the request
"""
if isinstance(obj, pkconfig.STRING_TYPES):
res = obj
elif isinstance(obj, dict):
for i in ("frameReport", "report", "computeModel", "modelName"):
if i in obj:
res = obj.get(i)
break
else:
res = None
else:
raise AssertionError("obj={} is unsupported type={}", obj, type(obj))
assert res and _MODEL_RE.search(res), "invalid model={} from obj={}".format(
res, obj
)
return res
[docs]
@classmethod
def parse_sid(cls, obj):
"""Extract simulationId from obj
Args:
obj (object): may be data, req, resp, or string
Returns:
str: simulation id
"""
if isinstance(obj, pkconfig.STRING_TYPES):
res = obj
elif isinstance(obj, dict):
res = obj.get("simulationId") or obj.pknested_get(
"models.simulation.simulationId"
)
else:
raise AssertionError("obj={} is unsupported type={}", obj, type(obj))
return cls._simulation_db().assert_sid(res)
[docs]
@classmethod
def poll_seconds(cls, data):
"""Client poll period for simulation status
TODO(robnagler) needs to be encapsulated
Args:
data (dict): must container report name
Returns:
int: number of seconds to poll
"""
return 2 if cls.is_parallel(data) else 1
[docs]
@classmethod
def prepare_import_file_args(cls, req):
return cls._prepare_import_file_name_args(req).pkupdate(
file_as_str=req.form_file.as_str(),
import_file_arguments=req.import_file_arguments,
)
[docs]
@classmethod
def proprietary_code_tarball(cls):
return None
[docs]
@classmethod
def proprietary_code_lib_file_basenames(cls):
return []
[docs]
@classmethod
def put_sim_file(cls, sim_id, src_file_name, dst_basename):
"""Write a file to the simulation's database directory
Args:
sim_id (str): simulation id
src_file_name (str or py.path): local file to send to sim_db
dst_basename (str): name in sim repo dir
"""
return cls.sim_db_client().put(
sim_id,
dst_basename,
pkio.read_binary(src_file_name),
)
[docs]
@classmethod
def resource_path(cls, filename):
"""Static resource (package_data) file for simulation
Returns:
py.path.local: absolute path to file
"""
return sirepo.resource.file_path(
_TEMPLATE_RESOURCE_DIR, cls.sim_type(), filename
)
[docs]
@classmethod
def schema(cls):
"""Get schema for code
Returns:
PKDict: schema
"""
# TODO(robnagler) cannot use cls._simulation_db, because needed in templates
# schema should be available so move out of simulation_db.
from sirepo import simulation_db
return cls._memoize(simulation_db.get_schema(cls.sim_type()))
[docs]
@classmethod
def sim_db_client(cls):
"""Low-level for sim_db_file ops for job agents
Used to manipulate sim db files in job agent. Care should be
taken to avoid inefficiencies as these are remote requests.
Typically, these are done in job_cmd, not job_agent, because
operations are synchronous.
Returns:
SimDbClient: interface to `sirepo.sim_db_file`
"""
# This assertion is sufficient even though memoized, because
# it is executed once per process.
from sirepo import sim_db_file
cls._assert_agent_side()
return cls._memoize(sim_db_file.SimDbClient(cls))
[docs]
@classmethod
def sim_db_read_sim(cls, sim_id, sim_type=None, qcall=None):
"""Read simulation sdata for `sim_id`
Calls `simulation_db.read_simulation_json`
Args:
sim_id (str): which simulation
sim_type (str): simulation type [`cls.sim_type()`]
qcall (quest.API): quest [None]
Returns:
PKDict: sdata
"""
if cls._is_agent_side():
return cls.sim_db_client().read_sim(sim_id, sim_type=sim_type)
return cls._simulation_db().read_simulation_json(
sim_type or cls.sim_type(), sim_id, qcall=qcall
)
[docs]
@classmethod
def sim_db_save_sim(cls, sdata, qcall=None):
"""Save `sdata` to simulation db.
Calls `simulation_db.save_simulation_json`
Args:
sdata (PKDict): what to write
qcall (quest.API): quest [None]
Returns:
PKDict: updated sdata
"""
if not isinstance(sdata, PKDict):
raise AssertionError(f"sdata unexpected type={type(sdata)}")
if cls._is_agent_side():
return cls.sim_db_client().save_sim(sdata)
# TODO(robnagler) normalize so that same code is used
return cls._simulation_db().save_simulation_json(
sdata,
fixup=True,
qcall=qcall,
modified=True,
)
[docs]
@classmethod
def sim_file_basenames(cls, data):
"""List of files needed for this simulation
Returns:
list: basenames of sim repo dir
"""
return cls._sim_file_basenames(data)
[docs]
@classmethod
def sim_files_to_run_dir(cls, data, run_dir):
"""Copy files from sim repo dir to `run_dir`
Calls `sim_file_basenames` to get list of sim files.
Args:
data (PKDict): used to identify simulation
run_dir (py.path): directory to write to
"""
for b in cls.sim_file_basenames(data):
cls._read_binary_and_save(
data.models.simulation.simulationId,
b.basename,
run_dir,
is_exe=b.get("is_exe", False),
)
[docs]
@classmethod
def sim_run_dir_prepare(cls, run_dir, data=None):
"""Create and install files, update parameters, and generate command.
Copies files into the simulation directory (``run_dir``)
Updates the parameters in ``data`` and save.
Generate the pkcli command.
Args:
run_dir (py.path.local or str): dir simulation will be run in
data (PKDict): optional, will read from run_dir
Returns:
list: pkcli command to execute
"""
from sirepo import sim_data
def _cmd(run_dir, data, template):
p = cls.is_parallel(data)
if rv := template.write_parameters(data, run_dir=run_dir, is_parallel=p):
return rv
return [
pkinspect.root_package(template),
pkinspect.module_basename(template),
"run-background" if p else "run",
str(run_dir),
]
def _data(run_dir, data):
if rv := cls.sim_run_input(run_dir, checked=False):
return rv
if data:
cls.sim_run_input_to_run_dir(data, run_dir)
return data
raise FileNotFoundError(f"path={cls.sim_run_input_path(run_dir)}")
r = pkio.py_path(run_dir)
d = _data(r, data)
cls.support_files_to_run_dir(data=d, run_dir=r)
return _cmd(r, d, sirepo.template.import_module(cls.sim_type()))
[docs]
@classmethod
def sim_type(cls):
return cls._memoize(pkinspect.module_basename(cls))
[docs]
@classmethod
def support_files_to_run_dir(cls, data, run_dir):
cls.lib_files_to_run_dir(data, run_dir)
cls.sim_files_to_run_dir(data, run_dir)
[docs]
@classmethod
def update_model_defaults(cls, model, name, dynamic=None):
defaults = cls.model_defaults(name)
if dynamic:
defaults.update(dynamic(name))
for f in defaults:
if f not in model:
model[f] = defaults[f]
[docs]
@classmethod
def want_browser_frame_cache(cls, report):
return True
[docs]
@classmethod
def watchpoint_id(cls, report):
m = cls.WATCHPOINT_REPORT_RE.search(report)
if not m:
raise RuntimeError("invalid watchpoint report name: ", report)
return int(m.group(1))
@classmethod
def _assert_agent_side(cls):
if not cls._is_agent_side():
raise AssertionError(
f"method={pkinspect.caller_func_name()} only in job_agent"
)
@classmethod
def _assert_server_side(cls):
if cls._is_agent_side():
raise AssertionError(
f"method={pkinspect.caller_func_name()} not available in job_agent"
)
@classmethod
def _compute_model(cls, analysis_model, resp):
"""Returns ``animation`` for models with ``Animation`` in name
Subclasses should override, but call this. The mapping of
``<name>Animation`` to ``animation`` should stay consistent here.
Args:
model (str): analysis model
resp (PKDict): analysis model
Returns:
str: name of compute model for analysis_model
"""
if _ANIMATION_NAME in analysis_model.lower():
return _ANIMATION_NAME
return analysis_model
@classmethod
def _force_recompute(cls):
"""Random value to force a compute_job to recompute.
Used by `_compute_job_fields`
Returns:
str: random value same length as md5 hash
"""
return sirepo.util.random_base62(32)
@classmethod
def _frame_id_fields(cls, frame_args):
"""Schema specific frame_id fields"""
f = cls.schema().frameIdFields
r = frame_args.frameReport
return f[r] if r in f else f[cls.compute_model(r)]
@classmethod
def _frame_param_to_field(cls, param):
from json.decoder import JSONDecodeError
try:
return pkjson.load_any(param)
except JSONDecodeError:
return param
@classmethod
def _init_models(cls, models, names=None, dynamic=None):
if names:
names = set(list(names) + ["simulation"])
for n in names or cls.schema().model:
if n in _CLIENT_ONLY_MODELS:
continue
cls.update_model_defaults(
models.setdefault(n, PKDict()),
n,
dynamic=dynamic,
)
@classmethod
def _is_agent_side(cls):
from sirepo import sim_db_file
return cls._memoize(bool(sim_db_file.in_job_agent()))
@classmethod
def _lib_file_abspath(cls, basename, qcall):
"""Path in user lib directory for `basename`"""
return (
cls._simulation_db()
.simulation_lib_dir(cls.sim_type(), qcall=qcall)
.join(basename)
)
@classmethod
def _lib_file_abspath_or_exists(
cls,
basename,
qcall=None,
exists_only=False,
):
"""Absolute path of lib file
On agent downloads file unless `exists_only`.
For utilities (`cfg.lib_file_resource_only`) only checks
resources (not user library).
Args:
basename (str): name to find
qcall (quest.API): quest [None]
exists_only (bool): if ``True`` do not download [False]
Returns:
object: bool if `exists_only` else py.path
"""
if cls._is_agent_side():
if exists_only:
if cls.sim_db_client().exists(cls.LIB_DIR, basename):
return True
else:
try:
return cls._read_binary_and_save(
cls.LIB_DIR, basename, pkio.py_path()
)
except Exception as e:
if not pkio.exception_is_not_found(e):
raise
# try to find below
elif not _cfg.lib_file_resource_only:
# Command line utility or server
f = cls._lib_file_abspath(basename, qcall)
if f.check(file=True):
return exists_only or f
try:
# Lib file distributed with build
f = cls.lib_file_resource_path(basename)
if f.check(file=True):
return exists_only or f
except Exception as e:
if not pkio.exception_is_not_found(e):
raise
return False if exists_only else None
@classmethod
def _lib_file_list(cls, pat, want_user_lib_dir=True, qcall=None):
"""Unsorted list of absolute paths matching glob pat
Only works locally.
"""
cls._assert_server_side()
res = PKDict(
(
(f.basename, f)
for f in sirepo.resource.glob_paths(
_TEMPLATE_RESOURCE_DIR,
cls.sim_type(),
cls.LIB_DIR,
pat,
)
)
)
if want_user_lib_dir:
# lib_dir overwrites resource_dir
res.update(
(f.basename, f)
for f in pkio.sorted_glob(cls._lib_file_abspath(pat, qcall))
)
return res.values()
@classmethod
def _memoize(cls, value):
"""Cache class method (no args)
Example::
@classmethod
def something(cls):
return cls._memoize(compute_something_once())
Args:
value (object): any object
Returns:
object: value
"""
@classmethod
def wrap(cls):
return value
setattr(cls, pkinspect.caller_func_name(), wrap)
return value
@classmethod
def _non_analysis_fields(cls, data, model):
"""Get the non-analysis fields for model
If the model has "analysis" fields, then return the full list of non-style fields
otherwise returns the model name (which implies all model fields)
Args:
data (dict): simulation
model (str): name of model to compute
Returns:
list: compute_fields fields for model or whole model
"""
s = set(data.models.get(model, {}).keys()) - cls.ANALYSIS_ONLY_FIELDS
if not s:
return [model]
return sorted(["{}.{}".format(model, x) for x in s])
@classmethod
def _organize_example(cls, data):
dm = data.models
if dm.simulation.get("isExample") and dm.simulation.folder == "/":
dm.simulation.folder = "/Examples"
@classmethod
def _prepare_import_file_name_args(cls, req):
res = PKDict(basename=os.path.basename(req.filename))
res.purebasename, e = os.path.splitext(res.basename)
res.ext_lower = e.lower()
return res
@classmethod
def _proprietary_code_tarball(cls):
return f"{cls.sim_type()}.tar.gz"
@classmethod
def _read_binary_and_save(
cls, lib_sid_uri, basename, dst_dir, is_exe=False, sim_type=None
):
p = dst_dir.join(basename)
p.write_binary(
cls.sim_db_client().get(lib_sid_uri, basename, sim_type=sim_type)
)
if is_exe:
p.chmod(cls._EXE_PERMISSIONS)
return p
@classmethod
def _sim_file_basenames(cls, data):
return []
@classmethod
def _simulation_db(cls):
cls._assert_server_side()
from sirepo import simulation_db
return simulation_db
_cfg = pkconfig.init(
lib_file_resource_only=(False, bool, "used by utility programs"),
)