# -*- coding: utf-8 -*-
u"""zgoubi execution template.
:copyright: Copyright (c) 2018 RadiaSoft LLC. All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from __future__ import absolute_import, division, print_function
from pykern import pkcollections
from pykern import pkio
from pykern import pkjinja
from pykern.pkdebug import pkdc, pkdp
from sirepo import simulation_db
from sirepo.template import template_common
import math
import numpy as np
import py.path
import re
SIM_TYPE = 'zgoubi'
BUNCH_SUMMARY_FILE = 'bunch.json'
WANT_BROWSER_FRAME_CACHE = True
ZGOUBI_LOG_FILE = 'sr_zgoubi.log'
_SCHEMA = simulation_db.get_schema(SIM_TYPE)
_ZGOUBI_DATA_FILE = 'zgoubi.fai'
_ZGOUBI_LOG_FILE = 'zgoubi.res'
_ZGOUBI_TWISS_FILE = 'zgoubi.TWISS.out'
_INITIAL_PHASE_MAP = {
'D1': 'Do1',
'time': 'to',
}
_MODEL_UNITS = None
_PHASE_SPACE_FIELD_INFO = {
'Do1': [u'dp/p₀', 1],
'Yo': [u'Y₀ [m]', 0.01],
'To': [u'Y\'₀ [rad]', 0.001],
'Zo': [u'Z₀ [m]', 0.01],
'Po': [u'Z\'₀ [rad]', 0.001],
'So': [u's₀ [m]', 0.01],
'to': [u't₀', 1],
'D1': ['dp/p', 1],
'Y': ['Y [m]', 0.01],
'T': ['Y\' [rad]', 0.001],
'Z': ['Z [m]', 0.01],
'P': ['Z\' [rad]', 0.001],
'S': ['s [m]', 0.01],
'time': ['t', 1],
}
_PYZGOUBI_FIELD_MAP = {
'l': 'XL',
'plt': 'label2',
}
_REPORT_ENUM_INFO = {
'twissReport': 'TwissParameter',
'twissReport2': 'TwissParameter',
'opticsReport': 'OpticsParameter',
}
_TWISS_SUMMARY_LABELS = {
'LENGTH': 'Beamline length [m]',
'ORBIT5': 'Orbit5 [m]',
'ALFA': 'Momentum compaction factor',
'GAMMATR': 'Transition energy gamma',
'DELTAP': 'Energy difference',
'ENERGY': 'Particle energy [GeV]',
'GAMMA': 'Particle gamma',
'Q1': 'Horizontal tune (fractional)',
'DQ1': 'Horizontal chromaticity',
'BETXMIN': 'Horizontal minimum beta [m]',
'BETXMAX': 'Horizontal maximum beta [m]',
'DXMIN': 'Horizontal minimum dispersion [m]',
'DXMAX': 'Horizontal maximum dispersion [m]',
'DXRMS': 'Horizontal RMS dispersion [m]',
'XCOMIN': 'Horizontal closed orbit minimum deviation [m]',
'XCOMAX': 'Horizontal closed orbit maximum deviation [m]',
'XCORMS': 'Horizontal closed orbit RMS deviation [m]',
'Q2': 'Vertical tune (fractional)',
'DQ2': 'Vertical chromaticity',
'BETYMIN': 'Vertical minimum beta [m]',
'BETYMAX': 'Vertical maximum beta [m]',
'DYMIN': 'Vertical minimum dispersion [m]',
'DYMAX': 'Vertical maximum dispersion [m]',
'DYRMS': 'Vertical RMS dispersion [m]',
'YCOMIN': 'Vertical closed orbit minimum deviation [m]',
'YCOMAX': 'Vertical closed orbit maximum deviation [m]',
'YCORMS': 'Vertical closed orbit RMS deviation [m]',
}
[docs]def background_percent_complete(report, run_dir, is_running):
errors = ''
if not is_running:
data_file = run_dir.join(_ZGOUBI_DATA_FILE)
if data_file.exists():
col_names, rows = read_data_file(data_file)
count = int(rows[-1][col_names.index('IPASS')])
return {
'percentComplete': 100,
'frameCount': count + 1,
}
else:
errors = _parse_zgoubi_log(run_dir)
return {
'percentComplete': 0,
'frameCount': 0,
'error': errors,
}
[docs]def column_data(col, col_names, rows):
idx = col_names.index(col)
assert idx >= 0, 'invalid col: {}'.format(col)
res = []
for row in rows:
res.append(float(row[idx]))
return res
[docs]def fixup_old_data(data):
for m in [
'bunch',
'bunchAnimation',
'bunchAnimation2',
'particle',
'simulationSettings',
'opticsReport',
'twissReport',
'twissReport2',
'twissSummaryReport',
]:
if m not in data.models:
data.models[m] = pkcollections.Dict({})
template_common.update_model_defaults(data.models[m], m, _SCHEMA)
[docs]def get_animation_name(data):
return 'animation'
[docs]def get_application_data(data):
if data['method'] == 'compute_particle_ranges':
return template_common.compute_field_range(data, _compute_range_across_frames)
[docs]def get_simulation_frame(run_dir, data, model_data):
if re.search(r'bunchAnimation', data['modelName']):
return _extract_bunch_animation(run_dir, data, model_data)
assert False, 'invalid animation frame model: {}'.format(data['modelName'])
[docs]def lib_files(data, source_lib):
return []
[docs]def parse_error_log(run_dir):
return None
[docs]def python_source_for_model(data, model):
return _generate_parameters_file(data)
[docs]def prepare_output_file(report_info, data):
report = data['report']
if 'bunchReport' in report or 'twissReport' in report or 'opticsReport' in report:
fn = simulation_db.json_filename(template_common.OUTPUT_BASE_NAME, report_info.run_dir)
if fn.exists():
fn.remove()
save_report_data(data, report_info.run_dir)
[docs]def read_data_file(path):
text = pkio.read_text(path)
# mode: title -> header -> data
mode = 'title'
col_names = []
rows = []
for line in text.split("\n"):
if mode == 'title':
if not re.search(r'^\@', line):
mode = 'header'
continue
# work-around odd header/value "! optimp.f"
line = re.sub(r'\!\s', '', line)
if mode == 'header':
# header row starts with '# <letter>'
if re.search(r'^#\s+[a-zA-Z]', line):
col_names = re.split('\s+', line)
col_names = map(lambda x: re.sub(r'\W|_', '', x), col_names[1:])
mode = 'data'
elif mode == 'data':
if re.search('^#', line):
continue
row = re.split('\s+', re.sub(r'^\s+', '', line))
rows.append(row)
rows.pop()
return col_names, rows
[docs]def remove_last_frame(run_dir):
pass
[docs]def save_report_data(data, run_dir):
report_name = data['report']
if 'twissReport' in report_name or 'opticsReport' in report_name:
enum_name = _REPORT_ENUM_INFO[report_name]
report = data['models'][report_name]
plots = []
col_names, rows = read_data_file(py.path.local(run_dir).join(_ZGOUBI_TWISS_FILE))
for f in ('y1', 'y2', 'y3'):
if report[f] == 'none':
continue
plots.append({
'points': column_data(report[f], col_names, rows),
'label': template_common.enum_text(_SCHEMA, enum_name, report[f]),
})
#TODO(pjm): use template_common
x = column_data('sums', col_names, rows)
res = {
'title': '',
'x_range': [min(x), max(x)],
'y_label': '',
'x_label': 's [m]',
'x_points': x,
'plots': plots,
'y_range': template_common.compute_plot_color_and_range(plots),
'summaryData': _read_twiss_header(run_dir),
}
elif report_name == 'twissSummaryReport':
res = {
#TODO(pjm): x_range requied by sirepo-plotting.js
'x_range': [],
'summaryData': _read_twiss_header(run_dir),
}
elif 'bunchReport' in report_name:
report = data['models'][report_name]
col_names, rows = read_data_file(py.path.local(run_dir).join(_ZGOUBI_DATA_FILE))
res = _extract_bunch_data(report, col_names, rows, '')
summary_file = py.path.local(run_dir).join(BUNCH_SUMMARY_FILE)
if summary_file.exists():
res['summaryData'] = {
'bunch': simulation_db.read_json(summary_file)
}
else:
raise RuntimeError('unknown report: {}'.format(report_name))
simulation_db.write_result(
res,
run_dir=run_dir,
)
[docs]def simulation_dir_name(report_name):
if 'bunchReport' in report_name:
return 'bunchReport'
if 'opticsReport' in report_name or report_name == 'twissSummaryReport':
return 'twissReport2'
return report_name
[docs]def write_parameters(data, run_dir, is_parallel, python_file=template_common.PARAMETERS_PYTHON_FILE):
pkio.write_text(
run_dir.join(python_file),
_generate_parameters_file(data),
)
def _compute_range_across_frames(run_dir, data):
res = {}
for v in _SCHEMA.enum.PhaseSpaceCoordinate:
res[v[0]] = []
col_names, rows = read_data_file(py.path.local(run_dir).join(_ZGOUBI_DATA_FILE))
ipass_index = int(col_names.index('IPASS'))
for field in res:
values = column_data(field, col_names, rows)
values += column_data(_initial_phase_field(field), col_names, rows)
if len(res[field]):
res[field][0] = min(min(values), res[field][0])
res[field][1] = max(max(values), res[field][1])
else:
res[field] = [min(values), max(values)]
for field in res.keys():
factor = _PHASE_SPACE_FIELD_INFO[field][1]
res[field][0] *= factor
res[field][1] *= factor
res[_initial_phase_field(field)] = res[field]
return res
def _element_value(el, field):
converter = _MODEL_UNITS.get(el['type'], {}).get(field, None)
return converter(el[field]) if converter else el[field]
def _extract_bunch_animation(run_dir, data, model_data):
frame_index = int(data['frameIndex'])
report = template_common.parse_animation_args(
data,
{
'1': ['x', 'y', 'histogramBins', 'startTime'],
'': ['x', 'y', 'histogramBins', 'plotRangeType', 'horizontalSize', 'horizontalOffset', 'verticalSize', 'verticalOffset', 'isRunning', 'startTime'],
},
)
is_frame_0 = False
# remap frame 0 to use initial "o" values from frame 1
if frame_index == 0:
is_frame_0 = True
for f in ('x', 'y'):
v = report[f]
report[f] = _initial_phase_field(v)
frame_index = 1
col_names, all_rows = read_data_file(run_dir.join(_ZGOUBI_DATA_FILE))
rows = []
ipass_index = int(col_names.index('IPASS'))
for row in all_rows:
if int(row[ipass_index]) == frame_index:
rows.append(row)
model = model_data.models.bunchAnimation
model.update(report)
return _extract_bunch_data(model, col_names, rows, 'Initial Distribution' if is_frame_0 else 'Pass {}'.format(frame_index))
def _extract_bunch_data(report, col_names, rows, title):
x_info = _PHASE_SPACE_FIELD_INFO[report['x']]
y_info = _PHASE_SPACE_FIELD_INFO[report['y']]
x = np.array(column_data(report['x'], col_names, rows)) * x_info[1]
y = np.array(column_data(report['y'], col_names, rows)) * y_info[1]
return template_common.heatmap([x, y], report, {
'x_label': x_info[0],
'y_label': y_info[0],
'title': title,
'z_label': 'Number of Particles',
});
def _generate_beamline(data, beamline_map, element_map, beamline_id):
res = ''
for item_id in beamline_map[beamline_id]['items']:
if item_id in beamline_map:
res += _generate_beamline(data, beamline_map, element_map, item_id)
continue
el = element_map[item_id]
if el['type'] == 'AUTOREF':
res += 'line.add(core.FAKE_ELEM(""" \'AUTOREF\'\n{}\n{} {} {}\n"""))\n'.format(
el.I, _element_value(el, 'XCE'), _element_value(el, 'YCE'), _element_value(el, 'ALE'))
elif el['type'] == 'YMY':
res += 'line.add(core.FAKE_ELEM(""" \'YMY\'\n"""))\n'
elif el['type'] == 'SEXTUPOL':
res += _generate_element(el, 'QUADRUPO')
else:
assert el['type'] in _MODEL_UNITS, 'Unsupported element: {}'.format(el['type'])
res += _generate_element(el)
return res
def _generate_beamline_elements(report, data):
res = ''
sim = data['models']['simulation']
beamline_map = {}
for bl in data.models.beamlines:
beamline_map[bl.id] = bl
element_map = {}
for el in data.models.elements:
element_map[el._id] = el
if report == 'twissReport':
beamline_id = sim['activeBeamlineId']
else:
if 'visualizationBeamlineId' not in sim or not sim['visualizationBeamlineId']:
sim['visualizationBeamlineId'] = data.models.beamlines[0].id
beamline_id = sim['visualizationBeamlineId']
return _generate_beamline(data, beamline_map, element_map, beamline_id)
def _generate_element(el, schema_type=None):
res = 'line.add(core.{}("{}"'.format(el.type, el.name)
for f in _SCHEMA.model[schema_type or el.type]:
if f == 'name' or f == 'order' or f == 'format':
continue
#TODO(pjm): need ignore list
if el.type == 'CAVITE' and (f == 'l' or f == 'IOP' or f == 'f_RF'):
continue
res += ', {}={}'.format(_PYZGOUBI_FIELD_MAP.get(f, f), _element_value(el, f))
res += '))\n'
return res
def _generate_parameters_file(data):
v = template_common.flatten_data(data.models, {})
report = data.report if 'report' in data else ''
v['particleDef'] = _generate_particle(data.models.particle)
v['beamlineElements'] = _generate_beamline_elements(report, data)
res = template_common.render_jinja(SIM_TYPE, v, 'base.py')
if 'twissReport' in report or 'opticsReport' in report or report == 'twissSummaryReport':
return res + template_common.render_jinja(SIM_TYPE, v, 'twiss.py')
# if 'opticsReport' in report:
# return res + template_common.render_jinja(SIM_TYPE, v, 'optics.py')
v['outputFile'] = _ZGOUBI_DATA_FILE
res += template_common.render_jinja(SIM_TYPE, v, 'bunch.py')
if 'bunchReport' in report:
return res + template_common.render_jinja(SIM_TYPE, v, 'bunch-report.py')
return res + template_common.render_jinja(SIM_TYPE, v)
def _generate_particle(particle):
if particle.particleType == 'Other':
return '{} {} {} {} 0'.format(particle.M, particle.Q, particle.G, particle.Tau)
return particle.particleType
def _init_model_units():
# Convert element units (m, rad) to the required zgoubi units (cm, mrad, degrees)
def _cm(meters):
return float(meters) * 100
def _degrees(radians):
return float(radians) * 180 / math.pi
def _marker_plot(v):
return '"{}"'.format('.plt' if int(v) else '')
def _mrad(mrad):
return float(mrad) * 1000
def _xpas(v):
if re.search(r'^#', v):
v = re.sub(r'^#', '', v)
return '[{}]'.format(','.join(v.split('|')))
return str(_cm(float(v)))
return {
'AUTOREF': {
'XCE': _cm,
'YCE': _cm,
'ALE': _mrad,
},
'BEND': {
'l': _cm,
'X_E': _cm,
'LAM_E': _cm,
'X_S': _cm,
'LAM_S': _cm,
'XPAS': _xpas,
'XCE': _cm,
'YCE': _cm,
},
'CAVITE': {
},
'CHANGREF': {
'ALE': _degrees,
'XCE': _cm,
'YCE': _cm,
},
'DRIFT': {
'l': _cm,
},
'MARKER': {
'plt': _marker_plot,
},
'MULTIPOL': {
'l': _cm,
'R_0': _cm,
'X_E': _cm,
'LAM_E': _cm,
'X_S': _cm,
'LAM_S': _cm,
'XPAS': _xpas,
'XCE': _cm,
'YCE': _cm,
},
'QUADRUPO': {
'l': _cm,
'R_0': _cm,
'X_E': _cm,
'LAM_E': _cm,
'X_S': _cm,
'XPAS': _xpas,
'LAM_S': _cm,
'XCE': _cm,
'YCE': _cm,
},
'SEXTUPOL': {
'l': _cm,
'R_0': _cm,
'X_E': _cm,
'LAM_E': _cm,
'X_S': _cm,
'XPAS': _xpas,
'LAM_S': _cm,
'XCE': _cm,
'YCE': _cm,
},
}
def _initial_phase_field(field):
return _INITIAL_PHASE_MAP.get(field, '{}o'.format(field))
def _parse_zgoubi_log(run_dir):
path = run_dir.join(_ZGOUBI_LOG_FILE)
if not path.exists():
return ''
res = ''
element_by_num = {}
text = pkio.read_text(str(path))
for line in text.split("\n"):
match = re.search(r'^ (\'\w+\'.*?)\s+(\d+)$', line)
if match:
element_by_num[match.group(2)] = match.group(1)
continue
if re.search('all particles lost', line):
res += '{}\n'.format(line)
continue
match = re.search(r'Enjob occured at element # (\d+)', line)
if match:
res += '{}\n'.format(line)
num = match.group(1)
if num in element_by_num:
res += ' element # {}: {}\n'.format(num, element_by_num[num])
return res
def _read_twiss_header(run_dir):
path = py.path.local(run_dir).join(_ZGOUBI_TWISS_FILE)
res = []
for line in pkio.read_text(path).split('\n'):
for var in line.split('@ '):
values = var.split()
if len(values) and values[0] in _TWISS_SUMMARY_LABELS:
v = values[2]
if re.search(r'[a-z]{2}', v, re.IGNORECASE):
pass
else:
v = float(v)
res.append([values[0], _TWISS_SUMMARY_LABELS[values[0]], v])
return res
_MODEL_UNITS = _init_model_units()