Source code for sirepo.api_auth

# -*- coding: utf-8 -*-
"""authentication and authorization routines

:copyright: Copyright (c) 2018 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from pykern.pkdebug import pkdc, pkdexc, pkdlog, pkdp
from pykern import pkcollections
from pykern import pkconfig
from pykern import pkinspect
import sirepo.api_perm
import sirepo.auth
import sirepo.util


[docs] def assert_api_def(func): try: assert isinstance(getattr(func, sirepo.api_perm.ATTR), sirepo.api_perm.APIPerm) except Exception as e: raise AssertionError( "function needs api_perm decoration: func={} err={}".format( func.__name__, e, ), )
[docs] def check_api_call(qcall, func): expect = getattr(func, sirepo.api_perm.ATTR) a = sirepo.api_perm.APIPerm if expect in ( a.ALLOW_SIM_TYPELESS_REQUIRE_EMAIL_USER, a.REQUIRE_COOKIE_SENTINEL, a.REQUIRE_USER, a.REQUIRE_PLAN, a.REQUIRE_ADM, a.REQUIRE_PREMIUM, ): if not qcall.cookie.has_sentinel(): raise sirepo.util.SRException("missingCookies", None) if expect == a.REQUIRE_PLAN: qcall.auth.require_plan() elif expect == a.REQUIRE_USER: qcall.auth.require_user() elif expect == a.ALLOW_SIM_TYPELESS_REQUIRE_EMAIL_USER: qcall.auth.require_email_user() elif expect == a.REQUIRE_ADM: qcall.auth.require_adm() elif expect == a.REQUIRE_PREMIUM: qcall.auth.require_premium() elif expect == a.ALLOW_VISITOR: pass elif expect == a.INTERNAL_TEST: if not pkconfig.channel_in_internal_test(): raise sirepo.util.Forbidden("Only available in internal test") elif expect in (a.ALLOW_COOKIELESS_SET_USER, a.ALLOW_COOKIELESS_REQUIRE_USER): qcall.cookie.set_sentinel() if expect == a.ALLOW_COOKIELESS_REQUIRE_USER: qcall.auth.require_user() elif expect == a.REQUIRE_AUTH_BASIC: qcall.auth.require_auth_basic() else: raise AssertionError("unhandled api_perm={}".format(expect))
[docs] def maybe_sim_type_required_for_api(func): return getattr(func, sirepo.api_perm.ATTR) not in sirepo.api_perm.SIM_TYPELESS_PERMS