Source code for sirepo.template.opal_parser

# -*- coding: utf-8 -*-
"""OPAL parser.

:copyright: Copyright (c) 2020 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdlog, pkdp
from sirepo.template import lattice
from sirepo.template.code_variable import CodeVar
from sirepo.template.lattice import LatticeUtil
import math
import os.path
import re
import sirepo.sim_data

_MATERIAL_CODE_TO_NAME = PKDict(
    al="ALUMINUM",
    be="BERYLLIUM",
    cu="COPPER",
    au="GOLD",
    mo="MOLYBDENUM",
    ti="TITANIUM",
)


[docs] class OpalParser(lattice.LatticeParser): def __init__(self): self.ignore_commands = set( [ "value", "stop", "quit", ] ) super().__init__(sirepo.sim_data.get_class("opal"))
[docs] def parse_file(self, lattice_text, update_filenames): from sirepo.template import opal res = super().parse_file(lattice_text) self.__fix_pow_variables() self._add_variables_for_lattice_references() cv = opal.code_var(self.data.models.rpnVariables) self._code_variables_to_float(cv) self.__remove_bend_default_fmap() self.__remove_default_commands() self.__combine_track_and_run() self.util = lattice.LatticeUtil(self.data, self.schema) input_files = self.__update_filenames(update_filenames) self.__set_element_positions(cv) self.__sort_element_positions(cv) self._set_default_beamline("track", "line") self.__legacy_fixups() self.__convert_references_to_ids() self.__combine_options() self.__dedup_elements() self.__remove_unused_drifts() return res, input_files
def __combine_options(self): option = None res = [] s = self.schema.model.command_option for cmd in self.data.models.commands: if cmd._type == "option": if not option: option = cmd else: for f in cmd: if f in s and s[f][2] != cmd[f]: option[f] = cmd[f] continue res.append(cmd) if option: option.name = "Opt1" self.data.models.commands = res def __combine_track_and_run(self): fields = self.schema.model.command_track track = None res = [] for cmd in self.data.models.commands: if cmd._type == "run": assert track for f in cmd: name = "run_{}".format(f) if name in fields: track[name] = cmd[f] continue if cmd._type == "track": track = cmd res.append(cmd) self.data.models.commands = res def __convert_references_to_ids(self): ref_model = PKDict( BeamList="beam", FieldsolverList="fieldsolver", DistributionList="distribution", ParticlematterinteractionList="particlematterinteraction", WakeList="wake", GeometryList="geometry", LatticeBeamlineList="beamline", OptionalLatticeBeamlineList="beamline", ) name_to_id = PKDict() for id in self.util.id_map: name = self.util.id_map[id].name.upper() assert name not in name_to_id, "duplicate name: {}".format(name) name_to_id[name] = id for container in ("elements", "commands"): for el in self.data.models[container]: model_schema = self.schema.model[self.util.model_name_for_data(el)] for f in model_schema: if f in el and el[f]: el_schema = model_schema[f] if el_schema[1] in ref_model: el[f] = name_to_id[el[f].upper()] elif el_schema[1] in self.schema.enum: if el_schema[1] == "Boolean": if el[f] == "1" or el[f] == "0": pass elif el[f].lower() == "true": el[f] = "1" else: el[f] = "0" else: el[f] = el[f].upper() found_enum = False if el_schema[1] == "ParticlematterinteractionMaterial": el[f] = _MATERIAL_CODE_TO_NAME.get( el[f].lower(), el[f] ) for e in self.schema.enum[el_schema[1]]: if el[f] == e[0]: found_enum = True break assert found_enum, "unknown value {}: {}".format( f, el[f] ) def __dedup_elements(self): # iterate all element, remove duplicates, fixup beamlines def _name(name): return re.sub(r"#.*$", "", name) elements_by_type = PKDict() for el in self.data.models.elements: t = el.type if t not in elements_by_type: elements_by_type[t] = [] elements_by_type[t].append(el) id_map = PKDict() for t in elements_by_type: elements = elements_by_type[t] for i in range(len(elements)): if "_matched" in elements[i]: continue for j in range(i + 1, len(elements)): if "_matched" in elements[j]: continue is_equal = True for f in elements[i]: if f == "_id": continue if f == "name": if _name(elements[i].name) == _name(elements[j].name): continue else: is_equal = False break if ( f in elements[i] and f in elements[j] and elements[i][f] != elements[j][f] ): is_equal = False break if is_equal: id_map[elements[j]._id] = elements[i]._id elements[j]._matched = True elements = [] for el in self.data.models.elements: if "_matched" not in el: elements.append(el) self.data.models.elements = elements for beamline in self.data.models.beamlines: items = [] for el_id in beamline["items"]: if el_id in id_map: items.append(id_map[el_id]) else: items.append(el_id) beamline["items"] = items def __fix_pow_variables(self): # REAL beta=sqrt(1-(1/gamma^2)); # REAL p_tot = (e_tot^2-PMASS^2)^0.5; for v in self.data.models.rpnVariables: # TODO(pjm): only works for simple cases v.value = re.sub(r"\(([^)]+)\)\s*\^\s*([\d.]+)", r"pow(\1,\2)", v.value) v.value = re.sub(r"(\w+)\s*\^\s*([\d.]+)", r"pow(\1,\2)", v.value) def __legacy_fixups(self): res = [] for cmd in self.data.models.commands: if cmd._type == "distribution" and not cmd.type and "distribution" in cmd: cmd.type = cmd.distribution del cmd["distribution"] if cmd._type != "select": res.append(cmd) self.data.models.commands = res def __remove_bend_default_fmap(self): for el in self.data.models.elements: if re.search(r"bend", el.type.lower()): if "fmapfn" in el and el.fmapfn.upper() == "1DPROFILE1-DEFAULT": del el["fmapfn"] def __remove_default_commands(self): from sirepo import simulation_db size = len(simulation_db.default_data(self.sim_data.sim_type()).models.commands) res = [] names = set(self.data.models.commands[0].name) for i in range(len(self.data.models.commands)): cmd = self.data.models.commands[i] if i >= size: if not cmd.name: cmd.name = self.__unique_name(cmd, names) res.append(cmd) names.add(cmd.name) self.data.models.commands = res def __remove_unused_drifts(self): in_use = set() for beamline in self.data.models.beamlines: for el_id in beamline["items"]: in_use.add(el_id) elements = [] for el in self.data.models.elements: if el.type == "DRIFT" and el._id not in in_use: continue elements.append(el) self.data.models.elements = elements def __set_element_positions(self, code_var): def _position(el): if "origin" in el: p = self.__split_values(el.origin)[2] else: p = el.get("elemedge", el.get("z", 0)) return PKDict( elemedge=p, ) beamline_ids = [] for beamline in self.data.models.beamlines: beamline_ids.append(beamline.id) for beamline in self.data.models.beamlines: positions = [] for idx in range(len(beamline["items"])): item_id = beamline["items"][idx] el = self.util.id_map[item_id] if item_id in beamline_ids: positions.append( PKDict( elemedge=positions[-1].elemedge if len(positions) else 0, ) ) else: positions.append(_position(el)) beamline.positions = positions for beamline in self.data.models.beamlines: if "origin" in beamline: (beamline.x, beamline.y, beamline.z) = self.__split_values( beamline.origin ) del beamline["origin"] if "orientation" in beamline: (beamline.theta, beamline.phi, beamline.psi) = self.__split_values( beamline.orientation ) del beamline["orientation"] for el in self.data.models.elements: if "elemedge" in el: del el["elemedge"] if "l" in el and not code_var.is_var_value(el.l): el.l = float(el.l) self.util.sort_elements_and_beamlines() def __sort_element_positions(self, code_var): for beamline in self.data.models.beamlines: ip = sorted( zip(beamline["items"], beamline.positions), key=lambda v: float(code_var.eval_var_with_assert(v[1].elemedge)), ) beamline["items"] = [item for item, _ in ip] beamline.positions = [pos for _, pos in ip] def __split_values(self, values): return re.split(r"\s*,\s*", re.sub(r"^{|}$", "", values)) def __unique_name(self, cmd, names): prefix = cmd._type.upper()[:2] num = 1 while True: name = "{}{}".format(prefix, num) if name not in names: return name num += 1 def __update_filenames(self, update_filenames): res = [] visited = set() for container in ("elements", "commands"): for el in self.data.models[container]: model_name = self.util.model_name_for_data(el) el_schema = self.schema.model[model_name] for f in el: if f not in el_schema: continue if el_schema[f][1] == "OutputFile" and el[f]: if update_filenames: el[f] = "1" elif el_schema[f][1] == "InputFile" and el[f]: if update_filenames: el[f] = self.sim_data.lib_file_name_without_type( os.path.basename(el[f]) ) filename = self.sim_data.lib_file_name_with_model_field( model_name, f, el[f] ) if filename not in visited: res.append( PKDict( label=el.name, type=LatticeUtil.type_for_data(el), file_type="{}-{}".format(model_name, f), filename=el[f], field=f, lib_filename=filename, ) ) visited.add(filename) return res
[docs] def parse_file(lattice_text, filename=None, update_filenames=True): res, files = OpalParser().parse_file(lattice_text, update_filenames) if not len(res.models.beamlines): raise AssertionError("No beamlines parsed from OPAL input file") set_simulation_name(res, filename) return res, files
[docs] def set_simulation_name(data, filename): data.models.simulation.name = None commands = [] for cmd in data.models.commands: if cmd._type == "title": data.models.simulation.name = cmd.string else: commands.append(cmd) data.models.commands = commands if not data.models.simulation.name and filename: data.models.simulation.name = re.sub(r"\..*$", "", os.path.basename(filename))