Source code for sirepo.sim_data.radia

# -*- coding: utf-8 -*-
"""simulation data operations

:copyright: Copyright (c) 2019 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdlog, pkdp
import copy
import os.path
import re
import pykern.pkio
import sirepo.sim_data
import sirepo.util


[docs] class SimData(sirepo.sim_data.SimDataBase): ANALYSIS_ONLY_FIELDS = frozenset( ( "alpha", "bgColor", "color", "colorMap", "name", "notes", "scaling", ) )
[docs] @classmethod def prepare_import_file_args(cls, req): res = cls._prepare_import_file_name_args(req) if res.ext_lower != ".dat": raise sirepo.util.UserAlert(f"invalid file extension='{res.ext_lower}'") p = cls.lib_file_name_with_type( res.basename, cls.schema().constants.fileTypeRadiaDmp, ) if cls.lib_file_exists(p, qcall=req.qcall): raise sirepo.util.UserAlert( f"dump file='{res.basename}' already exists; import another file name" ) cls.lib_file_write(p, req.form_file.as_bytes(), qcall=req.qcall) # radia doesn't use import_file_arguments return res
@classmethod def _compute_job_fields(cls, data, r, compute_model): res = cls._non_analysis_fields(data, r) + [] return res @classmethod def _compute_model(cls, analysis_model, *args, **kwargs): if analysis_model == "fieldLineoutAnimation": return "fieldLineoutAnimation" elif analysis_model in ("solverAnimation", "reset"): return "solverAnimation" elif analysis_model == "optimizerAnimation": return "optimizerAnimation" return super(SimData, cls)._compute_model(analysis_model, *args, **kwargs) @classmethod def __dynamic_defaults(cls, data, model): """defaults that depend on the current data""" return PKDict() @classmethod def _fixup_box_to_cuboid(cls, model, field): if model.get(field) == "box": model[field] = "cuboid" @classmethod def _fixup_examples(cls, models): sim = models.simulation if not sim.get("exampleName"): sim.exampleName = sim.name if sim.name in ("Parameterized C-Bend Dipole",): sim.beamAxis = "x" sim.heightAxis = "z" sim.widthAxis = "y" if sim.name == "Wiggler": models.geometryReport.isSolvable = "0" if not len(models.fieldPaths.paths): models.fieldPaths.paths.append( PKDict( _super="fieldPath", begin=[0, -225, 0], end=[0, 225, 0], id=0, name="y axis", numPoints=101, type="line", ) ) @classmethod def _fixup_obj_types(cls, dm): if dm.get("box"): dm.cuboid = dm.box.copy() del dm["box"] for m in dm: for f in ( "magnetObjType", "poleObjType", "type", ): if not hasattr(dm[m], "get"): continue cls._fixup_box_to_cuboid(dm[m], f) for m in ( "axisPath", "circlePath", "fieldMapPath", "filePath", "linePath", "manualPath", ): if dm.get(m): dm[m].type = m for o in dm.geometryReport.objects: if o.get("model") and not o.get("type"): o.type = o.get("model") for f in ( "model", "type", ): cls._fixup_box_to_cuboid(o, f)
[docs] @classmethod def fixup_old_data(cls, data, qcall, **kwargs): import sirepo.util sch = cls.schema() def _delete_old_fields(model): for f in ("divisions",): if model.get(f): del model[f] def _fixup_array(model, model_type_field, model_field): for o in model.get(model_field, []): if model_type_field not in o: continue _fixup_number_string_fields(o[model_type_field], o) _fixup_array(o, model_type_field, model_field) def _fixup_boolean_fields(model_name, model): s_m = sch.model[model_name] for f in [ f for f in s_m if f in model and s_m[f][1] == "Boolean" and not model[f] ]: model[f] = "0" def _fixup_field_paths(paths): for p in paths: if not p.type.endswith("Path"): p.type = f"{p.type}Path" for f in ( "begin", "end", ): _fixup_number_string_field(p, f) if p.type == "fieldMapPath": if p.numPoints < sch.model.fieldMapPath.numPoints[4]: p.numPoints = sch.model.fieldMapPath.numPoints[4] if p.numPoints > sch.model.fieldMapPath.numPoints[5]: p.numPoints = sch.model.fieldMapPath.numPoints[5] def _fixup_number_string_field(model, field, to_type=float): if field not in model: return if isinstance(model[field], str): model[field] = sirepo.util.split_comma_delimited_string( model[field], to_type ) def _fixup_number_string_fields(model_name, model): if not model_name or not model: return s_m = sch.model.get(model_name) if not s_m: return for f in model: if f not in s_m: continue sf = s_m[f][1] if sf == "FloatArray": _fixup_number_string_field(model, f) continue if sf == "IntArray": _fixup_number_string_field(model, f, to_type=int) continue if sf.startswith("model."): _fixup_number_string_fields(sf.split(".")[-1], model[f]) _fixup_transforms(model[f]) sc = [x for x in s_m.get("_super", []) if x != "_" and x != "model"] if sc: _fixup_number_string_fields(sc[0], model) def _fixup_geom_objects(objects): for o in objects: o.name = re.sub(r"\s", "_", o.name) if o.get("points") is not None and not o.get("triangulationLevel"): o.triangulationLevel = 0.5 if not o.get("bevels"): o.bevels = [] for b in o.bevels: if not b.get("cutRemoval"): b["cutRemoval"] = "1" if not o.get("fillets"): o.fillets = [] if not o.get("materialFormula"): o.materialFormula = [0, 0, 0, 0, 0, 0] if not o.get("modifications"): o.modifications = o.bevels + o.fillets for m in o.modifications: if m.get("amountHoriz") is not None: m.type = "objectBevel" if m.get("radius") is not None: m.type = "objectFillet" for d in ("heightDir", "widthDir"): if d in m: del m[d] for f in ( "type", "model", ): _fixup_number_string_fields(o.get(f), o) # fix "orphan" fields for f in ( "center", "magnetization", "segments", "size", ): _fixup_number_string_field(o, f) _fixup_segmentation(o) _fixup_terminations(o) _fixup_transforms(o) _delete_old_fields(o) def _fixup_segmentation(model): if not model.get("segments"): model.segments = model.get("division", [1, 1, 1]) if not model.get("segmentation"): if model.get("type") == "cylinder": model.segmentation = "cyl" model.segmentationCylAxis = model.extrusionAxis model.segmentationCylPoint = model.center model.segmentationCylRadius = model.radius else: model.segmentation = "pln" if not model.get("segmentationCylAxis"): model.segmentationCylAxis = "z" if not model.get("segmentationCylPoint"): model.segmentationCylPoint = [0, 0, 0] if not model.get("segmentationCylRadius"): model.segmentationCylRadius = 5.0 if not model.get("segmentationCylRatio"): model.segmentationCylRatio = 1.0 if not model.get("segmentationCylUseObjectCenter"): model.segmentationCylUseObjectCenter = "0" def _fixup_terminations(model): for t in filter( lambda x: x, map(lambda x: x.get("object"), model.get("terminations", [])), ): _fixup_number_string_fields(t.get("type"), t) def _fixup_transforms(model): _fixup_array(model, "model", "transforms") for t in model.get("transforms", []): if "model" in t: t.type = t.model for c in t.get("transforms", []): for ct in ("rotate", "translate"): if c.get("model") == f"{ct}Clone": c.type = ct dm = data.models cls._init_models(dm, None, dynamic=lambda m: cls.__dynamic_defaults(data, m)) if dm.simulation.get("dmpImportFile"): dm.simulation.appMode = "imported" if dm.get("geometry"): dm.geometryReport = dm.geometry.copy() del dm["geometry"] if dm.get("solver"): dm.solverAnimation = dm.solver.copy() del dm["solver"] if "fieldPaths" not in dm: dm.fieldPaths = cls.update_model_defaults(PKDict(), "fieldPaths") if not dm.fieldPaths.get("paths"): dm.fieldPaths.paths = [] if dm.fieldPaths.get("path"): dm.fieldPaths.selectedPath = dm.fieldPaths.path del dm.fieldPaths["path"] if dm.simulation.get("isExample"): cls._fixup_examples(dm) dm.simulation.areObjectsUnlockable = dm.simulation.magnetType == "freehand" if dm.simulation.magnetType == "undulator": cls._fixup_undulator(dm) cls._fixup_obj_types(dm) _fixup_geom_objects(dm.geometryReport.objects) _fixup_field_paths(dm.fieldPaths.paths) for name in [name for name in dm if name in sch.model]: _delete_old_fields(dm[name]) _fixup_boolean_fields(name, dm[name]) _fixup_number_string_fields(name, dm[name]) _fixup_terminations(dm[name]) _fixup_transforms(dm[name]) cls._organize_example(data)
@classmethod def _fixup_undulator(cls, dm): import sirepo.util if not dm.simulation.get("heightAxis"): dm.simulation.heightAxis = "z" if not dm.simulation.get("coordinateSystem"): dm.simulation.coordinateSystem = "beam" if "hybridUndulator" in dm: dm.undulatorHybrid = copy.deepcopy(dm.hybridUndulator) del dm["hybridUndulator"] dm.simulation.undulatorType = "undulatorHybrid" dm.undulatorHybrid.undulatorType = "undulatorHybrid" if dm.undulatorHybrid._super == "undulator": dm.undulatorHybrid._super = "undulatorBasic" if dm.simulation.undulatorType == "undulatorBasic": return u = dm.undulatorHybrid g = dm.geometryReport for k, v in PKDict( halfPole="Half Pole", magnet="Magnet Block", pole="Pole", corePoleGroup="Magnet-Pole Pair", terminationGroup="Termination", octantGroup="Octant", ).items(): if k not in u: u[k] = sirepo.util.find_obj(g.objects, "name", v) if not u.get("terminations"): u.terminations = [PKDict() for _ in range(len(u.terminationGroup.members))] for i, t_id in enumerate(u.terminationGroup.members): t = u.terminations[i] cls.update_model_defaults(t, "termination") t.object = sirepo.util.find_obj(g.objects, "id", t_id)
[docs] @classmethod def sim_files_to_run_dir(cls, data, run_dir, post_init=False): try: super().sim_files_to_run_dir(data, run_dir) except Exception as e: if not pykern.pkio.exception_is_not_found(e): raise if post_init: raise e
@classmethod def _lib_file_basenames(cls, data): res = [] if "dmpImportFile" in data.models.simulation: res.append( f"{cls.schema().constants.fileTypeRadiaDmp}.{data.models.simulation.dmpImportFile}" ) if "fieldType" in data: res.append( cls.lib_file_name_with_model_field( "fieldPath", data.fieldType, data.name + "." + data.fileType ) ) for o in filter( lambda x: "pointsFile" in x, data.models.geometryReport.objects ): res.append( cls.lib_file_name_with_model_field( "extrudedPoints", "pointsFile", o.pointsFile ) ) for o in filter( lambda x: x.get("type") == "stl" and "file" in x, data.models.geometryReport.objects, ): res.append(cls.lib_file_name_with_model_field("stl", "file", o.file)) return res @classmethod def _sim_file_basenames(cls, data): # TODO(e-carlin): share filename with template return [ PKDict(basename="geometry.dat"), PKDict(basename="geometryReport.h5"), ]