Source code for sirepo.auth_role
"""User roles
:copyright: Copyright (c) 2021 RadiaSoft LLC. All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from pykern import pkconfig
from pykern import pkinspect
from pykern import pkunit
from pykern.pkdebug import pkdp
import aenum
import sirepo.feature_config
ROLE_ADM = "adm"
ROLE_PLAN_BASIC = "basic"
ROLE_PLAN_ENTERPRISE = "enterprise"
ROLE_PLAN_PREMIUM = "premium"
ROLE_PLAN_TRIAL = "trial"
ROLE_USER = "user"
PLAN_ROLES_PAID = frozenset((ROLE_PLAN_BASIC, ROLE_PLAN_ENTERPRISE, ROLE_PLAN_PREMIUM))
PLAN_ROLES = PLAN_ROLES_PAID.union([ROLE_PLAN_TRIAL])
_SIM_TYPE_ROLE_PREFIX = "sim_type_"
_FOR_NEW_USER = frozenset((ROLE_USER,))
_ADM_SET = frozenset([ROLE_ADM])
[docs]
class ModerationStatus(aenum.NamedConstant):
"""States used by auth_role_moderation and UserRoleModeration"""
APPROVE = "approve"
CLARIFY = "clarify"
DENY = "deny"
PENDING = "pending"
VALID_SET = frozenset([APPROVE, CLARIFY, DENY, PENDING])
[docs]
@classmethod
def check(cls, value):
if value not in cls.VALID_SET:
raise AssertionError(
f"status={value} is not in valied_set={cls.VALID_SET}"
)
return value
[docs]
def check(role):
if role not in _all():
raise AssertionError(f"invalid role={role}")
return role
[docs]
def for_moderated_sim_types():
return _memoize(_for_sim_types("moderated_sim_types"))
[docs]
def for_new_user(auth_method):
from sirepo import auth
if pkconfig.in_dev_mode:
rv = _FOR_NEW_USER.union([ROLE_PLAN_PREMIUM]).union(
_for_sim_types("auth_controlled_sim_types")
)
if auth_method == auth.METHOD_GUEST:
return rv.union([ROLE_ADM])
if auth_method != auth.METHOD_EMAIL and pkunit.is_test_run():
return rv
rv = _FOR_NEW_USER
if not sirepo.feature_config.have_payments():
return rv.union([ROLE_PLAN_PREMIUM])
return rv
[docs]
def for_proprietary_oauth_sim_types():
return _memoize(_for_sim_types("proprietary_oauth_sim_types"))
[docs]
def for_sim_type(sim_type):
return check(_unchecked_for_sim_type(sim_type))
[docs]
def sim_type(role):
if not check(role).startswith(_SIM_TYPE_ROLE_PREFIX):
raise AssertionError(f"not a sim_type role={role}")
return role[len(_SIM_TYPE_ROLE_PREFIX) :]
def _all():
return _memoize(
_for_sim_types("auth_controlled_sim_types").union(
(
ROLE_ADM,
ROLE_PLAN_BASIC,
ROLE_PLAN_ENTERPRISE,
ROLE_PLAN_PREMIUM,
ROLE_PLAN_TRIAL,
ROLE_USER,
)
),
)
def _for_sim_types(attr):
# a bit goofy, but simplified above
if (x := sirepo.feature_config.cfg().get(attr)) is None:
x = getattr(sirepo.feature_config, attr)()
return frozenset(_unchecked_for_sim_type(s) for s in x)
def _memoize(value):
def wrap():
return value
setattr(pkinspect.this_module(), pkinspect.caller_func_name(), wrap)
return value
def _unchecked_for_sim_type(sim_type):
return _SIM_TYPE_ROLE_PREFIX + sim_type