Source code for sirepo.sim_api.jupyterhublogin

"""API's for jupyterhublogin sim

:copyright: Copyright (c) 2020 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""

from pykern import pkconfig, pkio
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdp, pkdlog, pkdexc
import re
import sirepo.api_perm
import sirepo.const
import sirepo.events
import sirepo.oauth
import sirepo.quest
import sirepo.srdb
import sirepo.uri
import sirepo.uri_router
import sirepo.util

_cfg = None

_HUB_USER_SEP = "-"

# "-" is considered invalid here, though it will be replaced
# with _HUB_USER_SEP
_INVALID_USER_NAME_CHARS = re.compile(r"[^a-z0-9]+")

_JUPYTERHUB_LOGOUT_USER_NAME_ATTR = "jupyterhub_logout_user_name"


[docs] class API(sirepo.quest.API): @sirepo.quest.Spec( "require_plan", sim_type=f"SimType const={sirepo.const.SIM_TYPE_JUPYTERHUBLOGIN}", ) async def api_checkAuthJupyterHub(self): # TODO(rorour) do this role check at a higher level # (see https://github.com/radiasoft/sirepo/issues/7026) self.parse_params(type=sirepo.const.SIM_TYPE_JUPYTERHUBLOGIN) u = _unchecked_jupyterhub_user_name( self, have_simulation_db=False, ) if not u: u = create_user(self) return self.reply_ok(PKDict(username=u)) @sirepo.quest.Spec( "require_plan", sim_type=f"SimType const={sirepo.const.SIM_TYPE_JUPYTERHUBLOGIN}", ) async def api_redirectJupyterHub(self): # TODO(rorour) do this role check at a higher level # (see https://github.com/radiasoft/sirepo/issues/7026) self.parse_params(type=sirepo.const.SIM_TYPE_JUPYTERHUBLOGIN) if not _unchecked_jupyterhub_user_name(self): create_user(self) return self.reply_redirect("jupyterHub")
[docs] def cfg(): return _init()
[docs] def create_user(qcall): """Create a Jupyter user if not one associated with Sirepo uid New user_name is created from sanitized Sirepo username. If user_name is taken, will be appended with random string. Returns: user_name (str): The user_name of the new or existing user """ def _name_sanitized(): n = _INVALID_USER_NAME_CHARS.sub( _HUB_USER_SEP, qcall.auth.logged_in_user_name_local_part() ).strip(_HUB_USER_SEP) if not len(n) > 0: raise AssertionError( f"sanitized Jupyter username is empty for uid={qcall.auth.logged_in_user()}" ) return n def _user_name(): n = _name_sanitized() if qcall.auth_db.model("JupyterhubUser").unchecked_search_by(user_name=n): # The username already exists. Add some randomness to try and create # a unique user name. n += _HUB_USER_SEP + sirepo.util.random_base62(3).lower() return n n = _unchecked_jupyterhub_user_name(qcall) if n: return n u = _user_name() # POSIT: if two creates happen simultaneously, there may be an existence # collision, but the db will be consistent, because this call happens # first, before db insert. if _user_dir(qcall, u).exists(): raise AssertionError(f"existing user dir with same name={_user_dir(qcall, u)}") qcall.auth_db.model( "JupyterhubUser", uid=qcall.auth.logged_in_user(), user_name=u, ).save() # POSIT: one transaction will rollback if two creates happen at the same time, # but that won't change the need for the directory. pkio.mkdir_parent(_user_dir(qcall)) return u
[docs] def delete_user_dir(qcall): n = _unchecked_jupyterhub_user_name(qcall, have_simulation_db=False) if not n: return pkio.unchecked_remove(_user_dir(qcall, user_name=n))
[docs] def init_apis(*args, **kwargs): _init() sirepo.events.register( PKDict( auth_logout=_event_auth_logout, end_api_call=_event_end_api_call, ) )
def _init(): global _cfg if _cfg: return _cfg _cfg = pkconfig.init( user_db_root_d=( pkio.py_path(sirepo.srdb.root()).join("jupyterhub", "user"), pkio.py_path, "Jupyterhub user db", ), uri_root=("jupyter", str, "the root uri of jupyterhub"), ) pkio.mkdir_parent(_cfg.user_db_root_d) return _cfg def _event_auth_logout(qcall, kwargs): # We must set the uid here because in _event_end_api_call we won't # be able to retrieve it. The user is already logged out by the # time _event_end_api_call is called qcall.bucket_set( _JUPYTERHUB_LOGOUT_USER_NAME_ATTR, _unchecked_hub_user(qcall, kwargs.uid) ) def _event_end_api_call(qcall, kwargs): # We can't move this code into _event_auth_logout because we need # the response object. u = qcall.bucket_unchecked_get(_JUPYTERHUB_LOGOUT_USER_NAME_ATTR) if not u: return # Delete the JupyterHub cookies because we are logging out of Sirepo. # Trailing slash is required in paths kwargs.resp.delete_third_party_cookies( ( PKDict(key="jupyterhub-hub-login", path=f"/{_cfg.uri_root}/hub/"), PKDict(key=f"jupyterhub-user-{u}", path=f"/{_cfg.uri_root}/user/{u}/"), ) ) def _unchecked_hub_user(qcall, uid): u = qcall.auth_db.model("JupyterhubUser").unchecked_search_by(uid=uid) if u: return u.user_name return None def _unchecked_jupyterhub_user_name(qcall, have_simulation_db=True): return _unchecked_hub_user( qcall, qcall.auth.logged_in_user(check_path=have_simulation_db) ) def _user_dir(qcall, user_name=None): if not user_name: user_name = _unchecked_jupyterhub_user_name(qcall) assert user_name, "must have user to get dir" return _cfg.user_db_root_d.join(user_name)