"""Common execution template.
:copyright: Copyright (c) 2015 RadiaSoft LLC. All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from pykern import pkcompat
from pykern import pkio
from pykern import pkjinja
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdlog, pkdp, pkdexc
from sirepo.template import code_variable
import math
import os
import re
import sirepo.const
import sirepo.sim_data
import sirepo.template
import sirepo.util
import subprocess
import sys
import types
DEFAULT_INTENSITY_DISTANCE = 20
#: Input json file
INPUT_BASE_NAME = sirepo.const.SIM_RUN_INPUT_BASENAME
#: Test if value is numeric text
NUMERIC_RE = re.compile(r"^\s*(\-|\+)?(\d+|(\d*(\.\d*)))([eE][+-]?\d+)?\s*$")
#: Output json file
OUTPUT_BASE_NAME = "out"
#: Python file (not all simulations)
PARAMETERS_PYTHON_FILE = "parameters.py"
#: stderr and stdout
RUN_LOG = "run.log"
_HISTOGRAM_BINS_MAX = 500
_PLOT_LINE_COLOR = [
"#1f77b4",
"#ff7f0e",
"#2ca02c",
"#d62728",
"#9467bd",
"#8c564b",
"#e377c2",
"#7f7f7f",
"#bcbd22",
"#17becf",
]
#: for JobCmdFile replies
_TEXT_SUFFIXES = (".py", ".txt", ".csv")
[docs]
class JobCmdFile(PKDict):
"""Returned by dispatched job commands
`analysis_job_dispatch`, `stateless_compute_dispatch`, and
`stateful_compute_dispatch` support file returns.
Args:
reply_content (object): what to send [reply_path.read()]
reply_path (py.path): py.path of file to read
reply_uri (str): what to call the file [reply_path.basename]
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.pksetdefault(error=None)
if self.error:
self.pksetdefault(state="error")
return
if not (self.get("reply_uri") or self.get("reply_path")):
raise AssertionError(
f"reply_uri or reply_path not in kwargs.keys={list(kwargs)}"
)
self.pksetdefault(
reply_content=lambda: (
pkcompat.to_bytes(pkio.read_text(self.reply_path))
if self.reply_path.ext in _TEXT_SUFFIXES
else self.reply_path.read_binary()
),
reply_uri=lambda: self.reply_path.basename,
state="ok",
)
[docs]
class LogParser(PKDict):
def __init__(self, run_dir, **kwargs):
super().__init__(run_dir=run_dir, **kwargs)
self.pksetdefault(
default_msg="An unknown error occurred",
error_patterns=(r"Error: (.*)",),
log_filename=RUN_LOG,
)
[docs]
def parse_for_errors(self):
p = self.run_dir.join(self.log_filename)
if not p.exists() or p.size() <= 0:
return ""
res = ""
e = set()
with pkio.open_text(p) as f:
for line in f:
if m := self._parse_log_line(line):
if m not in e:
res += m
e.add(m)
if res:
return res
return self.default_msg
def _parse_log_line(self, line):
res = ""
for pattern in self.error_patterns:
m = re.search(pattern, line)
if m:
res += m.group(1) + "\n"
return res
[docs]
class ModelUnits:
"""Convert model fields from native to sirepo format, or from sirepo to native format.
Examples::
def _xpas(value, is_native):
# custom field conversion code would go here
return value
mu = ModelUnits({
'CHANGREF': {
'XCE': 'cm_to_m',
'YCE': 'cm_to_m',
'ALE': 'deg_to_rad',
'XPAS': _xpas,
},
})
m = mu.scale_from_native('CHANGREF', {
'XCE': 2,
'YCE': 0,
'ALE': 8,
'XPAS': '#20|20|20',
})
assert m['XCE'] == 2e-2
assert ModelUnits.scale_value(2, 'cm_to_m', True) == 2e-2
assert ModelUnits.scale_value(0.02, 'cm_to_m', False) == 2
"""
# handler for common units, native --> sirepo scale
_COMMON_HANDLERS = PKDict(
cm_to_m=1e-2,
mm_to_cm=1e-1,
mrad_to_rad=1e-3,
deg_to_rad=math.pi / 180,
)
def __init__(self, unit_def):
"""
Args:
unit_def (dict):
Map of model name to field handlers
"""
self.unit_def = unit_def
[docs]
def scale_from_native(self, name, model):
"""Scale values from native values into sirepo units."""
return self.__scale_model(name, model, True)
[docs]
def scale_to_native(self, name, model):
"""Scale values from sirepo units to native values."""
return self.__scale_model(name, model, False)
[docs]
@classmethod
def scale_value(cls, value, scale_type, is_native):
"""Scale one value using the specified handler."""
handler = cls._COMMON_HANDLERS.get(scale_type, scale_type)
if isinstance(handler, float):
return float(value) * (handler if is_native else 1 / handler)
assert isinstance(handler, types.FunctionType), "Unknown unit scale: {}".format(
handler
)
return handler(value, is_native)
def __scale_model(self, name, model, is_native):
if name in self.unit_def:
for field in self.unit_def[name]:
if field not in model:
continue
model[field] = self.scale_value(
model[field], self.unit_def[name][field], is_native
)
return model
class _MPILogParser(LogParser):
def parse_for_errors(self):
p = self.run_dir.join(self.log_filename)
e = None
if p.exists():
m = re.search(
r"^Traceback .*?^\w*Error: (.*?)\n",
pkio.read_text(p),
re.MULTILINE | re.DOTALL,
)
if m:
e = m.group(1)
return e
[docs]
class NamelistParser:
[docs]
def parse_text(self, text):
import f90nml
text = str(text.encode("ascii", "ignore"), "UTF-8")
parser = f90nml.Parser()
parser.global_start_index = 1
return parser.reads(text)
[docs]
class NoH5PathError(KeyError):
"""The given path into an h5 file does not exist"""
pass
[docs]
class ParticleEnergy:
"""Computes the energy related fields for a particle from one field.
Units:
mass [GeV/c^2]
pc [GeV/c]
energy [GeV]
brho [Tm]
"""
SPEED_OF_LIGHT = 299792458 # [m/s]
ENERGY_PRIORITY = PKDict(
impactx=["energy"],
opal=["gamma", "energy", "pc"],
madx=["energy", "pc", "gamma", "beta", "brho"],
)
# defaults unless constants.particleMassAndCharge is set in the schema
PARTICLE_MASS_AND_CHARGE = PKDict(
# mass [GeV]
antiproton=[0.938272046, -1],
electron=[5.10998928e-4, -1],
muon=[0.1056583755, -1],
positron=[5.10998928e-4, 1],
proton=[0.938272046, 1],
)
[docs]
@classmethod
def compute_energy(cls, sim_type, particle, energy):
p = PKDict(
mass=cls.get_mass(sim_type, particle, energy),
charge=cls.get_charge(sim_type, particle, energy),
)
for f in cls.ENERGY_PRIORITY[sim_type]:
if f in energy and energy[f] != 0:
v = energy[f]
handler = "_ParticleEnergy__set_from_{}".format(f)
getattr(cls, handler)(p, energy)
energy[f] = v
return energy
assert False, "missing energy field: {}".format(energy)
[docs]
@classmethod
def get_charge(cls, sim_type, particle, beam):
return cls.__particle_info(sim_type, particle, beam)[1]
[docs]
@classmethod
def get_mass(cls, sim_type, particle, beam):
return cls.__particle_info(sim_type, particle, beam)[0]
@classmethod
def __particle_info(cls, sim_type, particle, beam):
mass_and_charge = (
sirepo.sim_data.get_class(sim_type)
.schema()
.constants.get(
"particleMassAndCharge",
cls.PARTICLE_MASS_AND_CHARGE,
)
)
if particle in mass_and_charge:
return mass_and_charge[particle]
return [beam.mass, beam.charge]
@classmethod
def __set_from_beta(cls, particle, energy):
assert (
energy.beta >= 0 or energy.beta < 1
), "energy beta out of range: {}".format(energy.beta)
energy.gamma = 1 / math.sqrt(1 - energy.beta**2)
cls.__set_from_gamma(particle, energy)
@classmethod
def __set_from_brho(cls, particle, energy):
energy.pc = energy.brho * abs(particle.charge) * cls.SPEED_OF_LIGHT * 1e-9
cls.__set_from_pc(particle, energy)
@classmethod
def __set_from_energy(cls, particle, energy):
energy.gamma = energy.energy / particle.mass
cls.__set_from_gamma(particle, energy)
@classmethod
def __set_from_gamma(cls, particle, energy):
assert energy.gamma >= 1, "energy gamma out of range: {}".format(energy.gamma)
energy.energy = energy.gamma * particle.mass
energy.kinetic_energy = energy.energy - particle.mass
energy.beta = math.sqrt(1.0 - 1.0 / (energy.gamma**2))
energy.pc = energy.gamma * energy.beta * particle.mass
energy.brho = energy.pc / (abs(particle.charge) * cls.SPEED_OF_LIGHT * 1e-9)
@classmethod
def __set_from_pc(cls, particle, energy):
r2 = energy.pc**2 / (particle.mass**2)
energy.beta = math.sqrt(r2 / (1 + r2))
cls.__set_from_beta(particle, energy)
[docs]
def analysis_job_dispatch(data, **kwargs):
t = sirepo.template.import_module(data.simulationType)
return getattr(t, f"analysis_job_{_validate_method(t, data)}")(data, **kwargs)
[docs]
def compute_field_range(args, compute_range, run_dir):
"""Computes the fieldRange values for all parameters across all animation files.
Caches the value on the animation input file. compute_range() is called to
read the simulation specific datafiles and extract the ranges by field.
"""
from sirepo import simulation_db
data = simulation_db.read_json(run_dir.join(INPUT_BASE_NAME))
res = None
n = args.modelName
if n in data.models:
if "fieldRange" in data.models[n]:
res = data.models[n].fieldRange
else:
res = compute_range(run_dir)
data.models[n].fieldRange = res
simulation_db.write_json(run_dir.join(INPUT_BASE_NAME), data)
return PKDict(fieldRange=res)
[docs]
def compute_plot_color_and_range(plots, plot_colors=None, fixed_y_range=None):
"""For parameter plots, assign each plot a color and compute the full y_range.
If a fixed range is provided, use that instead
"""
y_range = fixed_y_range
colors = plot_colors if plot_colors is not None else _PLOT_LINE_COLOR
for i in range(len(plots)):
plot = plots[i]
plot["color"] = colors[i % len(colors)]
if not plot["points"]:
y_range = [0, 0]
elif fixed_y_range is None:
vmin = min(plot["points"])
vmax = max(plot["points"])
if y_range:
if vmin < y_range[0]:
y_range[0] = vmin
if vmax > y_range[1]:
y_range[1] = vmax
else:
y_range = [vmin, vmax]
return y_range
[docs]
def write_dict_to_h5(d, file_path, h5_path=None):
"""Store the contents of a dict in an h5 file starting at the provided path.
Stores the data recursively so that
{a: A, b: {c: C, d: D}}
maps the data to paths
<h5_path>/a -> A
<h5_path>/b/c -> C
<h5_path>/b/d -> D
h5_to_dict() performs the reverse process
"""
import h5py
if h5_path is None:
h5_path = ""
try:
for i in range(len(d)):
p = f"{h5_path}/{i}"
try:
with h5py.File(file_path, "a") as f:
f.create_dataset(p, data=d[i])
except TypeError:
write_dict_to_h5(d[i], file_path, h5_path=p)
except KeyError:
for k in d:
p = f"{h5_path}/{k}"
try:
with h5py.File(file_path, "a") as f:
f.create_dataset(p, data=d[k])
except TypeError:
write_dict_to_h5(d[k], file_path, h5_path=p)
[docs]
def enum_text(schema, name, value):
for e in schema["enum"][name]:
if e[0] == str(value):
return e[1]
assert False, "unknown {} enum value: {}".format(name, value)
[docs]
def exec_parameters(path=None):
from pykern import pkrunpy
return pkrunpy.run_path_as_module(path or PARAMETERS_PYTHON_FILE)
[docs]
def exec_parameters_with_mpi():
from sirepo import mpi
return mpi.run_script(pkio.read_text(PARAMETERS_PYTHON_FILE))
[docs]
def file_extension_ok(file_path, white_list=None, black_list=["py", "pyc"]):
"""Determine whether a file has an acceptable extension
Args:
file_path (str): name of the file to examine
white_list ([str]): list of file types allowed (defaults to empty list)
black_list ([str]): list of file types rejected (defaults to
['py', 'pyc']). Ignored if white_list is not empty
Returns:
If file is a directory: True
If white_list non-empty: True if the file's extension matches any in
the list, otherwise False
If white_list is empty: False if the file's extension matches any in
black_list, otherwise True
"""
if os.path.isdir(file_path):
return True
if white_list:
in_list = False
for ext in white_list:
in_list = in_list or pkio.has_file_extension(file_path, ext)
if not in_list:
return False
return True
for ext in black_list:
if pkio.has_file_extension(file_path, ext):
return False
return True
[docs]
def flatten_data(d, res, prefix=""):
"""Takes a nested dictionary and converts it to a single level dictionary with
flattened keys."""
for k in d:
v = d[k]
if isinstance(v, dict):
flatten_data(v, res, prefix + k + "_")
elif isinstance(v, list):
pass
else:
res[prefix + k] = v
return res
[docs]
def generate_parameters_file(data, is_run_mpi=False):
from sirepo import mpi
v = flatten_data(data["models"], PKDict())
v.notes = _get_notes(v)
v.mpi = mpi.abort_on_signal_code() if is_run_mpi else ""
return render_jinja(None, v, name="common-header.py"), v
[docs]
def get_exec_parameters_cmd(is_mpi=False):
from sirepo import mpi
return mpi.get_cmd() if is_mpi else (sys.executable, PARAMETERS_PYTHON_FILE)
[docs]
def h5_to_dict(hf, path=None):
d = PKDict()
if path is None:
path = "/"
try:
for k in hf[path]:
try:
d[k] = hf[path][k][()].tolist()
except (AttributeError, TypeError):
# AttributeErrors occur when invoking tolist() on non-arrays
# TypeErrors occur when accessing a group with [()]
# in each case we recurse one step deeper into the path
p = "{}/{}".format(path, k)
d[k] = h5_to_dict(hf, path=p)
except TypeError:
# this TypeError occurs when hf[path] is not iterable (e.g. a string)
# assume this is a single-valued entry and run it through pkcompat
return pkcompat.from_bytes(hf[path][()])
except KeyError as e:
# no such path into the h5 file - re-raise so we know where it came from
raise NoH5PathError(e)
# replace dicts with arrays on a 2nd pass
try:
indices = [int(k) for k in d.keys()]
d_arr = [None] * len(indices)
for i in indices:
d_arr[i] = d[str(i)]
d = d_arr
except IndexError:
# integer keys but not an array
pass
except ValueError:
# keys not all integers, we're done
pass
return d
[docs]
def heatmap(values, model, plot_fields=None, weights=None):
"""Computes a report histogram (x_range, y_range, z_matrix) for a report model."""
import numpy
r = None
if "plotRangeType" in model:
if model["plotRangeType"] == "fixed":
r = [
_plot_range(model, "horizontal"),
_plot_range(model, "vertical"),
]
elif model["plotRangeType"] == "fit" and "fieldRange" in model:
r = [
model.fieldRange[model["x"]],
model.fieldRange[model["y"]],
]
hist, edges = numpy.histogramdd(
values,
histogram_bins(model["histogramBins"]),
weights=weights,
range=r,
)
res = PKDict(
x_range=[float(edges[0][0]), float(edges[0][-1]), len(hist)],
y_range=[float(edges[1][0]), float(edges[1][-1]), len(hist[0])],
z_matrix=hist.T.tolist(),
)
if plot_fields:
res.update(plot_fields)
return res
[docs]
def histogram_bins(nbins):
"""Ensure the histogram count is in a valid range"""
nbins = int(nbins)
if nbins <= 0:
nbins = 1
elif nbins > _HISTOGRAM_BINS_MAX:
nbins = _HISTOGRAM_BINS_MAX
return nbins
[docs]
def jinja_filename(filename):
# append .jinja, because file may already have an extension
return filename + ".jinja"
[docs]
def model_from_frame_args(frame_args):
if frame_args.frameReport in frame_args.sim_in.models:
res = frame_args.sim_in.models[frame_args.frameReport]
res.update(frame_args)
return res
return frame_args
[docs]
def parameter_plot(x, plots, model, plot_fields=None, plot_colors=None):
res = PKDict(
x_points=x,
x_range=[min(x), max(x)] if x else [0, 0],
plots=plots,
y_range=compute_plot_color_and_range(plots, plot_colors),
)
if "plotRangeType" in model:
if model.plotRangeType == "fixed":
res["x_range"] = _plot_range(model, "horizontal")
res["y_range"] = _plot_range(model, "vertical")
elif model.plotRangeType == "fit" and "fieldRange" in model:
res["x_range"] = model.fieldRange[model.x]
for i in range(len(plots)):
r = model.fieldRange[plots[i]["field"]]
if r[0] < res["y_range"][0]:
res["y_range"][0] = r[0]
if r[1] > res["y_range"][1]:
res["y_range"][1] = r[1]
if plot_fields:
res.update(plot_fields)
return res
[docs]
def parse_enums(enum_schema):
"""Returns a list of enum values, keyed by enum name."""
res = PKDict()
for k in enum_schema:
res[k] = PKDict()
for v in enum_schema[k]:
res[k][v[0]] = True
return res
[docs]
def parse_mpi_log(run_dir):
return _MPILogParser(run_dir, log_filename=sirepo.const.MPI_LOG).parse_for_errors()
[docs]
def read_dict_from_h5(file_path, h5_path=None):
import h5py
with h5py.File(file_path, "r") as f:
return h5_to_dict(f, path=h5_path)
[docs]
def read_last_csv_line(path):
# for performance, don't read whole file if only last line is needed
if not path.exists():
return ""
try:
with open(str(path), "rb") as f:
f.readline()
f.seek(-2, os.SEEK_END)
while f.read(1) != b"\n":
f.seek(-2, os.SEEK_CUR)
return pkcompat.from_bytes(f.readline())
except IOError:
return ""
[docs]
def read_sequential_result(run_dir):
"""Read result data file from simulation
Args:
run_dir (py.path): where to find output
Returns:
dict: result
"""
from sirepo import simulation_db
return simulation_db.read_json(
simulation_db.json_filename(OUTPUT_BASE_NAME, run_dir),
)
[docs]
def render_jinja(sim_type, v, name=PARAMETERS_PYTHON_FILE, jinja_env=None):
"""Render the values into a jinja template.
Args:
sim_type (str): application name
v: flattened model data
Returns:
str: source text
"""
b = jinja_filename(name)
return pkjinja.render_file(
(
sirepo.sim_data.get_class(sim_type).resource_path(b)
if sim_type
else sirepo.sim_data.resource_path(b)
),
v,
jinja_env=jinja_env,
)
[docs]
async def sim_frame(frame_id, op, qcall):
f, s = sirepo.sim_data.parse_frame_id(frame_id)
# document parsing the request
qcall.parse_post(req_data=f, id=True, check_sim_exists=True)
try:
x = await op(f)
except Exception as e:
if isinstance(e, sirepo.util.ReplyExc):
return e
raise sirepo.util.UserAlert(
"Report not generated",
"exception={} str={} stack={}",
type(e),
e,
pkdexc(),
)
r = qcall.reply_dict(x)
if "error" not in x and s.want_browser_frame_cache(s.frameReport):
return qcall.headers_for_cache(r)
return qcall.headers_for_no_cache(r)
[docs]
def sim_frame_dispatch(frame_args):
from sirepo import simulation_db
frame_args.pksetdefault(
run_dir=lambda: simulation_db.simulation_run_dir(frame_args),
).pksetdefault(
sim_in=lambda: simulation_db.read_json(
frame_args.run_dir.join(INPUT_BASE_NAME),
),
)
t = sirepo.template.import_module(frame_args.simulationType)
o = getattr(t, "sim_frame_" + frame_args.frameReport, None) or getattr(
t, "sim_frame"
)
res = o(frame_args)
if res is None:
raise RuntimeError(
"unsupported simulation_frame model={}".format(frame_args.frameReport)
)
return res
[docs]
def stateful_compute_dispatch(data, **kwargs):
t = sirepo.template.import_module(data.simulationType)
m = _validate_method(t, data)
k = PKDict(data=data)
# TODO(robnagler) polymorphism needed; templates should be classes
if re.search(r"(?:^rpn|_rpn)_", m):
k.schema = getattr(t, "SCHEMA")
t = getattr(t, "code_var")(data.variables)
return getattr(t, f"stateful_compute_{m}")(**k, **kwargs)
[docs]
def stateless_compute_dispatch(data, **kwargs):
t = sirepo.template.import_module(data.simulationType)
return getattr(
t,
f"stateless_compute_{_validate_method(t, data)}",
)(data, **kwargs)
[docs]
def subprocess_output(cmd, env=None):
"""Run cmd and return output or None, logging errors.
Args:
cmd (list): what to run
Returns:
str: output is None on error else a stripped string
"""
err = None
out = None
try:
p = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
out, err = p.communicate()
if p.wait() != 0:
raise subprocess.CalledProcessError(returncode=p.returncode, cmd=cmd)
except subprocess.CalledProcessError as e:
pkdlog("{}: exit={} err={}", cmd, e.returncode, err)
return None
if out:
out = pkcompat.from_bytes(out)
return out.strip()
return ""
[docs]
def text_data_file(filename, run_dir):
"""Return a datafile with a .txt extension so the text/plain mimetype is used."""
return JobCmdFile(
reply_path=run_dir.join(filename, abs=1),
reply_uri=filename + ".txt",
)
[docs]
def validate_model(model_data, model_schema, enum_info):
"""Ensure the value is valid for the field type. Scales values as needed."""
for k in model_schema:
label = model_schema[k][0]
field_type = model_schema[k][1]
if k in model_data:
value = model_data[k]
elif len(model_schema[k]) > 2:
value = model_schema[k][2]
else:
raise Exception(
'no value for field "{}" and no default value in schema'.format(k)
)
if field_type in enum_info:
if str(value) not in enum_info[field_type]:
# Check a comma-delimited string against the enumeration
for item in re.split(r"\s*,\s*", str(value)):
if item not in enum_info[field_type]:
assert (
item in enum_info[field_type]
), '{}: invalid enum "{}" value for field "{}"'.format(
item, field_type, k
)
elif field_type == "Float":
if not value:
value = 0
v = float(value)
if re.search(r"\[m(m|rad)]", label):
v /= 1000
elif re.search(r"\[n(m|rad)]", label) or re.search(r"\[nm/pixel\]", label):
v /= 1e09
elif re.search(r"\[ps]", label):
v /= 1e12
# TODO(pjm): need to handle unicode in label better (mu)
elif re.search("\\[\xb5(m|rad)]", label) or re.search(r"\[mm-mrad]", label):
v /= 1e6
model_data[k] = float(v)
elif field_type == "Integer":
if not value:
value = 0
model_data[k] = int(value)
elif value is None:
# value is already None, do not convert
pass
else:
model_data[k] = _escape(value)
[docs]
def validate_models(model_data, model_schema):
"""Validate top-level models in the schema. Returns enum_info."""
enum_info = parse_enums(model_schema["enum"])
for k in model_data["models"]:
if k in model_schema["model"]:
validate_model(
model_data["models"][k],
model_schema["model"][k],
enum_info,
)
if "beamline" in model_data["models"]:
for m in model_data["models"]["beamline"]:
validate_model(m, model_schema["model"][m["type"]], enum_info)
return enum_info
[docs]
def write_sequential_result(result, run_dir=None):
"""Write the results of a sequential simulation to disk.
Args:
result (dict): The results of the simulation
run_dir (py.path): Defaults to current dir
"""
from sirepo import simulation_db
if not run_dir:
run_dir = pkio.py_path()
f = simulation_db.json_filename(OUTPUT_BASE_NAME, run_dir)
assert not f.exists(), "{} file exists".format(OUTPUT_BASE_NAME)
simulation_db.write_json(f, result)
t = sirepo.template.import_module(
simulation_db.read_json(
simulation_db.json_filename(
INPUT_BASE_NAME,
run_dir,
),
),
)
if hasattr(t, "clean_run_dir"):
t.clean_run_dir(run_dir)
def _escape(v):
return re.sub(r"([^\\])[\"\']", r"\1", str(v))
def _get_notes(data):
notes = []
for key in data.keys():
match = re.search(r"^(.+)_notes$", key)
if match and data[key]:
n_key = match.group(1)
k = n_key[0].capitalize() + n_key[1:]
k_words = [word for word in re.split(r"([A-Z][a-z]*)", k) if word != ""]
notes.append((" ".join(k_words), data[key]))
return sorted(notes, key=lambda n: n[0])
def _plot_range(report, axis):
half_size = float(report["{}Size".format(axis)]) / 2.0
midpoint = float(report["{}Offset".format(axis)])
return [midpoint - half_size, midpoint + half_size]
def _validate_method(template, data):
m = data.method
assert re.search(
r"^[a-z]\w{1,34}$",
m,
flags=re.IGNORECASE,
), f"method={m} invalid compute or analysis function"
return m