# -*- coding: utf-8 -*-
"""Flash Config parser.
:copyright: Copyright (c) 2021 RadiaSoft LLC. All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from pykern import pkio
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdp, pkdlog
from sirepo.template import template_common
from sirepo.template import flash_views
import os.path
import re
[docs]
class ConfigParser:
# D <name> <comment> or D & <comment>
# DATAFILES <wildcard>
# LINKIF <filename> <unitename>
# MASS_SCALAR <name> (EOSMAP: <eosrole> | (EOSMAPIN: <eosrole>)? (EOSMAPOUT: <eosrole>)?)?
# PARAMETER <name> <type> CONSTANT? <default> <range spec>?
# PARTICLEMAP TO <partname> FROM <vartype> <varname>
# PARTICLEPROP <name> <type>
# PARTICLETYPE <particletype> INITMETHOD <initmethod> MAPMETHOD <mapmethod> ADVMETHOD <advmethod>
# PPDEFINE <sym> <val>?
# REQUESTS <unit>
# REQUIRES <unit>
# SCRATCHCENTERVAR <name>
# SPECIES <name> (TO <number of ions>)?
# USESETUPVARS <var>
# VARIABLE <name> (TYPE: <vartype)
# (IF, ELSEIF, ELSE, ENDIF)
[docs]
def parse(self, config_text):
idx = 1
self.stack = [PKDict(statements=[])]
for line in config_text.split("\n"):
line = re.sub(r"#.*$", "", line)
line = re.sub(r"(TYPE:)(\S)", r"\1 \2", line)
p = line.split()
if not p:
continue
method = f"_parse_{p[0].lower()}"
if not hasattr(self, method):
pkdlog("skipping line={}", line)
continue
m = PKDict(
_id=idx,
_type=p[0],
)
idx += 1
item = getattr(self, method)(p, m)
if item:
self.stack[-1].statements.append(item)
assert len(self.stack) == 1, "improper IF/ENDIF nesting"
return self.__move_descriptions_to_parameters(self.stack[0])
def _parse_d(self, parts, model):
if parts[1] == "&":
prev = self.stack[-1].statements[-1]
assert prev._type == "D", "expected multiline description for D &"
prev.comment += " {}".format(" ".join(parts[2:]))
return None
return model.pkupdate(
name=parts[1],
comment=" ".join(parts[2:]),
)
def _parse_datafiles(self, parts, model):
return model.pkupdate(
wildcard=parts[1],
)
def _parse_else(self, parts, model):
self.stack.pop()
self.__new_stack(model)
return None
def _parse_elseif(self, parts, model):
self.stack.pop()
return self._parse_if(parts, model)
def _parse_endif(self, parts, model):
self.stack.pop()
return model
def _parse_if(self, parts, model):
self.__new_stack(model, parts[1])
return None
def _parse_linkif(self, parts, model):
return model.pkupdate(
filename=parts[1],
unitname=parts[2],
)
def _parse_mass_scalar(self, parts, model):
for i in range(2, len(parts), 2):
n = parts[i]
m = re.search(r"^(EOSMAP(IN|OUT)?):$", n)
assert m, f"unknown MASS_SCALAR arg: {n}"
model[m.group(1).lower()] = parts[i + 1]
return model.pkupdate(
name=parts[1],
)
def _parse_parameter(self, parts, model):
assert re.search(
r"^(REAL|INTEGER|STRING|BOOLEAN)$", parts[2]
), f"invalid Config type: {parts[2]}"
if parts[3] == "CONSTANT":
del parts[3]
model.isConstant = "1"
else:
model.isConstant = "0"
model.range = ""
if len(parts) > 4:
model.range = re.sub(r"\[|\]", "", " ".join(parts[4:]))
return model.pkupdate(
name=parts[1],
type=parts[2],
default=re.sub(r'"', "", parts[3]),
)
def _parse_particlemap(self, parts, model):
assert (
parts[1] == "TO" and parts[3] == "FROM"
), f'invalid PARTICLEMAP def: {" ".join(parts)}'
return model.pkupdate(
partname=parts[2],
pvartype=parts[4],
varname=parts[5],
)
def _parse_particleprop(self, parts, model):
return model.pkupdate(
name=parts[1],
type=parts[2],
)
def _parse_particletype(self, parts, model):
assert (
parts[2] == "INITMETHOD" and parts[4] == "MAPMETHOD"
), f'invalid PARTICLETYPE def: {" ".join(parts)}'
return model.pkupdate(
particletype=parts[1],
initmethod=parts[3],
mapmethod=parts[5],
advmethod=parts[7] if len(parts) > 6 and parts[6] == "ADVMETHOD" else "",
)
def _parse_ppdefine(self, parts, model):
return model.pkupdate(
sym=parts[1],
val=" ".join(parts[2:]) if len(parts) > 2 else "",
)
def _parse_requires(self, parts, model):
return model.pkupdate(
unit=parts[1],
)
def _parse_requests(self, parts, model):
return model.pkupdate(
unit=parts[1],
)
def _parse_scratchcentervar(self, parts, model):
return model.pkupdate(
name=parts[1],
)
def _parse_species(self, parts, model):
if len(parts) > 2:
assert parts[2] == "TO", f'invalid SPECIES: {" ".join(parts)}'
model.numberOfIons = parts[3]
return model.pkupdate(
name=parts[1],
)
def _parse_usesetupvars(self, parts, model):
return model.pkupdate(vars=" ".join(parts[1:]))
def _parse_variable(self, parts, model):
model.vartype = ""
if len(parts) > 2:
assert parts[2] == "TYPE:", f'invalid VARIABLE line: {" ".join(parts)}'
model.vartype = parts[3]
return model.pkupdate(
name=parts[1],
)
def __move_descriptions_to_parameters(self, item):
def _find_all(item, search_type, do_remove=False, res=None):
if res is None:
res = PKDict()
if "_type" in item and item._type == search_type:
res[item.name] = item
if "statements" in item:
for stmt in item.statements:
_find_all(stmt, search_type, do_remove, res)
if do_remove:
item.statements = list(
filter(lambda x: x._type != search_type, item.statements)
)
return res
descriptions = _find_all(item, "D", do_remove=True)
parameters = _find_all(item, "PARAMETER")
for name in descriptions:
if name in parameters:
parameters[name].comment = descriptions[name].comment
return item.statements
def __new_stack(self, model, condition=None):
self.stack[-1].statements.append(
model.pkupdate(
statements=[],
)
)
if condition:
model.condition = condition
self.stack.append(model)
[docs]
class ParameterParser:
[docs]
def parse(self, sim_in, par_text):
self.schema = sim_in.models.flashSchema
self.field_map = self.__field_to_model_map()
return self.__parse_values(self.__parse_text(par_text))
def __field_to_model_map(self):
res = PKDict()
for m in self.schema.model:
for f in self.schema.model[m]:
res[f.lower()] = [m, f]
return res
def __parse_text(self, par_text):
res = PKDict()
for line in par_text.split("\n"):
line = re.sub(r"#.*$", "", line)
m = re.search(r"^(\w.*?)\s*=\s*(.*?)\s*$", line)
if m:
f, v = m.group(1, 2)
res[f.lower()] = v
return res
def __parse_values(self, fields):
enum_map = PKDict()
for e, items in self.schema.enum.items():
enum_map[e] = PKDict({v[0].lower(): v[0] for v in items})
res = PKDict()
for f, v in fields.items():
if f not in self.field_map:
pkdlog(f"Unknown field: {f}: {v}")
continue
m, fn = self.field_map[f]
ftype = self.schema.model[m][fn][1]
if ftype in self.schema.enum:
v = re.sub(r'"', "", v).strip()
assert (
v.lower() in enum_map[ftype]
), f"Unknown enum value for field: {ftype}: {v} values: {enum_map[ftype].keys()}"
res[fn] = enum_map[ftype][v.lower()]
elif ftype == "Boolean":
v = SetupParameterParser.remove_quotes(v)
m = re.search(r"^\.(true|false)\.$", v, re.IGNORECASE)
assert m, f"invalid boolean for field {f}: {v}"
res[fn] = "1" if m.group(1).lower() == "true" else "0"
else:
res[fn] = SetupParameterParser.parse_string_or_number(
ftype, fields[f], maybe_quoted=True
)
return res
[docs]
class SetupParameterParser:
_HUGE = PKDict(
Integer=2147483647,
Float=3.40282347e38,
)
_MAX_VAR_COUNT = 20
_SPECIAL_TYPES = PKDict(
Grid_GridMain=PKDict(
{
k: "GridBoundaryType"
for k in (
"xl_boundary_type",
"xr_boundary_type",
"yl_boundary_type",
"yr_boundary_type",
"zl_boundary_type",
"zr_boundary_type",
)
}
),
Grid_GridMain_paramesh=PKDict(
{
f"refine_var_{v}": "VariableNameOptional"
for v in range(1, _MAX_VAR_COUNT)
}
),
IO_IOMain=PKDict(
{f"plot_var_{v}": "VariableNameOptional" for v in range(1, _MAX_VAR_COUNT)}
),
physics_Diffuse_DiffuseMain=PKDict(
{
k: "DiffuseBoundaryType"
for k in (
"diff_eleXlBoundaryType",
"diff_eleXrBoundaryType",
"diff_eleYlBoundaryType",
"diff_eleYrBoundaryType",
"diff_eleZlBoundaryType",
"diff_eleZrBoundaryType",
)
}
),
physics_Gravity=PKDict(
grav_boundary_type="GravityBoundaryType",
),
physics_Gravity_GravityMain_Constant=PKDict(
gdirec="GravityDirection",
),
physics_Hydro_HydroMain_unsplit=PKDict(RiemannSolver="RiemannSolver"),
physics_RadTrans_RadTransMain_MGD=PKDict(
{
k: "RadTransMGDBoundaryType"
for k in (
"rt_mgdXlBoundaryType",
"rt_mgdXrBoundaryType",
"rt_mgdYlBoundaryType",
"rt_mgdYrBoundaryType",
"rt_mgdZlBoundaryType",
"rt_mgdZrBoundaryType",
)
}
),
physics_sourceTerms_EnergyDeposition_EnergyDepositionMain_Laser=PKDict(
{
f"ed_crossSectionFunctionType_{v}": "LaserCrossSectionOptional"
for v in range(1, _MAX_VAR_COUNT)
}
),
)
_TINY = PKDict(
Float=1.17549435e-38,
)
_TYPE_MAP = PKDict(
BOOLEAN="Boolean",
INTEGER="Integer",
REAL="Float",
STRING="String",
)
def __add_enum(self, enums, name, values):
enums[name] = [[v, v] for v in values]
def __init__(self, setup_dir):
self.setup_dir = setup_dir
[docs]
def generate_schema(self):
with pkio.open_text(self.setup_dir.join("setup_vars")) as f:
self.var_names = self.__parse_vars(f)
with pkio.open_text(self.setup_dir.join("setup_params")) as f:
self.models, self.views = self.__parse_setup(f)
with pkio.open_text(self.setup_dir.join("setup_datafiles")) as f:
self.datafiles = self.__parse_datafiles(f)
return self.__format_schema()
[docs]
@classmethod
def model_name_from_flash_unit_name(cls, text):
# TODO(e-carlin): discuss with pjm. Main units have different vars than the non-main
# return '_'.join(filter(lambda x: not re.search(r'Main$', x), text.split('/')))
return "_".join(text.split("/"))
[docs]
@classmethod
def parse_string_or_number(cls, field_type, value, maybe_quoted=False):
if field_type == "String" or field_type == "OptionalString":
return cls.__parse_string(value)
if maybe_quoted:
value = cls.remove_quotes(value)
if re.search(r"^(-)?(HUGE|TINY)", value):
return cls.__parse_special_number(field_type, value)
if field_type == "Integer":
assert re.search(
r"^([\-|+])?\d+$", str(value)
), f"{field.name} invalid flash integer: {value}"
return int(value)
if field_type == "Float":
value = re.sub(r"\+$", "", value)
assert template_common.NUMERIC_RE.search(
value
), f"invalid flash float: {value}"
return float(value)
if field_type == "Constant":
return value
assert False, f"unknown field type: {field_type}, value: {value}"
[docs]
@classmethod
def remove_quotes(cls, value):
# any value may be quoted
return re.sub(r'^"(.*)"$', r"\1", value)
def __create_views(self, schema):
schema.view = self.views
for m in schema.model:
schema.view[m].pkupdate(
fieldsPerTab=8,
advanced=[v for v in schema.model[m].keys()],
basic=[],
)
def __field_default(self, field):
if field.is_constant:
assert field.default, f"missing constant value: {field.name}"
return field.default
return self.__value_for_type(field, field.default)
def __field_type(self, field, model_name, enums):
assert field.type in self._TYPE_MAP, f"unknown field type: {field.type}"
field.type = self._TYPE_MAP[field.type]
if field.is_constant:
field.type = "Constant"
return
if self.__is_file_field(field):
field.type = "SetupDatafilesOptional"
if field.default == '"-none-"' or field.default == '"NOT SPECIFIED"':
field.default = "none"
field.enum = [v[0] for v in enums[field.type]]
return
if model_name in self._SPECIAL_TYPES:
if field.name in self._SPECIAL_TYPES[model_name]:
ftype = self._SPECIAL_TYPES[model_name][field.name]
field.type = ftype
field.enum = [v[0] for v in enums[ftype]]
return
if "valid_values" in field:
self.__valid_values(field)
if "enum" in field:
enum_name = f"{model_name}{field.name}"
assert enum_name not in enums, f"duplicate enum: {enum_name}"
self.__add_enum(enums, enum_name, field.enum)
field.type = enum_name
elif field.type == "String" and field.default == '""':
field.type = "OptionalString"
def __format_schema(self):
res = self.__init_schema()
for name in sorted(self.models.keys()):
m = self.models[name]
fields = PKDict()
for fname in sorted(m):
field = m[fname]
# [label, type, default, description, min, max]
self.__field_type(field, name, res.enum)
fields[fname] = [fname, field.type, self.__field_default(field)]
if field.description:
fields[fname].append(field.description)
if "min" in field:
if not field.description:
fields[fname].append("")
if field.min is None:
fields[fname].append(None)
else:
fields[fname].append(self.__value_for_type(field, field.min))
if "max" in field:
fields[fname].append(self.__value_for_type(field, field.max))
res.model[name] = fields
self.__create_views(res)
return flash_views.SpecializedViews().update_schema(res)
def __init_schema(self):
enums = PKDict()
for name, values in PKDict(
DiffuseBoundaryType=["dirichlet", "neumann", "outflow", "zero-gradient"],
GridBoundaryType=[
"nocurrent",
"reflect",
"axisymmetric",
"eqtsymmetric",
"outflow",
"diode",
"extrapolate",
"neumann_ins",
"dirichlet",
"hydrostatic-f2+nvout",
"hydrostatic-f2+nvdiode",
"hydrostatic-f2+nvrefl",
"hydrostatic+nvout",
"hydrostatic+nvdiode",
"hydrostatic+nvrefl",
"periodic",
"user",
],
GravityBoundaryType=["dirichlet", "isolated", "periodic"],
GravityDirection=["x", "y", "z"],
LaserCrossSectionOptional=["none", "gaussian1D", "gaussian2D", "uniform"],
RadTransMGDBoundaryType=[
"dirichlet",
"neumann",
"outflow",
"outstream",
"reflecting",
"vacuum",
],
RiemannSolver=[
"Roe",
"HLL",
"HLLC",
"Marquina",
"MarquinaModified",
"Hybrid",
"HLLD",
],
SetupDatafiles=self.datafiles,
SetupDatafilesOptional=["none", *sorted(self.datafiles)],
VariableName=sorted(self.var_names),
VariableNameOptional=["none", *sorted(self.var_names)],
).items():
self.__add_enum(enums, name, values)
return PKDict(
enum=enums,
model=PKDict(),
)
def __is_file_field(self, field):
# TODO(pjm): there may be other cases of datafile selection
return re.search(r"^eos_.*?TableFile$", field.name) or re.search(
r"^op_.*?FileName$", field.name
)
def __parse_datafiles(self, in_stream):
res = []
for line in in_stream:
line = line.strip()
if line:
res.append(os.path.basename(line))
return res
def __parse_description(self, text, field):
if not field.description:
m = re.search(r"^Valid Values:\s+(.*)", text)
if m:
assert "valid_values" not in field, f"duplicate valid value def: {text}"
field.valid_values = m.group(1)
return
if re.search(r'^"', text) or (
field.get("valid_values") and field.valid_values.count('"') % 2 == 1
):
assert (
"valid_values" in field
), f"expected previous valid values def: {text}"
field.valid_values += " " + text
return
field.description += (" " if field.description else "") + text
def __parse_field(self, text, model):
# [BOOLEAN] CONSTANT [FALSE]
# [STRING] ["FLASH 3 run"]
# [INTEGER] [2]
# [REAL] [1.0]
assert model is not None
m = re.search(r"^(.*?)\s\[(.*?)\]\s(CONSTANT)?\s*\[(.*?)\](.*)", text)
assert m, f"unparsable field: {text}"
name = m.group(1)
assert name not in model, f"duplicate field: {name}"
ftype = m.group(2)
is_constant = m.group(3) == "CONSTANT"
fdefault = m.group(4)
assert not m.group(5), f"extra values in field def: {line}"
model[name] = PKDict(
name=name,
type=ftype,
is_constant=is_constant,
default=fdefault,
description="",
)
return model[name]
def __parse_model(self, text, models, views):
name = self.model_name_from_flash_unit_name(text)
if name not in models:
models[name] = PKDict()
views[name] = PKDict(
title=text,
)
return models[name]
def __parse_setup(self, in_stream):
models = PKDict()
views = PKDict()
model = None
field = None
for line in in_stream:
if line == "\n":
continue
m = re.search(r"(^\w.*)", line)
if m:
model = self.__parse_model(m.group(1), models, views)
continue
m = re.search(r"^\s{4}(\w.*)", line)
if m:
if m.group(1) != "__doc__":
field = self.__parse_field(m.group(1), model)
continue
m = re.search(r"^\s{8}(\S.*)", line)
if m:
self.__parse_description(m.group(1), field)
continue
assert False, f'unhandled line: "{line}"'
return models, views
@classmethod
def __parse_special_number(cls, field_type, value):
res = (
cls._HUGE[field_type]
if re.search(r".*?HUGE", value)
else cls._TINY[field_type]
)
if re.search(r"^-", value):
return -res
return res
@classmethod
def __parse_string(cls, value):
assert re.search(r'^".*"$', value), f"invalid string: {value}"
return cls.remove_quotes(value)
def __parse_vars(self, in_stream):
res = set()
state = "name"
for line in in_stream:
if line == "\n":
if state == "newline":
state = "name"
continue
if state == "name":
m = re.search(r"^Name: (.*)\s*$", line)
assert m, f"expected var name: {line}"
res.add(m.group(1))
state = "newline"
return res
def __valid_values(self, field):
vv = field.valid_values
if vv == "Unconstrained":
return
m = re.search(r"^([\-0-9\.]+) to ([\-0-9\.]+)$", vv)
if m:
field.min = m.group(1)
field.max = m.group(2)
return
if re.search(r"^([\-0-9]+,\s*)+[\-0-9]+$", vv):
# integer pick list
field.enum = re.split(r",\s*", vv)
return
if re.search(r'^(".*?",\s*)*".*?"+$', vv):
# string pick list
field.enum = [self.__parse_string(v) for v in re.split(r",\s*", vv)]
return
m = re.search(r"^(\S+) to INFTY$", vv)
if m:
field.min = m.group(1)
return
m = re.search(r"^-INFTY to (\S+)$", vv)
if m:
field.min = None
field.max = m.group(1)
return
m = re.search(r"\sto\s([\-0-9]+)$", vv)
if m:
field.min = None
field.max = m.group(1)
field.description = f"Valid Values: {field.description}"
return
if re.search(r"\sto\sINFTY$", vv) or re.search(
r"^([\-0-9.]+,\s*)+[\-0-9.]+$", vv
):
# restore valid value info to description
field.description = f"Valid Values: {field.description}"
return
assert False, f"unhandled Valid Values for {field.name}: {vv}"
def __value_for_type(self, field, value):
if "enum" in field:
if re.search(r'"', value):
value = self.__parse_string(value)
if field.type.endswith("Optional"):
if not value or value == " ":
value = field.enum[0]
assert value in field.enum, f"enum: {value} not in list: {field.enum}"
return value
if field.type == "Boolean":
assert re.search(
r"^(true|false)$", value, re.IGNORECASE
), f"{field.name} invalid flash boolean: {value}"
return "1" if value.lower() == "true" else "0"
return self.parse_string_or_number(field.type, value)