Source code for sirepo.template.zgoubi

# -*- coding: utf-8 -*-
u"""zgoubi execution template.

:copyright: Copyright (c) 2018 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from __future__ import absolute_import, division, print_function
from pykern import pkcollections
from pykern import pkio
from pykern import pkjinja
from pykern.pkdebug import pkdc, pkdp
from sirepo import simulation_db
from sirepo.template import template_common
import math
import numpy as np
import py.path
import re

SIM_TYPE = 'zgoubi'

BUNCH_SUMMARY_FILE = 'bunch.json'

WANT_BROWSER_FRAME_CACHE = True

ZGOUBI_LOG_FILE = 'sr_zgoubi.log'

_SCHEMA = simulation_db.get_schema(SIM_TYPE)

_ZGOUBI_DATA_FILE = 'zgoubi.fai'

_ZGOUBI_LOG_FILE = 'zgoubi.res'

_ZGOUBI_TWISS_FILE = 'zgoubi.TWISS.out'

_INITIAL_PHASE_MAP = {
    'D1': 'Do1',
    'time': 'to',
}

_MODEL_UNITS = None

_PHASE_SPACE_FIELD_INFO = {
    'Do1': [u'dp/p₀', 1],
    'Yo': [u'Y₀ [m]', 0.01],
    'To': [u'Y\'₀ [rad]', 0.001],
    'Zo': [u'Z₀ [m]', 0.01],
    'Po': [u'Z\'₀ [rad]', 0.001],
    'So': [u's₀ [m]', 0.01],
    'to': [u't₀', 1],
    'D1': ['dp/p', 1],
    'Y': ['Y [m]', 0.01],
    'T': ['Y\' [rad]', 0.001],
    'Z': ['Z [m]', 0.01],
    'P': ['Z\' [rad]', 0.001],
    'S': ['s [m]', 0.01],
    'time': ['t', 1],
}

_PYZGOUBI_FIELD_MAP = {
    'l': 'XL',
    'plt': 'label2',
}

_REPORT_ENUM_INFO = {
    'twissReport': 'TwissParameter',
    'twissReport2': 'TwissParameter',
    'opticsReport': 'OpticsParameter',
}

_TWISS_SUMMARY_LABELS = {
    'LENGTH': 'Beamline length [m]',
    'ORBIT5': 'Orbit5 [m]',
    'ALFA': 'Momentum compaction factor',
    'GAMMATR': 'Transition energy gamma',
    'DELTAP': 'Energy difference',
    'ENERGY': 'Particle energy [GeV]',
    'GAMMA': 'Particle gamma',

    'Q1': 'Horizontal tune (fractional)',
    'DQ1': 'Horizontal chromaticity',
    'BETXMIN': 'Horizontal minimum beta [m]',
    'BETXMAX': 'Horizontal maximum beta [m]',
    'DXMIN': 'Horizontal minimum dispersion [m]',
    'DXMAX': 'Horizontal maximum dispersion [m]',
    'DXRMS': 'Horizontal RMS dispersion [m]',
    'XCOMIN': 'Horizontal closed orbit minimum deviation [m]',
    'XCOMAX': 'Horizontal closed orbit maximum deviation [m]',
    'XCORMS': 'Horizontal closed orbit RMS deviation [m]',

    'Q2': 'Vertical tune (fractional)',
    'DQ2': 'Vertical chromaticity',
    'BETYMIN': 'Vertical minimum beta [m]',
    'BETYMAX': 'Vertical maximum beta [m]',
    'DYMIN': 'Vertical minimum dispersion [m]',
    'DYMAX': 'Vertical maximum dispersion [m]',
    'DYRMS': 'Vertical RMS dispersion [m]',
    'YCOMIN': 'Vertical closed orbit minimum deviation [m]',
    'YCOMAX': 'Vertical closed orbit maximum deviation [m]',
    'YCORMS': 'Vertical closed orbit RMS deviation [m]',
}


[docs]def background_percent_complete(report, run_dir, is_running): errors = '' if not is_running: data_file = run_dir.join(_ZGOUBI_DATA_FILE) if data_file.exists(): col_names, rows = read_data_file(data_file) count = int(rows[-1][col_names.index('IPASS')]) return { 'percentComplete': 100, 'frameCount': count + 1, } else: errors = _parse_zgoubi_log(run_dir) return { 'percentComplete': 0, 'frameCount': 0, 'error': errors, }
[docs]def column_data(col, col_names, rows): idx = col_names.index(col) assert idx >= 0, 'invalid col: {}'.format(col) res = [] for row in rows: res.append(float(row[idx])) return res
[docs]def extract_first_twiss_row(run_dir): col_names, rows = read_data_file(py.path.local(run_dir).join(_ZGOUBI_TWISS_FILE)) return col_names, rows[0]
[docs]def fixup_old_data(data): for m in [ 'bunch', 'bunchAnimation', 'bunchAnimation2', 'particle', 'simulationSettings', 'opticsReport', 'twissReport', 'twissReport2', 'twissSummaryReport', ]: if m not in data.models: data.models[m] = pkcollections.Dict({}) template_common.update_model_defaults(data.models[m], m, _SCHEMA)
[docs]def get_animation_name(data): return 'animation'
[docs]def get_application_data(data): if data['method'] == 'compute_particle_ranges': return template_common.compute_field_range(data, _compute_range_across_frames)
[docs]def get_simulation_frame(run_dir, data, model_data): if re.search(r'bunchAnimation', data['modelName']): return _extract_bunch_animation(run_dir, data, model_data) assert False, 'invalid animation frame model: {}'.format(data['modelName'])
[docs]def lib_files(data, source_lib): return []
[docs]def parse_error_log(run_dir): return None
[docs]def python_source_for_model(data, model): return _generate_parameters_file(data)
[docs]def prepare_output_file(report_info, data): report = data['report'] if 'bunchReport' in report or 'twissReport' in report or 'opticsReport' in report: fn = simulation_db.json_filename(template_common.OUTPUT_BASE_NAME, report_info.run_dir) if fn.exists(): fn.remove() save_report_data(data, report_info.run_dir)
[docs]def read_data_file(path): text = pkio.read_text(path) # mode: title -> header -> data mode = 'title' col_names = [] rows = [] for line in text.split("\n"): if mode == 'title': if not re.search(r'^\@', line): mode = 'header' continue # work-around odd header/value "! optimp.f" line = re.sub(r'\!\s', '', line) if mode == 'header': # header row starts with '# <letter>' if re.search(r'^#\s+[a-zA-Z]', line): col_names = re.split('\s+', line) col_names = map(lambda x: re.sub(r'\W|_', '', x), col_names[1:]) mode = 'data' elif mode == 'data': if re.search('^#', line): continue row = re.split('\s+', re.sub(r'^\s+', '', line)) rows.append(row) rows.pop() return col_names, rows
[docs]def remove_last_frame(run_dir): pass
[docs]def save_report_data(data, run_dir): report_name = data['report'] if 'twissReport' in report_name or 'opticsReport' in report_name: enum_name = _REPORT_ENUM_INFO[report_name] report = data['models'][report_name] plots = [] col_names, rows = read_data_file(py.path.local(run_dir).join(_ZGOUBI_TWISS_FILE)) for f in ('y1', 'y2', 'y3'): if report[f] == 'none': continue plots.append({ 'points': column_data(report[f], col_names, rows), 'label': template_common.enum_text(_SCHEMA, enum_name, report[f]), }) #TODO(pjm): use template_common x = column_data('sums', col_names, rows) res = { 'title': '', 'x_range': [min(x), max(x)], 'y_label': '', 'x_label': 's [m]', 'x_points': x, 'plots': plots, 'y_range': template_common.compute_plot_color_and_range(plots), 'summaryData': _read_twiss_header(run_dir), } elif report_name == 'twissSummaryReport': res = { #TODO(pjm): x_range requied by sirepo-plotting.js 'x_range': [], 'summaryData': _read_twiss_header(run_dir), } elif 'bunchReport' in report_name: report = data['models'][report_name] col_names, rows = read_data_file(py.path.local(run_dir).join(_ZGOUBI_DATA_FILE)) res = _extract_bunch_data(report, col_names, rows, '') summary_file = py.path.local(run_dir).join(BUNCH_SUMMARY_FILE) if summary_file.exists(): res['summaryData'] = { 'bunch': simulation_db.read_json(summary_file) } else: raise RuntimeError('unknown report: {}'.format(report_name)) simulation_db.write_result( res, run_dir=run_dir, )
[docs]def simulation_dir_name(report_name): if 'bunchReport' in report_name: return 'bunchReport' if 'opticsReport' in report_name or report_name == 'twissSummaryReport': return 'twissReport2' return report_name
[docs]def write_parameters(data, run_dir, is_parallel, python_file=template_common.PARAMETERS_PYTHON_FILE): pkio.write_text( run_dir.join(python_file), _generate_parameters_file(data), )
def _compute_range_across_frames(run_dir, data): res = {} for v in _SCHEMA.enum.PhaseSpaceCoordinate: res[v[0]] = [] col_names, rows = read_data_file(py.path.local(run_dir).join(_ZGOUBI_DATA_FILE)) ipass_index = int(col_names.index('IPASS')) for field in res: values = column_data(field, col_names, rows) values += column_data(_initial_phase_field(field), col_names, rows) if len(res[field]): res[field][0] = min(min(values), res[field][0]) res[field][1] = max(max(values), res[field][1]) else: res[field] = [min(values), max(values)] for field in res.keys(): factor = _PHASE_SPACE_FIELD_INFO[field][1] res[field][0] *= factor res[field][1] *= factor res[_initial_phase_field(field)] = res[field] return res def _element_value(el, field): converter = _MODEL_UNITS.get(el['type'], {}).get(field, None) return converter(el[field]) if converter else el[field] def _extract_bunch_animation(run_dir, data, model_data): frame_index = int(data['frameIndex']) report = template_common.parse_animation_args( data, { '1': ['x', 'y', 'histogramBins', 'startTime'], '': ['x', 'y', 'histogramBins', 'plotRangeType', 'horizontalSize', 'horizontalOffset', 'verticalSize', 'verticalOffset', 'isRunning', 'startTime'], }, ) is_frame_0 = False # remap frame 0 to use initial "o" values from frame 1 if frame_index == 0: is_frame_0 = True for f in ('x', 'y'): v = report[f] report[f] = _initial_phase_field(v) frame_index = 1 col_names, all_rows = read_data_file(run_dir.join(_ZGOUBI_DATA_FILE)) rows = [] ipass_index = int(col_names.index('IPASS')) for row in all_rows: if int(row[ipass_index]) == frame_index: rows.append(row) model = model_data.models.bunchAnimation model.update(report) return _extract_bunch_data(model, col_names, rows, 'Initial Distribution' if is_frame_0 else 'Pass {}'.format(frame_index)) def _extract_bunch_data(report, col_names, rows, title): x_info = _PHASE_SPACE_FIELD_INFO[report['x']] y_info = _PHASE_SPACE_FIELD_INFO[report['y']] x = np.array(column_data(report['x'], col_names, rows)) * x_info[1] y = np.array(column_data(report['y'], col_names, rows)) * y_info[1] return template_common.heatmap([x, y], report, { 'x_label': x_info[0], 'y_label': y_info[0], 'title': title, 'z_label': 'Number of Particles', }); def _generate_beamline(data, beamline_map, element_map, beamline_id): res = '' for item_id in beamline_map[beamline_id]['items']: if item_id in beamline_map: res += _generate_beamline(data, beamline_map, element_map, item_id) continue el = element_map[item_id] if el['type'] == 'AUTOREF': res += 'line.add(core.FAKE_ELEM(""" \'AUTOREF\'\n{}\n{} {} {}\n"""))\n'.format( el.I, _element_value(el, 'XCE'), _element_value(el, 'YCE'), _element_value(el, 'ALE')) elif el['type'] == 'YMY': res += 'line.add(core.FAKE_ELEM(""" \'YMY\'\n"""))\n' elif el['type'] == 'SEXTUPOL': res += _generate_element(el, 'QUADRUPO') else: assert el['type'] in _MODEL_UNITS, 'Unsupported element: {}'.format(el['type']) res += _generate_element(el) return res def _generate_beamline_elements(report, data): res = '' sim = data['models']['simulation'] beamline_map = {} for bl in data.models.beamlines: beamline_map[bl.id] = bl element_map = {} for el in data.models.elements: element_map[el._id] = el if report == 'twissReport': beamline_id = sim['activeBeamlineId'] else: if 'visualizationBeamlineId' not in sim or not sim['visualizationBeamlineId']: sim['visualizationBeamlineId'] = data.models.beamlines[0].id beamline_id = sim['visualizationBeamlineId'] return _generate_beamline(data, beamline_map, element_map, beamline_id) def _generate_element(el, schema_type=None): res = 'line.add(core.{}("{}"'.format(el.type, el.name) for f in _SCHEMA.model[schema_type or el.type]: if f == 'name' or f == 'order' or f == 'format': continue #TODO(pjm): need ignore list if el.type == 'CAVITE' and (f == 'l' or f == 'IOP' or f == 'f_RF'): continue res += ', {}={}'.format(_PYZGOUBI_FIELD_MAP.get(f, f), _element_value(el, f)) res += '))\n' return res def _generate_parameters_file(data): v = template_common.flatten_data(data.models, {}) report = data.report if 'report' in data else '' v['particleDef'] = _generate_particle(data.models.particle) v['beamlineElements'] = _generate_beamline_elements(report, data) res = template_common.render_jinja(SIM_TYPE, v, 'base.py') if 'twissReport' in report or 'opticsReport' in report or report == 'twissSummaryReport': return res + template_common.render_jinja(SIM_TYPE, v, 'twiss.py') # if 'opticsReport' in report: # return res + template_common.render_jinja(SIM_TYPE, v, 'optics.py') v['outputFile'] = _ZGOUBI_DATA_FILE res += template_common.render_jinja(SIM_TYPE, v, 'bunch.py') if 'bunchReport' in report: return res + template_common.render_jinja(SIM_TYPE, v, 'bunch-report.py') return res + template_common.render_jinja(SIM_TYPE, v) def _generate_particle(particle): if particle.particleType == 'Other': return '{} {} {} {} 0'.format(particle.M, particle.Q, particle.G, particle.Tau) return particle.particleType def _init_model_units(): # Convert element units (m, rad) to the required zgoubi units (cm, mrad, degrees) def _cm(meters): return float(meters) * 100 def _degrees(radians): return float(radians) * 180 / math.pi def _marker_plot(v): return '"{}"'.format('.plt' if int(v) else '') def _mrad(mrad): return float(mrad) * 1000 def _xpas(v): if re.search(r'^#', v): v = re.sub(r'^#', '', v) return '[{}]'.format(','.join(v.split('|'))) return str(_cm(float(v))) return { 'AUTOREF': { 'XCE': _cm, 'YCE': _cm, 'ALE': _mrad, }, 'BEND': { 'l': _cm, 'X_E': _cm, 'LAM_E': _cm, 'X_S': _cm, 'LAM_S': _cm, 'XPAS': _xpas, 'XCE': _cm, 'YCE': _cm, }, 'CAVITE': { }, 'CHANGREF': { 'ALE': _degrees, 'XCE': _cm, 'YCE': _cm, }, 'DRIFT': { 'l': _cm, }, 'MARKER': { 'plt': _marker_plot, }, 'MULTIPOL': { 'l': _cm, 'R_0': _cm, 'X_E': _cm, 'LAM_E': _cm, 'X_S': _cm, 'LAM_S': _cm, 'XPAS': _xpas, 'XCE': _cm, 'YCE': _cm, }, 'QUADRUPO': { 'l': _cm, 'R_0': _cm, 'X_E': _cm, 'LAM_E': _cm, 'X_S': _cm, 'XPAS': _xpas, 'LAM_S': _cm, 'XCE': _cm, 'YCE': _cm, }, 'SEXTUPOL': { 'l': _cm, 'R_0': _cm, 'X_E': _cm, 'LAM_E': _cm, 'X_S': _cm, 'XPAS': _xpas, 'LAM_S': _cm, 'XCE': _cm, 'YCE': _cm, }, } def _initial_phase_field(field): return _INITIAL_PHASE_MAP.get(field, '{}o'.format(field)) def _parse_zgoubi_log(run_dir): path = run_dir.join(_ZGOUBI_LOG_FILE) if not path.exists(): return '' res = '' element_by_num = {} text = pkio.read_text(str(path)) for line in text.split("\n"): match = re.search(r'^ (\'\w+\'.*?)\s+(\d+)$', line) if match: element_by_num[match.group(2)] = match.group(1) continue if re.search('all particles lost', line): res += '{}\n'.format(line) continue match = re.search(r'Enjob occured at element # (\d+)', line) if match: res += '{}\n'.format(line) num = match.group(1) if num in element_by_num: res += ' element # {}: {}\n'.format(num, element_by_num[num]) return res def _read_twiss_header(run_dir): path = py.path.local(run_dir).join(_ZGOUBI_TWISS_FILE) res = [] for line in pkio.read_text(path).split('\n'): for var in line.split('@ '): values = var.split() if len(values) and values[0] in _TWISS_SUMMARY_LABELS: v = values[2] if re.search(r'[a-z]{2}', v, re.IGNORECASE): pass else: v = float(v) res.append([values[0], _TWISS_SUMMARY_LABELS[values[0]], v]) return res _MODEL_UNITS = _init_model_units()