Source code for sirepo.oauth

# -*- coding: utf-8 -*-
"""oauth for authentication and role moderation

:copyright: Copyright (c) 2022 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdexc, pkdlog, pkdp
import authlib.integrations.base_client
import authlib.integrations.requests_client
import sirepo.feature_config
import sirepo.sim_oauth
import sirepo.uri_router
import sirepo.util


#: cookie keys for oauth (prefix is "sroa")
_COOKIE_NONCE = "sroan"
_COOKIE_SIM_TYPE = "sroas"


[docs] def check_authorized_callback(qcall): # clear temporary cookie values first s = qcall.cookie.unchecked_remove(_COOKIE_NONCE) t = qcall.cookie.unchecked_remove(_COOKIE_SIM_TYPE) assert t qcall.sim_type_set(t) c = _client(qcall, t) try: c.fetch_token( authorization_response=qcall.sreq.http_request_uri, state=s, # SECURITY: This *must* be the grant_type otherwise authlib defaults to # client_credentials which just returns details about the oauth client. That response # can easily be confused for a valid authorization_code response. grant_type="authorization_code", ) return c, t except Exception as e: pkdlog("url={} exception={} stack={}", qcall.sreq.http_request_uri, e, pkdexc()) raise sirepo.util.Forbidden(f"user denied access from sim_type={t}")
[docs] def raise_authorize_redirect(qcall, sim_type): qcall.cookie.set_value(_COOKIE_SIM_TYPE, sim_type) c = _cfg(sim_type) u, s = _client(qcall, sim_type).create_authorization_url(c.authorize_url) qcall.cookie.set_value(_COOKIE_NONCE, s) raise sirepo.util.Redirect(u)
def _cfg(sim_type): return sirepo.sim_oauth.import_module(sim_type).cfg def _client(qcall, sim_type): """Makes it easier to mock""" c = _cfg(sim_type) return authlib.integrations.requests_client.OAuth2Session( c.key, c.secret, redirect_uri=c.callback_uri or qcall.absolute_uri(sirepo.uri_router.uri_for_api(c.callback_api)), **c, )