Source code for sirepo.template.madx_parser

# -*- coding: utf-8 -*-
"""MAD-X parser.

:copyright: Copyright (c) 2020 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from __future__ import absolute_import, division, print_function
from pykern import pkio
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdlog, pkdp
from sirepo.template import lattice
import re
import sirepo.sim_data


[docs] class MadXParser(lattice.LatticeParser): def __init__(self): self.ignore_commands = set( [ "aperture", "assign", "call", "coguess", "correct", "create", "ealign", "efcomp", "endedit", "eoption", "esave", "exec", "exit", "fill", "install", "plot", "print", "quit", "readtable", "reflect", "return", "run", "save", "select_ptc_normal", "seqedit", "setplot", "setvars", "setvars_lin", "simplex", "sixtrack", "start", "stop", "survey", "sxfread", "sxfwrite", "system", "touschek", "use_macro", "usekick", "usemonitor", "value", "weight", "wire", "write", ] ) super().__init__(sirepo.sim_data.get_class("madx"))
[docs] def parse_file(self, lattice_text, downcase_variables=False): from sirepo.template import madx lattice_text = re.sub(r",\s*,", ",", lattice_text) res = super().parse_file(lattice_text) self._add_variables_for_lattice_references() cv = madx.code_var(self.data.models.rpnVariables) self._code_variables_to_float(cv) self.__convert_sequences_to_beamlines(cv) self._set_default_beamline("use", "sequence", "period") self.__convert_references_to_ids() if downcase_variables: self._downcase_variables(cv) return res
def __convert_references_to_ids(self): util = lattice.LatticeUtil(self.data, self.schema) name_to_id = PKDict() for id in util.id_map: name = util.id_map[id].get("name") if not name: continue name = name.upper() # assert name not in name_to_id, 'duplicate name: {}'.format(name) name_to_id[name] = id for container in ("elements", "commands"): for el in self.data.models[container]: model_schema = self.schema.model[ lattice.LatticeUtil.model_name_for_data(el) ] for f in model_schema: el_schema = model_schema[f] if f in el: if el[f] and "LatticeBeamlineList" in el_schema[1]: el[f] = name_to_id[el[f].upper()] elif el[f] and el_schema[1] == "OutputFile" and el[f] != "0": el[f] = "1" elif el_schema[1] in self.schema.enum: # TODO(pjm): ensure value is present in enum list el[f] = el[f].lower() if "Boolean" in el_schema[1]: if el[f] == "1" or el[f] == "0": pass elif el_schema[1] == "OptionalBoolean" and el[f] == "": pass elif el[f].lower() == "true": el[f] = "1" else: el[f] = "0" def __convert_sequences_to_beamlines(self, code_var): data = PKDict( models=self.data.models, ) drifts = self._compute_drifts(code_var) util = lattice.LatticeUtil(data, self.schema) for seq in data.models.sequences: beamline = PKDict( name=seq.name, items=[], ) alignment = seq.refer.lower() if "refer" in seq else "centre" assert alignment in ( "entry", "centre", "exit", ), "invalid sequence alignment: {}".format(alignment) prev = 0 for item in seq["items"]: el = util.id_map[item[0]] at = self._eval_var(code_var, item[1]) length = self._eval_var(code_var, el.get("l", 0)) entry = at if alignment == "centre": entry = at - length / 2 elif alignment == "exit": entry = at - length d = self._get_drift(drifts, entry - prev) if d: beamline["items"].append(d) beamline["items"].append(el._id) prev = entry + length if beamline["items"]: if "l" in seq: d = self._get_drift(drifts, self._eval_var(code_var, seq.l) - prev) if d: beamline["items"].append(d) beamline.id = self.parser.next_id() data.models.beamlines.append(beamline) del data.models["sequences"] util.sort_elements_and_beamlines()
# TODO(pjm): move into parser class def _fixup_madx(madx): # move imported beam over default-data.json beam # remove duplicate twiss # remove "call" and "use" commands beam_idx = None first_twiss = True res = [] for cmd in madx.models.commands: if cmd._type == "call" or cmd._type == "use": continue if cmd._type == "beam": if beam_idx is None: beam_idx = madx.models.commands.index(cmd) else: res[beam_idx] = cmd _update_beam_and_bunch(cmd, madx) continue elif cmd._type == "twiss": if first_twiss: first_twiss = False continue res.append(cmd) madx.models.commands = res
[docs] def parse_file(lattice_text, downcase_variables=False): res = MadXParser().parse_file(lattice_text, downcase_variables) _fixup_madx(res) return res
[docs] def parse_tfs_page_info(tfs_file): # returns an array of page info: name, turn, s col_names = parse_tfs_file(tfs_file, header_only=True) turn_idx = col_names.index("turn") s_idx = col_names.index("s") res = [] mode = "segment" with pkio.open_text(tfs_file) as f: for line in f: if mode == "segment" and re.search(r"^\#segment\s", line): name = re.split(r"\s+", line.strip())[-1] res.append( PKDict( name=name, ) ) mode = "data" elif mode == "data" and re.search(r"^\s+\S", line): data = re.split(r"\s+", line.strip()) res[-1].update( PKDict( turn=data[turn_idx], s=data[s_idx], ) ) mode = "segment" return res
[docs] def parse_tfs_file(tfs_file, header_only=False, want_page=-1): mode = "header" col_names = [] rows = [] current_page = -1 with pkio.open_text(tfs_file) as f: for line in f: if mode == "header": # header row starts with * if re.search(r"^\*\s", line): col_names = re.split(r"\s+", line.strip()) col_names = col_names[1:] mode = "data" if header_only: return [x.lower() for x in col_names] elif mode == "data": # data rows after header, start with blank if re.search(r"^\s+\S", line) and want_page == current_page: data = re.split(r"\s+", line.strip()) rows.append(data) elif want_page >= 0 and re.search(r"^\#segment\s", line): current_page += 1 res = PKDict(map(lambda x: (x.lower(), []), col_names)) for i in range(len(col_names)): name = col_names[i].lower() if name: for row in rows: res[name].append(row[i]) return res
_TWISS_VARS = PKDict( sr_twiss_beta_x="betx", sr_twiss_beta_y="bety", sr_twiss_alpha_x="alfx", sr_twiss_alpha_y="alfy", ) def _update_beam_and_bunch(beam, data): bunch = data.models.bunch schema = sirepo.sim_data.get_class("madx").schema() if "particle" in beam: beam.particle = beam.particle.lower() found = False for pt in schema.enum.ParticleType: if pt[0] == beam.particle: found = True break if not found: beam.particle = "other" for bd in schema.enum.BeamDefinition: if bd[0] in beam and beam[bd[0]]: bunch.beamDefinition = bd[0] break for v in data.models.rpnVariables: if v.name in _TWISS_VARS: bunch[_TWISS_VARS[v.name]] = v.value