Source code for sirepo.reply

"""Reply hold the response to API calls.

Replies are independent of the web platform (tornado http or websocket). They
are converted to the native format by the platform dispatcher at the
time. Internal call_api returns an `_SReply` object.

:copyright: Copyright (c) 2023 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html

"""

from pykern import pkcompat
from pykern import pkconfig
from pykern import pkconst
from pykern import pkio
from pykern import pkjinja
from pykern import pkjson
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdexc, pkdlog, pkdp, pkdformat
import email.utils
import mimetypes
import pykern.pkinspect
import re
import sirepo.const
import sirepo.feature_config
import sirepo.html
import sirepo.resource
import sirepo.uri
import sirepo.util

#: data.state for srException
SR_EXCEPTION_STATE = "srException"

SERVER_ERROR_ROUTE = "error"

#: mapping of extension (json, js, html) to MIME type
_MIME_TYPE = None

_MIME_TYPE_UTF8 = None

#: default Max-Age header
CACHE_MAX_AGE = 43200

ERROR_STATE = "error"

STATE = "state"

#: Default response
_RESPONSE_OK = PKDict({STATE: "ok"})

_DISPOSITION = "Content-Disposition"


[docs] def init_module(**imports): global _MIME_TYPE, _MIME_TYPE_UTF8 if _MIME_TYPE: return # import simulation_db sirepo.util.setattr_imports(imports) _MIME_TYPE = PKDict( html="text/html", ipynb="application/x-ipynb+json", js="application/javascript", json=pkjson.MIME_TYPE, jsonld="application/ld+json", madx="text/plain", py="text/x-python", rtf="application/rtf", txt="text/plain", ) _MIME_TYPE_UTF8 = frozenset(_MIME_TYPE.values())
[docs] def init_quest(qcall): _SReply(qcall=qcall)
class _SReply(sirepo.quest.Attr): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._cookies_to_delete = tuple() def content_as_object(self): return self._content_as(_Object).value def content_as_redirect(self): return self._content_as(_Redirect).value def content_as_str(self): return self._content_as(str) def cookie_set(self, cookie): self.__attrs.cookie = cookie return self def delete_third_party_cookies(self, values): """Remove `values` on client on reply Used exclusively by `jupyterhublogin` to delete Jupyter Hub cookies. Args: values (iterable): elements are PKDict(key, path) """ v = tuple(values) if ["key", "path"] != sorted(v[0].keys()): raise AssertionError(f"must be PKDict(key, path) entries in values={v}") if self._cookies_to_delete: raise AssertionError( f"existing _cookies_to_delete={self._cookies_to_delete} values={v}" ) self._cookies_to_delete = v def destroy(self, **kwargs): """Must be called""" try: try: a = self.__attrs except AttributeError: return if (c := a.get("content")) and isinstance(c, _File): h = c.pkdel("handle") if h: raise AssertionError("") h.close() except Exception as e: pkdlog("error={} reply={} stack={}", e, self, pkdexc()) def from_kwargs(self, **kwargs): """Saves reply attributes While replies are global (qcall.sreply), the attributes need to be reset every time a new reply is generated. """ self.destroy() self.__attrs = PKDict(kwargs).pksetdefault(headers=PKDict) return self def gen_exception(self, exc): """Generate from an Exception Args: exc (Exception): valid convert into a response """ try: if isinstance(exc, sirepo.util.ReplyExc): return self._gen_exception_reply(exc) return self._gen_exception_error(exc) except Exception as e: pkdlog("exception={} trying to generate exc={} stack={}", e, exc, pkdexc()) return self._gen_exception_reply_SRException( PKDict(routeName=SERVER_ERROR_ROUTE) ) def gen_file(self, path, filename): # Always (re-)initialize __attrs self.from_kwargs() try: e = None self.__attrs.content_type, e = self._guess_content_type(path.basename) self._download_name(filename or path.basename) self.__attrs.content = _File( encoding=e, length=path.size(), mtime=int(path.mtime()), path=path, ) # Need a handle, because path may get deleted before response. # Here to avoid unclosed handles on exceptions. self.__attrs.content.handle = open(path, "rb") return self except Exception: self.__attrs.pkdel("content") self.__attrs.pkdel("content_type") self.__attrs.pkdel("download_name") raise def gen_attachment(self, content_or_path, filename=None): """Generate an attachment from file or content Args: content_or_path (bytes or py.path): File contents filename (str): Name of file [content_or_path.basename] Returns: _SReply: reply object """ def _reply(filename): if isinstance(content_or_path, pkconst.PY_PATH_LOCAL_TYPE): return self.gen_file(path=content_or_path, filename=filename) self.from_kwargs( content=content_or_path, content_type=self._guess_content_type(filename)[0], ) self._download_name(filename) return self return _reply(filename)._disposition("attachment").headers_for_no_cache() def gen_dict(self, value): """Generate dict response Args: value (dict): response content Returns: _SReply: reply object """ assert isinstance(value, dict), f"value type={type(value)} is not dict" if value.get(STATE) == SR_EXCEPTION_STATE: # job_api calls return a dict for an srException from job_agent # so need convert to back to srException object. value[SR_EXCEPTION_STATE].pksetdefault( sim_type=lambda: self.qcall.sim_type_uget(), ) return self._gen_exception_reply_SRException(value[SR_EXCEPTION_STATE]) return self.from_kwargs(content=_Object(value)) def gen_dict_error(self, error): """Generate state=error dict response Args: error (object): bound to `ERROR_STATE` Returns: _SReply: reply object """ return self.gen_dict(PKDict({STATE: ERROR_STATE, ERROR_STATE: error})) def gen_dict_ok(self, value): """Generate state=ok dict response Args: value (dict): other values to set (NOTE: updated without copying) Returns: _SReply: reply object """ if value is None: return self.gen_dict(_RESPONSE_OK) value.update(_RESPONSE_OK) return self.gen_dict(value) def gen_list_deprecated(self, value): """Generate list response DEPRECATED: always should reply_dict Args: value (list): response content Returns: _SReply: reply object """ assert isinstance(value, list), f"value type={type(value)} is not list" return self.from_kwargs(content=_Object(value)) def gen_redirect(self, uri): """Redirect to uri Args: uri (str): any valid uri (even with anchor) Returns: _SReply: reply object """ return self._gen_redirect_for_anchor(uri) def gen_redirect_for_local_route( self, sim_type=None, route=None, params=None, query=None, **kwargs, ): """Generate a javascript redirect to sim_type/route/params Default route (None) only supported for ``default`` application_mode/appMode. Args: sim_type (str): how to find the schema [qcall.sim_type] route (str): name in localRoutes [None: use default route] params (dict): parameters for route (including :Name) Returns: _SReply: reply object """ return self._gen_redirect_for_anchor( sirepo.uri.local_route( self.qcall.sim_type_uget(sim_type), route, params, query ), **kwargs, ) def header_set(self, name, value): self.__attrs.headers[name] = value return self def headers_for_cache(self, path=None): self.__attrs.cache = PKDict(cache=True, mtime=path and path.mtime()) return self def headers_for_no_cache(self): self.__attrs.cache = PKDict(cache=False) return self def pkdebug_str(self): n = self.__class__.__name__ if not (a := self.get("__attrs")): return n + "()" c = a.get("content") return pkdformat( "{}(content={} content_type={})", n, # TODO(robnagler) more info by using pkdebug_str for content c if isinstance(c, str) else ("<" + str(type(c)) + ">"), a.get("content_type"), ) def render_html(self, path, want_cache=True, attrs=None): """Call sirepo.html.render with path Args: path (py.path): sirepo.html file to render want_cache (bool): whether to cache result kwargs (dict): params to p Returns: _SReply: reply """ r = self.from_kwargs( content=sirepo.html.render(path), content_type=_MIME_TYPE.html, **(attrs or PKDict()), ) return ( r.headers_for_cache(path=path) if want_cache else r.headers_for_no_cache() ) def render_static_jinja(self, base, ext, j2_ctx): """Render static template with jinja Args: base (str): base name of file, e.g. ``user-state`` ext (str): suffix of file, e.g. ``js`` j2_ctx (dict): jinja context cache_ok (bool): OK to cache the result? [default: False] Returns: _SReply: reply """ p = sirepo.resource.static(ext, f"{base}.{ext}") r = self.from_kwargs( content=pkjinja.render_file(p, j2_ctx, strict_undefined=True), content_type=_MIME_TYPE[ext], ) return r.headers_for_no_cache() def status_set(self, status): self.__attrs.status = int(status) return self def tornado_response(self, handler): def _bytes(resp, content): if isinstance(content, _Base): content, self.__attrs.content_type = content.http_response(self) c = pkcompat.to_bytes(content) resp.write(c) resp.set_header("Content-Length", str(len(c))) def _cache_control(resp): if "cache" not in self.__attrs: return resp c = self.__attrs.cache if c.cache: resp.set_header( "Cache-Control", f"private, max-age={CACHE_MAX_AGE}", ) if c.mtime is not None: resp.set_header( "Last-Modified", email.utils.formatdate(c.mtime, usegmt=True), ) else: resp.set_header("Cache-Control", "no-cache, no-store, must-revalidate") resp.set_header("Pragma", "no-cache") def _content_type(resp): c = a.content_type if self._mime_type_is_utf8(c): c += '; charset="utf8"' resp.set_header("Content-Type", c) def _cookie(resp): a = self.__attrs if not ((c := a.get("cookie")) and 200 <= a.status < 400): return for h in c.http_header_values(to_delete=self._cookies_to_delete): resp.add_header("Set-Cookie", h) def _file(resp): a = self.__attrs c = a.content resp.write(c.handle.read()) c.handle.close() c.pkdel("handle") if _DISPOSITION not in a.headers: self._disposition("inline") resp.set_header( "Last-Modified", email.utils.formatdate(c.mtime, usegmt=True), ) if c.encoding: resp.set_header("Content-Encoding", c.encoding) resp.set_header("Content-Length", str(c.length)) a = self.__attrs c = a.get("content") if c is None: c = b"" a.content_type = _MIME_TYPE.txt r = handler if isinstance(c, _File): _file(r) else: _bytes(r, c) a.pksetdefault(status=200) r.set_status(a.status) for k, v in a.headers.items(): r.set_header(k, v) _content_type(r) _cache_control(r) _cookie(r) return r def uri_router_process_api_call(self, res): """Process the reply from an API call by uri_router Destructively copies `res` to `self` if res is an`_SReply` """ if isinstance(res, _SReply): return self._copy(res) if isinstance(res, dict): return self.gen_dict(res) raise AssertionError(f"invalid return type={type(res)} from qcall={self.qcall}") async def websocket_response(self, wsreq): import msgpack def _content(): a = self.__attrs c = a.get("content") k = sirepo.const.SCHEMA_COMMON.websocketMsg.kind.httpReply if c is None: # always have content, easier for clients return "", k elif isinstance(c, _File): # TODO(robnagler) would be ideal to handle this differently, but # it may not be possible. Tornado/msgpack has lots of copying. x = c.handle.read() c.handle.close() c.pkdel("handle") if self._mime_type_is_utf8(a.content_type): x = pkcompat.from_bytes(x) return x, k elif isinstance(c, _Base): return c.websocket_content() else: return c, k async def _reply(content, kind): await _send( PKDict( kind=kind, reqSeq=wsreq.req_seq, ), content, ) async def _send(header, content): p = None try: p = msgpack.Packer(autoreset=False) header.version = sirepo.const.SCHEMA_COMMON.websocketMsg.version p.pack(header) p.pack(content) # TODO(robnagler) getbuffer() would be better await wsreq.handler.write_message(p.bytes(), binary=True) finally: if p: p.reset() async def _send_cookie(): if not (c := self.__attrs.get("cookie")): return await _send( PKDict( kind=sirepo.const.SCHEMA_COMMON.websocketMsg.kind.asyncMsg, method=sirepo.const.SCHEMA_COMMON.websocketMsg.method.setCookies, ), c.http_header_values(to_delete=self._cookies_to_delete), ) try: # The cookie reply is sent first. This is not atomic, but it's fine. # Reverse order is not good, since content may be a redirect. await _send_cookie() await _reply(*_content()) except Exception as e: pkdlog("error={} reply={} stack={}", e, self, pkdexc()) await _reply( PKDict(routeName=SERVER_ERROR_ROUTE, params=PKDict()), sirepo.const.SCHEMA_COMMON.websocketMsg.kind.srException, ) def _assert_no_cookie_ops(self): if self.get("_cookies_to_delete"): raise AssertionError(f"cannot delete cookies for nested request") if self.pkunchecked_nested_get(("__attrs", "cookie")): raise AssertionError(f"cannot add cookies for nested request") def _content_as(self, clazz): res = self.__attrs.get("content") if not isinstance(res, clazz): raise AssertionError(f"unexpected reply type={type(res)}") return res def _copy(self, source): """Destructive copy source unless `self` is `source`""" if source is self: return self self._assert_no_cookie_ops() res = self.from_kwargs(**source.__attrs) # Destructive so "handle" not used by caller source.__attrs = None return res def _disposition(self, disposition): self.header_set( _DISPOSITION, f'{disposition}; filename="{self.__attrs.download_name}"' ) return self def _download_name(self, filename): def _secure_filename(): return re.sub(r"[^\w\.]+", "-", filename).strip("-") f = _secure_filename() if f.startswith("."): # the safe filename has no basename, prefix with "download" f = "download" + f self.__attrs.download_name = f def _gen_exception_error(self, exc): pkdlog("unsupported exception={} msg={}", type(exc), exc) return self._gen_exception_reply_ServerError(None) def _gen_exception_base(self, exc): return self._gen_exception_reply(exc) def _gen_exception_reply(self, exc): f = getattr( self, "_gen_exception_reply_" + exc.__class__.__name__, None, ) pkdc("exception={} sr_args={}", exc, exc.sr_args) if not f: return self._gen_exception_error(exc) return f(exc.sr_args) def _gen_exception_reply_BadRequest(self, args): return self._gen_http_exception(400) def _gen_exception_reply_ContentTooLarge(self, args): return self._gen_http_exception(413) def _gen_exception_reply_Error(self, args): return self.from_kwargs(content=_Error(args)) def _gen_exception_reply_InvalidEmail(self, args): return self.from_kwargs(content=_Error(args)) def _gen_exception_reply_NotFound(self, args): return self._gen_http_exception(404) def _gen_exception_reply_PlanExpired(self, args): return self._gen_http_exception(402) def _gen_exception_reply_Redirect(self, args): return self.gen_redirect(args.uri) def _gen_exception_reply_SReplyExc(self, args): r = args.sreply if r is self: return self return self.from_kwargs(**r.__attrs) def _gen_exception_reply_ServerError(self, args): return self._gen_http_exception(500) def _gen_exception_reply_SPathNotFound(self, args): pkdlog("uncaught SPathNotFound {}", args) return self._gen_http_exception(404) def _gen_exception_reply_SRException(self, args): if args.get("params") is None: args.params = PKDict() args.pksetdefault(sim_type=lambda: self.qcall.sim_type_uget()) return self.from_kwargs(content=_SRException(args)) def _gen_exception_reply_Forbidden(self, args): return self._gen_http_exception(403) def _gen_exception_reply_Unauthorized(self, args): return self._gen_http_exception(401) def _gen_exception_reply_UserDirNotFound(self, args): return self.qcall.auth.user_dir_not_found(**args) def _gen_exception_reply_UserAlert(self, args): return self._gen_exception_reply_Error(args) def _gen_exception_reply_WWWAuthenticate(self, args): return self._gen_http_exception( 401, headers=PKDict({"WWW-Authenticate": 'Basic realm="*"'}) ) def _gen_http_exception(self, code, headers=None): return self.from_kwargs( content=_HTTPException(PKDict(code=code, headers=headers)) ) def _gen_redirect_for_anchor(self, uri, **kwargs): """Redirect uri with an anchor using javascript Safari browser doesn't support redirects with anchors so we do this in all cases. It also allows us to return sr_exception to the app when we don't know if we can. Args: uri (str): where to redirect to Returns: _SReply: reply object """ if self.qcall.sim_type_uget() in sirepo.feature_config.cfg().vue_sim_types: uri = uri.replace("#", "") return self.from_kwargs( content=_Redirect( PKDict( uri=uri, sim_type=self.qcall.sim_type_uget(), **kwargs, ), ), ) def _guess_content_type(self, basename): res = mimetypes.guess_type(basename) if res[0] is None: return "application/octet-stream", None # overrule mimetypes for this case elif res[0] == "text/x-python": return "text/plain", res[1] return res def _mime_type_is_utf8(self, content_type): return content_type in _MIME_TYPE_UTF8 or content_type.startswith("text/") class _Base: def __init__(self, value): self.value = value def _http_error(self, code, sreply, error_detail=None): x = simulation_db.SCHEMA_COMMON.customErrors.get(str(code)) j = PKDict(error_detail=error_detail) if x: try: # inject the error in the html without jinja or simply generate from a different template # with a title and the respective error. # check self.value.error or have it passed explicitly j.body = sirepo.html.render( path=sirepo.resource.static("html", x["url"]) ) except Exception as e: pkdlog( "customErrors code={} render error={} stack={}", code, e, pkdexc() ) if "body" not in j: j.body = f"<h1>HTTP Error {code}</h1><p>The server could not process your request</p>" sreply.status_set(code) return self._jinja_html("http-custom-error", j2_ctx=j, sreply=sreply) def _jinja_html(self, basename, j2_ctx, sreply): sreply.headers_for_no_cache() return ( pkjinja.render_file( sirepo.resource.file_path(basename + ".html.jinja"), j2_ctx=j2_ctx, strict_undefined=True, ), _MIME_TYPE.html, ) def _json(self, value): return pkjson.dump_pretty(value, pretty=False), _MIME_TYPE.json def _redirect_html(self, value, sreply): j = PKDict(redirect_uri=value.uri, **value) if "sr_exception" in j: j.sr_exception = pkjson.dump_pretty(j.sr_exception, pretty=False) return self._jinja_html("javascript-redirect", j2_ctx=j, sreply=sreply) def _sr_exception(self, routeName, params, **kwargs): # Only supply parameters that match the localRoute, and simulationType # is not in any localRoutes. It's added automatically to params in # certain cases. # TODO(robnagler) this probably should be an assert params.pkdel("simulationType") return ( PKDict(routeName=routeName, params=params), sirepo.const.SCHEMA_COMMON.websocketMsg.kind.srException, ) def _value(self, value=None): return ( self.value if value is None else value, sirepo.const.SCHEMA_COMMON.websocketMsg.kind.httpReply, ) class _File(PKDict): pass class _Error(_Base): def http_response(self, sreply): if sreply.qcall.sreq.method_is_post() and not sreply.qcall.sreq.is_spider(): return self._json(self.websocket_content()[0]) return self._http_error(500, sreply, self.value.error) def websocket_content(self): return self._value( value=PKDict({STATE: ERROR_STATE, ERROR_STATE: self.value.error}) ) class _HTTPException(_Base): def http_response(self, sreply): if self.value.headers: for k, v in self.value.headers.items(): sreply.header_set(k, v) return self._http_error(self.value.code, sreply) def websocket_content(self): return self._sr_exception( routeName="httpException", params=PKDict(code=self.value.code) ) class _Object(_Base): def http_response(self, sreply): return self._json(self.value) def websocket_content(self): return self._value() class _Redirect(_Base): def http_response(self, sreply): if not sreply.qcall.sreq.method_is_post(): return self._redirect_html(self.value, sreply) return self._json( PKDict( { STATE: SR_EXCEPTION_STATE, SR_EXCEPTION_STATE: PKDict( routeName="httpRedirect", params=PKDict(uri=self.value.uri), ), } ), ) def websocket_content(self): return self._sr_exception( routeName="httpRedirect", params=PKDict(uri=self.value.uri) ) class _SRException(_Base): def __init__(self, value): self.value = value def http_response(self, sreply): def _sim_type(): # Only supply parameters that match the localRoute, and simulationType # is not in localRoutes. x = self.value.params.pkdel("simulationType") y = self.value.pkdel("sim_type") return x or y t = _sim_type() r = self.value.routeName x = bool(r) and sirepo.uri.is_sr_exception_only(t, r) if x is None: pkdlog("localRoute={} not found in schema for type={}", r, t) return self._http_error(500, sreply) if x or sreply.qcall.sreq.method_is_post(): return self._json( PKDict({STATE: SR_EXCEPTION_STATE, SR_EXCEPTION_STATE: self.value}), ) pkdc("redirect to route={} params={} type={}", r, self.value.params, t) return self._redirect_html( PKDict( uri=sirepo.uri.local_route(t, r, self.value.params), sr_exception=self.value, ), sreply, ) def websocket_content(self): return self._sr_exception(**self.value)