Source code for sirepo.template.elegant_lattice_importer

# -*- coding: utf-8 -*-
u"""elegant lattice parser

:copyright: Copyright (c) 2015 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from __future__ import absolute_import, division, print_function
from pykern import pkresource
from pykern.pkdebug import pkdc, pkdlog, pkdp
from sirepo import simulation_db
from sirepo.template import elegant_common
from sirepo.template import elegant_lattice_parser
import ntpath
import re
import subprocess

_IGNORE_FIELD = ['rootname', 'search_path', 'semaphore_file']

_SCHEMA = simulation_db.get_schema('elegant')


def _init_types():
    res = {}
    for name in _SCHEMA['model']:
        if name == name.upper():
            res[name] = True
    return res


_ELEGANT_TYPES = _init_types()


[docs]def build_variable_dependency(value, variables, depends): for v in str(value).split(' '): if v in variables: if v not in depends: build_variable_dependency(variables[v], variables, depends) depends.append(v) return depends
[docs]def import_file(text, data=None): models = elegant_lattice_parser.parse_file(text, max_id(data) if data else 0) name_to_id, default_beamline_id = _create_name_map(models) if 'default_beamline_name' in models and models['default_beamline_name'] in name_to_id: default_beamline_id = name_to_id[models['default_beamline_name']] element_names = {} rpn_cache = {} for el in models['elements']: el['type'] = _validate_type(el, element_names) element_names[el['name'].upper()] = el validate_fields(el, rpn_cache, models['rpnVariables']) for bl in models['beamlines']: bl['items'] = _validate_beamline(bl, name_to_id, element_names) if len(models['elements']) == 0 or len(models['beamlines']) == 0: raise IOError('no beamline elements found in file') if not data: data = simulation_db.default_data(elegant_common.SIM_TYPE) data['models']['elements'] = models['elements'] data['models']['beamlines'] = models['beamlines'] data['models']['rpnVariables'] = models['rpnVariables'] elegant_common.sort_elements_and_beamlines(data) if default_beamline_id: data['models']['simulation']['activeBeamlineId'] = default_beamline_id data['models']['simulation']['visualizationBeamlineId'] = default_beamline_id # used by synergia app to get values for rpn expressions data['models']['rpnCache'] = rpn_cache return data
[docs]def is_rpn_value(value): if (value): if re.search(r'^\s*(\-|\+)?(\d+|(\d*(\.\d*)))([eE][+-]?\d+)?\s*$', str(value)): return False return True return False
[docs]def max_id(data): max_id = 1 for model_type in ['elements', 'beamlines', 'commands']: if model_type not in data['models']: continue for m in data['models'][model_type]: id = m['_id'] if '_id' in m else m['id'] if id > max_id: max_id = id return max_id
[docs]def parse_rpn_value(value, variable_list): variables = {x['name']: x['value'] for x in variable_list} depends = build_variable_dependency(value, variables, []) #TODO(robnagler) scan variable values for strings. Need to be parsable var_list = ' '.join(map(lambda x: '{} sto {}'.format(variables[x], x), depends)) #TODO(pjm): security - need to scrub field value # execn send top of string stack to UNIX and put result on numerical stack # execs send top of string stack to UNIX and put output on string stack # csh start and enter C shell subprocess # cshs send top of string stack to C shell # gets get string from input file # seems like this would be bad, because you could construct a string that could be executed # mudf make user defined function from string stack (name commands mudf) # open open input/output file # puts put string to file # sleep sleep for number of seconds # @ push command input file pkdc('rpn variables={} expr="{}"', var_list, value) out = elegant_common.subprocess_output(['rpnl', '{} {}'.format(var_list, value)]) if out is None: return None, 'invalid' if len(out): return float(out.strip()), None return None, 'empty'
[docs]def validate_fields(el, rpn_cache, rpn_variables): for field in el.copy(): _validate_field(el, field, rpn_cache, rpn_variables) model_name = _model_name_for_data(el) for field in _SCHEMA['model'][model_name]: if field not in el: el[field] = _SCHEMA['model'][model_name][field][2]
def _model_name_for_data(model): return 'command_{}'.format(model['_type']) if '_type' in model else model['type'] def _create_name_map(models): name_to_id = {} last_beamline_id = None for bl in models['beamlines']: name_to_id[bl['name'].upper()] = bl['id'] last_beamline_id = bl['id'] for el in models['elements']: name_to_id[el['name'].upper()] = el['_id'] return name_to_id, last_beamline_id def _field_type_for_field(el, field): if re.search(r'\[\d+\]$', field): field = re.sub(r'\[\d+\]$', '', field) field_type = None model_name = _model_name_for_data(el) for f in _SCHEMA['model'][model_name]: if f == field: field_type = _SCHEMA['model'][model_name][f][1] break if not field_type: if not field in _IGNORE_FIELD: pkdlog('{}: unknown field type for {}', field, model_name) del el[field] return field_type def _validate_beamline(bl, name_to_id, element_names): items = [] for name in bl['items']: is_reversed = False if re.search(r'^-', name): is_reversed = True name = re.sub(r'^-', '', name) if name.upper() not in name_to_id: raise IOError('{}: unknown beamline item name'.format(name)) id = name_to_id[name.upper()] if name.upper() in element_names: items.append(id) else: items.append(-id if is_reversed else id) return items def _validate_enum(el, field, field_type): search = el[field].lower() exact_match = '' close_match = '' for v in _SCHEMA['enum'][field_type]: if v[0] == search: exact_match = v[0] break if search.startswith(v[0]) or v[0].startswith(search): close_match = v[0] if exact_match: el[field] = exact_match elif close_match: el[field] = close_match else: raise IOError('{} unknown value: "{}"'.format(field, search)) def _validate_field(el, field, rpn_cache, rpn_variables): if field in ['_id', '_type']: return if '_type' not in el and field == 'type': return field_type = _field_type_for_field(el, field) if not field_type: return if field_type == 'OutputFile': el[field] = '1' elif field_type == 'InputFile': el[field] = ntpath.basename(el[field]) elif field_type == "InputFileXY": _validate_input_file(el, field) elif (field_type == 'RPNValue' or field_type == 'RPNBoolean') and is_rpn_value(el[field]): _validate_rpn_field(el, field, rpn_cache, rpn_variables) elif field_type.endswith('StringArray'): _validate_string_array_field(el, field) elif field_type in _SCHEMA['enum']: _validate_enum(el, field, field_type) elif 'type' in el and el['type'] == 'SCRIPT' and field == 'command': _validate_script(el) def _validate_input_file(el, field): # <filename>=<x>+<y> fullname= ntpath.basename(el[field]) m = re.search('^(.*?)\=(.*?)\+(.*)$', fullname) if m: el[field] = m.group(1) el[field + 'X'] = m.group(2) el[field + 'Y'] = m.group(3) else: el[field] = fullname def _validate_rpn_field(el, field, rpn_cache, rpn_variables): if '_type' in el: return #TODO(pjm): doesn't reach this if? if '_type' in el: m = re.search('\((.*?)\)$', el[field]) if m: el[field] = m.group(1) m = re.search('\{\s*rpnl\s+(.*)\}$', el[field]) if m: el[field] = m.group(1) value, error = parse_rpn_value(el[field], rpn_variables) if error: raise IOError('invalid rpn: "{}"'.format(el[field])) rpn_cache[el[field]] = value def _validate_script(el): # ex, command: 'sddscombine %i beam1.sdds -merge %o' v = el['command'] if v: m = re.search(r'(\w+)\b', v, re.IGNORECASE) if m: executable = m.group(1) try: import distutils.spawn if not distutils.spawn.find_executable(executable): el['commandFile'] = executable except Exception as e: pass m = re.search(r'\b(\w+\.sdds)\b', v, re.IGNORECASE) if m: el['commandInputFile'] = m.group(1) def _validate_string_array_field(el, field): m = re.search('(.*?)\[(\d+)\]$', field) if not m: return value = el[field] del el[field] field = m.group(1) index = int(m.group(2)) if not field in el: model_name = _model_name_for_data(el) el[field] = _SCHEMA['model'][model_name][field][2] value_array = re.split('\s*,\s*', el[field]) m = re.search('^(\d+)\*(.*)$', value) if m: count = int(m.group(1)) val = m.group(2) for i in range(count): value_array[index + i] = val else: values = re.split('\s*,\s*', value) for v in values: value_array[index] = v index += 1 el[field] = ', '.join(value_array) def _validate_type(el, element_names): type = el['type'].upper() match = None for el_type in _ELEGANT_TYPES: if type.startswith(el_type) or el_type.startswith(type): if match: raise IOError('{}: type name matches multiple element types'.format(type)) match = el_type if not el_type: raise IOError('{}: unknown element type'.format(type)) if not match: # type may refer to another element if el['type'] in element_names: el_copy = element_names[el['type'].upper()] for field in el_copy.copy(): if field not in el: el[field] = el_copy[field] match = el_copy['type'] else: raise IOError('{}: element not found'.format(type)) return match