Source code for sirepo.pkcli.test_http

# -*- coding: utf-8 -*-
"""async requests to server over http

:copyright: Copyright (c) 2020 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from __future__ import absolute_import, division, print_function
from pykern import pkcompat
from pykern import pkconfig
from pykern import pkinspect
from pykern import pkjson
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdp, pkdexc, pkdlog, pkdformat
import asyncio
import contextlib
import copy
import inspect
import random
import re
import signal
import sirepo.const
import sirepo.pkcli.service
import sirepo.sim_data
import sirepo.util
import time
import tornado.httpclient
import tornado.ioloop
import tornado.locks


_CODES = PKDict(
    elegant=(
        PKDict(
            name="bunchComp - fourDipoleCSR",
            reports=(
                PKDict(report="bunchReport1", binary_data_file=False),
                PKDict(report="elementAnimation10-5", binary_data_file=True),
            ),
        ),
        PKDict(
            name="SPEAR3",
            reports=(
                PKDict(report="bunchReport2", binary_data_file=False),
                PKDict(report="elementAnimation62-20", binary_data_file=True),
            ),
        ),
    ),
    jspec=(
        PKDict(
            name="Booster Ring",
            reports=(
                PKDict(report="particleAnimation", binary_data_file=False),
                PKDict(report="rateCalculationReport", binary_data_file=False),
            ),
        ),
    ),
    srw=(
        PKDict(
            name="Tabulated Undulator Example",
            reports=(
                PKDict(report="intensityReport", binary_data_file=False),
                PKDict(report="trajectoryReport", binary_data_file=False),
                PKDict(report="multiElectronAnimation", binary_data_file=False),
                PKDict(report="powerDensityReport", binary_data_file=False),
                PKDict(report="sourceIntensityReport", binary_data_file=False),
            ),
        ),
        PKDict(
            name="Bending Magnet Radiation",
            reports=(
                PKDict(report="initialIntensityReport", binary_data_file=False),
                PKDict(report="intensityReport", binary_data_file=False),
                PKDict(report="powerDensityReport", binary_data_file=False),
                PKDict(report="sourceIntensityReport", binary_data_file=False),
                PKDict(report="trajectoryReport", binary_data_file=False),
            ),
        ),
    ),
)

cfg = None

_sims = []


[docs] def default_command(): async def _apps(): a = [] for c in await _clients(): for t in _CODES.keys(): a.append( await _App( sim_type=t, client=c.copy(), examples=copy.deepcopy(_CODES[t]), ).setup_sim_data() ) return a async def _clients(): return await asyncio.gather(*[_Client(u).login() for u in cfg.emails]) def _register_signal_handlers(main_task): def _s(*args): main_task.cancel() signal.signal(signal.SIGTERM, _s) signal.signal(signal.SIGINT, _s) async def _run(): s = [] try: s = await _sims_tasks() t = asyncio.gather(*s) _register_signal_handlers(t) await t except Exception as e: await _cancel_all_tasks(s) if isinstance(e, sirepo.const.ASYNC_CANCELED_ERROR): # Will only be canceled by a signal handler return pkdlog("error={} stack={} sims={}", e, pkdexc(), _sims) raise async def _sims_tasks(): s = [] for a in await _apps(): e = a.examples[random.randrange(len(a.examples))] for r in random.sample(e.reports, len(e.reports)): s.append( _Sim(a, e.name, r.report, r.binary_data_file).create_task(), ) await _pause_for_server() return s random.seed() tornado.ioloop.IOLoop.current().run_sync(_run)
class _App(PKDict): def __init__(self, **kwargs): super().__init__(**kwargs) self.client.app = self async def setup_sim_data(self): r = await self.client.post("/simulation-list", PKDict(), self) self._sim_db = PKDict() self._sid = PKDict([(x.name, x.simulationId) for x in r]) self.sim_data = sirepo.sim_data.get_class(self.sim_type) return self def get_sid(self, sim_name): return self._sid[sim_name] async def get_sim(self, sim_name): try: return self._sim_db[sim_name] except KeyError: self._sim_db[sim_name] = await self.client.get( "/simulation/{}/{}/0".format( self.sim_type, self.get_sid(sim_name), ), self, ) return self._sim_db[sim_name] def pkdebug_str(self): return pkdformat( "{}(sim_type={} email={} uid={})", self.__class__.__name__, self.sim_type, self.client.email, self.client.uid, ) async def _cancel_all_tasks(tasks): for t in tasks: await _pause_for_server() pkdlog("cancelling task={}", _task_id(t)) t.cancel() # We need a gather() after cancel() because there are awaits in the # finally blocks (ex await post('run-cancel)). We need return_exceptions # so the CanceledErrors aren't raised which would cancel the gather. await asyncio.gather(*tasks, return_exceptions=True) class _Client(PKDict): def __init__(self, email, **kwargs): super().__init__( email=email, uid=None, _headers=PKDict({"User-Agent": "test_http"}), **kwargs, ) tornado.httpclient.AsyncHTTPClient.configure(None, max_clients=1000) self._client = tornado.httpclient.AsyncHTTPClient() def copy(self): n = _Client(self.email) for k, v in self.items(): if k != "_client": n[k] = copy.deepcopy(v) return n async def get(self, uri, caller, expect_binary_body=False): uri = self._uri(uri) with self._timer(uri, caller): return self.parse_response( await self._client.fetch( uri, headers=self._headers, method="GET", **self._fetch_default_args(), ), expect_binary_body=expect_binary_body, ) async def login(self): r = await self.post("/simulation-list", PKDict(), self) assert r.srException.routeName == "missingCookies" r = await self.post("/simulation-list", PKDict(), self) assert r.srException.routeName == "login" r = await self.post("/auth-email-login", PKDict(email=self.email), self) t = sirepo.util.create_token( self.email, ) r = await self.post( self._uri("/auth-email-authorized/{}/{}".format(self.sim_type, t)), PKDict(token=t, email=self.email), self, ) assert r.state != "srException", "r={}".format(r) if r.authState.needCompleteRegistration: r = await self.post( "/auth-complete-registration", PKDict(displayName=self.email), self, ) self.uid = re.search( r'"uid": "(\w+)"', await self.get("/auth-state", self), ).group(1) return self def parse_response(self, resp, expect_binary_body=False): assert resp.code == 200, "resp={}".format(resp) if "Set-Cookie" in resp.headers: self._headers.Cookie = resp.headers["Set-Cookie"] if "json" in resp.headers["content-type"]: return pkjson.load_any(resp.body) try: b = pkcompat.from_bytes(resp.body) assert ( not expect_binary_body ), "expecting binary body resp={} body={}".format( resp, b[:1000], ) except UnicodeDecodeError: assert expect_binary_body, "unexpected binary body resp={}".format(resp) # Binary data files can't be decoded return if "html" in resp.headers["content-type"]: m = re.search('location = "(/[^"]+)', b) if m: if "error" in m.group(1): return PKDict(state="error", error="server error") return PKDict(state="redirect", uri=m.group(1)) return b def pkdebug_str(self): return pkdformat( "{}(email={} uid={})", self.__class__.__name__, self.email, self.uid, ) async def post(self, uri, data, caller): data.simulationType = self.sim_type uri = self._uri(uri) with self._timer(uri, caller): return self.parse_response( await self._client.fetch( uri, body=pkjson.dump_bytes(data), headers=self._headers.pksetdefault( "Content-type", pkjson.MIME_TYPE, ), method="POST", **self._fetch_default_args(), ), ) def _fetch_default_args(self): return PKDict( connect_timeout=1e8, request_timeout=1e8, validate_cert=cfg.validate_cert, ) @contextlib.contextmanager def _timer(self, uri, caller): s = time.time() yield if "run-status" not in uri: pkdlog( "{} {} elapsed_time={:.6}", uri, caller.pkdebug_str(), time.time() - s, ) @property def sim_type(self): try: return self.app.sim_type except AttributeError: # We don't have an app so the sim type doesn't matter # just used for login() return "elegant" def _uri(self, uri): if uri.startswith("http"): return uri assert uri.startswith("/") # Elegant frame_id's sometimes have spaces in them so need to # make them url safe. But, the * in the url should not be made # url safe return cfg.server_uri + uri.replace(" ", "%20") class _Sim(PKDict): def __init__(self, app, sim_name, report, binary_data_file, **kwargs): super().__init__( _app=app, _sim_name=sim_name, _report=report, _binary_data_file=binary_data_file, **kwargs, ) self._sid = self._app.get_sid(self._sim_name) @contextlib.contextmanager def _set_waiting_on_status(self, frame_before_caller=False): f = inspect.currentframe().f_back.f_back if frame_before_caller: f = getattr(f, "f_back") c = pkinspect.Call(f) self._waiting_on = { k: c[k] for k in sorted(c.keys()) if k in ("lineno", "name") } yield # Only clear _waiting_on in the successful case. In failure # leave it for debugging self._waiting_on = None def create_task(self): async def _run(): _sims.append(self) # Must be set here once we are in the _run() task self._task_id = _task_id(asyncio.current_task()) with self._set_waiting_on_status(): self._data = await self._app.get_sim(self._sim_name) try: r = await self._run_sim_until_completion() if self._app.sim_data.is_parallel(self._report): g = await self._get_sim_frame(r) e = False c = None try: with self._set_waiting_on_status(): c = True await self._run_sim() c = False with self._set_waiting_on_status(): f = await self._app.client.get( "/simulation-frame/" + g, self ) assert ( f.state == "error" ), "{} expecting error instead of frame={}".format( self.pkdebug_str(), f, ) except Exception: e = True raise finally: if c: await self._cancel(error=e) except sirepo.const.ASYNC_CANCELED_ERROR: # Don't log on cancel error, we initiate cancels so not interesting raise except Exception as e: # The error will be logged 2x (once here and once at the top level). # This is not ideal but we need the context we have here which # we don't have at the top level pkdlog("{} error={} stack={}", self, e, pkdexc()) raise # TODO(e-carlin): in py3.8 set the task_name to be pkdebug_str() return asyncio.create_task(_run()) def pkdebug_str(self): return pkdformat( "{}(email={} sim_type={} computeJid={} task={} waiting_on={})", self.__class__.__name__, self._app.client.email, self._app.sim_type, self._app.sim_data.parse_jid( PKDict(simulationId=self._sid, report=self._report), uid=self._app.client.uid, ), self.get("_task_id", "<unknown>"), self.get("_waiting_on", "<unknown>"), ) async def _cancel(self, error=False): c = self._app.client.post( "/run-cancel", PKDict( report=self._report, models=self._data.models, simulationId=self._sid, simulationType=self._app.sim_type, ), self, ) if error: await c return with self._set_waiting_on_status(frame_before_caller=True): await c async def _get_sim_frame(self, next_request): g = self._app.sim_data.frame_id(self._data, next_request, self._report, 0) with self._set_waiting_on_status(): f = await self._app.client.get("/simulation-frame/" + g, self) assert ( f.state == "completed" ), f"{self.pkdebug_str()} expected state completed frame={f}" assert "title" in f, "{} no title in frame={}".format(self.pkdebug_str(), f) with self._set_waiting_on_status(): await self._app.client.get( "/download-data-file/{}/{}/{}/{}".format( self._app.sim_type, self._sid, self._report, 0, ), self, expect_binary_body=self._binary_data_file, ) return g async def _run_sim(self): r = self._report if "animation" in self._report.lower() and self._app.sim_type != "srw": r = "animation" return await self._app.client.post( "/run-simulation", PKDict( # works for sequential simulations, too forceRun=True, models=self._data.models, report=r, simulationId=self._sid, simulationType=self._app.sim_type, ), self, ) async def _run_sim_until_completion(self): c = True e = False try: with self._set_waiting_on_status(): r = await self._run_sim() t = random.randrange(cfg.run_min_secs, cfg.run_max_secs) for _ in range(t): if r.state == "completed" or r.state == "error": c = False assert r.state != "error", "{} unexpected error state {}".format( self.pkdebug_str(), r, ) break assert ( "nextRequest" in r ), '{} expected "nextRequest" in response={}'.format( self.pkdebug_str(), r, ) with self._set_waiting_on_status(): r = await self._app.client.post("/run-status", r.nextRequest, self) with self._set_waiting_on_status(): await tornado.gen.sleep(1) else: pkdlog("{} timeout={}", self, t) return r except Exception: e = True raise finally: if c: await self._cancel(e) def _init(): global cfg if cfg: return c = sirepo.pkcli.service._cfg() cfg = pkconfig.init( emails=( ["one@radia.run", "two@radia.run", "three@radia.run"], list, "emails to test", ), server_uri=( "http://{}:{}".format(c.ip, c.port), str, "where to send requests", ), run_min_secs=( 90, pkconfig.parse_seconds, "minimum amount of time to let a simulation run", ), run_max_secs=( 120, pkconfig.parse_seconds, "maximum amount of time to let a simulation run", ), validate_cert=( not pkconfig.channel_in("dev"), bool, "whether or not to validate server tls cert", ), ) async def _pause_for_server(): # Sleep a bit to give the server time to respond to requests. Without # it connections were being abruptly closed await tornado.gen.sleep(1) def _task_id(task): return str(id(task))[-4:] _init()