"""Authentication
:copyright: Copyright (c) 2018-2019 RadiaSoft LLC. All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from pykern import pkcollections
from pykern import pkcompat
from pykern import pkconfig
from pykern import pkinspect
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdlog, pkdp, pkdexc
import contextlib
import importlib
import sirepo.auth_db
import sirepo.auth_role
import sirepo.cookie
import sirepo.events
import sirepo.feature_config
import sirepo.job
import sirepo.mpi
import sirepo.payments
import sirepo.quest
import sirepo.reply
import sirepo.request
import sirepo.template
import sirepo.uri
import sirepo.util
#: what routeName to return in the event user is logged out in require_user
LOGIN_ROUTE_NAME = "login"
#: Email is used by moderation. Do not use this var, use qcall.auth.METHOD_EMAIL
METHOD_EMAIL = "email"
#: Guest is a special method. Do not use this var, use qcall.auth.METHOD_GUEST
METHOD_GUEST = "guest"
#: key for auth method for login state
_COOKIE_METHOD = "sram"
#: There will always be this value in the cookie, if there is a cookie
_COOKIE_STATE = "sras"
#: Identifies the user in the cookie
_COOKIE_USER = "srau"
_GUEST_USER_DISPLAY_NAME = "Guest User"
_PAYMENT_PLAN_BASIC = sirepo.auth_role.ROLE_PLAN_BASIC
_PAYMENT_PLAN_ENTERPRISE = sirepo.auth_role.ROLE_PLAN_ENTERPRISE
_PAYMENT_PLAN_PREMIUM = sirepo.auth_role.ROLE_PLAN_PREMIUM
_ALL_PAYMENT_PLANS = (
_PAYMENT_PLAN_BASIC,
_PAYMENT_PLAN_ENTERPRISE,
_PAYMENT_PLAN_PREMIUM,
)
_STATE_LOGGED_IN = "li"
_STATE_LOGGED_OUT = "lo"
_STATE_COMPLETE_REGISTRATION = "cr"
#: name to module object
_METHOD_MODULES = PKDict()
# TODO(robnagler) probably from the schema
#: For formatting the size parameter to an avatar_uri
_AVATAR_SIZE = 40
#: methods + deprecated_methods
valid_methods = None
#: Methods that the user is allowed to see
visible_methods = None
#: visible_methods excluding guest
non_guest_methods = None
#: in auth state
_cookie_http_name = None
_cfg = None
[docs]
def init_quest(qcall, internal_req=None):
"""Under development
Args:
qcall (quest.API): context for APIs and CLIs
internal_req (object): context of web framework, pkcli, unit test, etc.
"""
o = _Auth(qcall)
sirepo.auth_db.init_quest(qcall)
if (
not _cfg.logged_in_user
and internal_req
or qcall.bucket_unchecked_get("in_pkcli")
):
sirepo.request.init_quest(qcall, internal_req=internal_req)
sirepo.reply.init_quest(qcall)
# TODO(robnagler): process auth basic header, too. this
# should not cookie but route to auth_basic.
sirepo.cookie.init_quest(qcall)
# TODO(robnagler) auth_db
o._set_log_user()
[docs]
def init_module(**imports):
global _cfg
def _init_full():
global visible_methods, valid_methods, non_guest_methods, _cookie_http_name
p = pkinspect.this_module().__name__
visible_methods = []
valid_methods = _cfg.methods.union(_cfg.deprecated_methods)
for n in valid_methods:
m = importlib.import_module(pkinspect.module_name_join((p, n)))
_METHOD_MODULES[n] = m
if m.AUTH_METHOD_VISIBLE and n in _cfg.methods:
visible_methods.append(n)
visible_methods = tuple(sorted(visible_methods))
non_guest_methods = tuple(m for m in visible_methods if m != METHOD_GUEST)
s = list(simulation_db.SCHEMA_COMMON.common.constants.paymentPlans.keys())
if sorted(s) != sorted(_ALL_PAYMENT_PLANS):
raise AssertionError(
f"payment plans from SCHEMA_COMMON={s} not equal to _ALL_PAYMENT_PLANS={_ALL_PAYMENT_PLANS}",
)
_cookie_http_name = sirepo.cookie.unchecked_http_name()
if _cfg:
return
# import simulation_db
sirepo.util.setattr_imports(imports)
_cfg = pkconfig.init(
methods=((METHOD_GUEST,), set, "for logging in"),
deprecated_methods=(set(), set, "for migrating to methods"),
logged_in_user=(None, str, "Only for sirepo.job_supervisor"),
)
if _cfg.logged_in_user:
_cfg.deprecated_methods = frozenset()
_cfg.methods = frozenset((METHOD_GUEST,))
else:
_init_full()
[docs]
def only_for_api_method_modules():
return list(_METHOD_MODULES.values())
class _Auth(sirepo.quest.Attr):
METHOD_EMAIL = METHOD_EMAIL
METHOD_GUEST = METHOD_GUEST
# Keys passed to child quests (nested call_api) so login state is cascaded
_INIT_QUEST_FOR_CHILD_KEYS = frozenset(
(
"_logged_in_user",
"_logged_in_method",
),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._logged_in_user = _cfg.logged_in_user
self._logged_in_method = METHOD_GUEST if _cfg.logged_in_user else None
def check_sim_type_role(self, sim_type, force_sim_type_required_for_api=False):
from sirepo import auth_role_moderation, oauth, uri_router
t = sirepo.template.assert_sim_type(sim_type)
self.qcall.sim_type_set(t)
if t not in sirepo.feature_config.auth_controlled_sim_types():
return
if (
not force_sim_type_required_for_api
and not uri_router.maybe_sim_type_required_for_api(self.qcall)
):
return
u = self.logged_in_user()
r = sirepo.auth_role.for_sim_type(t)
if self.qcall.auth_db.model("UserRole").has_active_role(role=r, uid=u):
return
if r in sirepo.auth_role.for_proprietary_oauth_sim_types():
oauth.raise_authorize_redirect(self.qcall, sirepo.auth_role.sim_type(r))
if r in sirepo.auth_role.for_moderated_sim_types():
auth_role_moderation.raise_control_for_user(self.qcall, u, r)
raise sirepo.util.Forbidden(f"uid={u} does not have access to sim_type={t}")
def _assert_role_user(self):
u = self.logged_in_user()
if not self.qcall.auth_db.model("UserRole").has_active_role(
role=sirepo.auth_role.ROLE_USER,
uid=u,
):
raise sirepo.util.Forbidden(
f"uid={u} role={sirepo.auth_role.ROLE_USER} not found"
)
return u
def complete_registration(self, name=None):
"""Update the database with the user's display_name and sets state to logged-in.
Guests will have no name.
"""
if self._qcall_bound_method() == METHOD_GUEST and name is not None:
raise AssertionError(
"user name={name} should be None with method={METHOD_GUEST}",
)
self.user_registration(self._qcall_bound_user(), display_name=name)
self.qcall.cookie.set_value(_COOKIE_STATE, _STATE_LOGGED_IN)
def cookie_cleaner(self, values):
"""Migrate from old cookie values
Always sets _COOKIE_STATE, which is our sentinel.
Args:
values (PKDict): just parsed values
Returns:
PKDict, bool: (unmodified or migrated values, True if modified)
"""
if values.get(_COOKIE_STATE):
# normal case: this module has seen a cookie at least once
# check for _cfg.methods changes; invalid methods cause a logout.
# No method is fine, because not logged in.
m = values.get(_COOKIE_METHOD)
if not m or m in valid_methods:
return values, False
# invalid method (changed config), reset state
pkdlog(
"possibly misconfigured server: invalid cookie_method={}, clearing auth values={}",
m,
values,
)
pkcollections.unchecked_del(
values, _COOKIE_METHOD, _COOKIE_USER, _COOKIE_STATE
)
return values, True
# data cleaning; do not need old auth values
if values.get("sru") or values.get("uid"):
pkdlog(
"unknown cookie values, clearing completely, not migrating: {}", values
)
return PKDict(), True
# normal case: new visitor (no user or state); set logged out
values[_COOKIE_STATE] = _STATE_LOGGED_OUT
return values, True
def create_user_from_email(self, email, display_name):
u = self._create_user(_METHOD_MODULES[METHOD_EMAIL], want_login=False)
self.user_registration(uid=u, display_name=display_name)
self.qcall.auth_db.model(
self.get_module(METHOD_EMAIL).UserModel,
unverified_email=email,
uid=u,
user_name=email,
).save()
return u
def get_module(self, name):
return _METHOD_MODULES[name]
def active_plan(self, uid):
u = self.logged_in_user()
if rv := self.qcall.auth_db.model("UserRole").unchecked_active_plan(uid=u):
return rv.role
raise AssertionError(f"user={u} has no active plans")
def guest_uids(self):
"""All of the uids corresponding to guest users."""
return self.qcall.auth_db.model("UserRegistration").search_all_for_column(
"uid", display_name=None
)
def is_logged_in(self, state=None):
"""Logged in is either needing to complete registration or done
Does not check simulation dir
Args:
state (str): logged in state [None: from cookie]
Returns:
bool: is in one of the logged in states
"""
s = state or self._qcall_bound_state()
return s in (_STATE_COMPLETE_REGISTRATION, _STATE_LOGGED_IN)
def is_premium_user(self):
return self.qcall.auth_db.model("UserRole").has_active_role(
role=(
sirepo.auth_role.ROLE_PLAN_PREMIUM,
sirepo.auth_role.ROLE_PLAN_ENTERPRISE,
),
uid=self.logged_in_user(),
)
def logged_in_user(self, check_path=True):
"""Get the logged in user
Args:
check_path (bool): call `simulation_db.user_path` [True]
Returns:
str: uid of authenticated user
"""
u = self._qcall_bound_user()
if not self.is_logged_in():
raise sirepo.util.SRException(
LOGIN_ROUTE_NAME,
None,
"user not logged in uid={}",
u,
)
assert u, "no user in cookie: state={} method={}".format(
self._qcall_bound_state(),
self._qcall_bound_method(),
)
if check_path:
simulation_db.user_path(uid=u, check=True)
return u
def logged_in_user_name(self):
"""Return user_name for logged in user"""
return self.user_name(
uid=self.logged_in_user(),
method=self._qcall_bound_method(),
)
def logged_in_user_name_local_part(self):
"""If user_name is email address, return the local part.
Otherwise, just return user_name.
"""
return self.logged_in_user_name().split("@")[0].lower()
@contextlib.contextmanager
def logged_in_user_set(self, uid, method=METHOD_GUEST):
"""Ephemeral login or may be used to logout"""
u = self._logged_in_user
m = self._logged_in_method
try:
self._logged_in_user = uid
self._logged_in_method = None if uid is None else method
yield
finally:
self._logged_in_user = u
self._logged_in_method = m
def login(
self,
method=None,
uid=None,
model=None,
sim_type=None,
display_name=None,
want_redirect=False,
moderation_reason=None,
):
"""Login the user
Raises an exception if successful, except in the case of methods
Args:
method (module or str): method module
uid (str): user to login [None]
model (auth_db.UserDbBase): user to login (overrides uid) [None]
sim_type (str): app to redirect to [None]
display_name (str): to save as the display_name [None]
want_redirect (bool): http redirect on success [False]
"""
from sirepo import auth_role_moderation
mm = _METHOD_MODULES[method] if isinstance(method, str) else method
self._validate_method(mm)
guest_uid = None
if model:
uid = model.uid
# if previously cookied as a guest, move the non-example simulations into uid below
m = self._qcall_bound_method()
if m == METHOD_GUEST and mm.AUTH_METHOD != METHOD_GUEST:
guest_uid = self._qcall_bound_user() if self.is_logged_in() else None
if uid:
self._login_user(mm, uid)
if mm.AUTH_METHOD in _cfg.deprecated_methods:
pkdlog("deprecated auth method={} uid={}".format(mm.AUTH_METHOD, uid))
if not uid:
# No user so clear cookie so this method is removed
self.reset_state()
# We are logged in with a deprecated method, and now the user
# needs to login with an allowed method.
self.login_fail_redirect(module=mm, reason="deprecated")
if not uid:
# No user in the cookie and method didn't provide one so
# the user might be switching methods (e.g. guest to email).
# Not allowed to go to guest from other methods, because there's
# no authentication for guest.
# Or, this is just a new user, and we'll create one.
uid = self._qcall_bound_user() if self.is_logged_in() else None
m = self._qcall_bound_method()
if uid and mm.AUTH_METHOD not in (m, METHOD_GUEST):
# switch this method to this uid (even for methods)
# except if the same method, then assuming logging in as different user.
# This handles the case where logging in as guest, creates a user every time
self._login_user(mm, uid)
else:
uid = self._create_user(mm, want_login=True)
if model:
model.uid = uid
model.save()
# see if the user has completed registration already (must be done before setting display_name)
nr = self.need_complete_registration(uid)
if display_name:
self.complete_registration(self.parse_display_name(display_name))
if nr and sirepo.feature_config.cfg().is_registration_moderated:
auth_role_moderation.save_moderation_reason(
self.qcall,
uid,
sim_type,
moderation_reason,
)
if sim_type:
if guest_uid and guest_uid != uid:
self.qcall.auth_db.commit()
simulation_db.migrate_guest_to_persistent_user(
guest_uid,
uid,
qcall=self.qcall,
)
self.login_success_response(sim_type, want_redirect)
assert not mm.AUTH_METHOD_VISIBLE
def login_fail_redirect(self, module=None, reason=None):
raise sirepo.util.SRException(
"loginFail",
PKDict(method=module.AUTH_METHOD, reason=reason),
"login failed: reason={} method={}",
reason,
module.AUTH_METHOD,
)
def login_success_response(self, sim_type, want_redirect=False):
r = None
if (
self.qcall.cookie.get_value(_COOKIE_STATE) == _STATE_COMPLETE_REGISTRATION
and self.qcall.cookie.get_value(_COOKIE_METHOD) == METHOD_GUEST
):
self.complete_registration()
if want_redirect:
r = (
"completeRegistration"
if (
self.qcall.cookie.get_value(_COOKIE_STATE)
== _STATE_COMPLETE_REGISTRATION
)
else None
)
raise sirepo.util.Redirect(sirepo.uri.local_route(sim_type, route_name=r))
raise sirepo.util.SReplyExc(
sreply=self.qcall.reply_ok(PKDict(authState=self._auth_state())),
)
def need_complete_registration(self, model_or_uid):
"""Does unauthenticated user need to complete registration?
If the current method is deprecated, then we will end up asking
the user for a name again, but that's ok.
Does not work for guest (which don't have their own models anyway).
Args:
model_or_uid (object): unauthenticated user record or uid
Returns:
bool: True if user will be redirected to needCompleteRegistration
"""
u = model_or_uid if isinstance(model_or_uid, str) else model_or_uid.uid
if not u:
return True
return not self.user_display_name(uid=u)
def only_for_api_auth_state(self):
try:
try:
return self._auth_state()
except sirepo.util.UserDirNotFound as e:
# Clear login and return new auth_state
self._handle_user_dir_not_found(**e.sr_args)
return self._auth_state()
except Exception as e:
pkdlog("exception={} stack={}", e, pkdexc())
# POSIT: minimal authState record, see _auth_state
return PKDict(
displayName=None,
guestIsOnlyMethod=not non_guest_methods,
isGuestUser=False,
isLoggedIn=False,
isModerated=sirepo.feature_config.cfg().is_registration_moderated,
roles=PKDict(),
userName=None,
uiWebSocket=sirepo.feature_config.cfg().ui_websocket,
visibleMethods=visible_methods,
)
def only_for_api_logout(self):
sirepo.events.emit(
self.qcall,
"auth_logout",
PKDict(uid=self._qcall_bound_user()),
)
self.qcall.cookie.set_value(_COOKIE_STATE, _STATE_LOGGED_OUT)
self._set_log_user()
def parse_display_name(self, value):
res = value.strip()
assert res, "invalid post data: displayName={}".format(value)
return res
def require_adm(self):
u = self.require_user()
if not self.qcall.auth_db.model("UserRole").has_active_role(
role=sirepo.auth_role.ROLE_ADM,
uid=u,
):
raise sirepo.util.Forbidden(
f"uid={u} role={sirepo.auth_role.ROLE_ADM} not found"
)
def require_auth_basic(self):
m = _METHOD_MODULES["basic"]
self._validate_method(m)
uid = m.require_user(self.qcall)
if not uid:
raise sirepo.util.WWWAuthenticate()
self.qcall.cookie.set_sentinel()
self._login_user(m, uid)
def require_email_user(self):
i = self.require_user()
m = self._qcall_bound_method()
if m != METHOD_EMAIL:
raise sirepo.util.Forbidden(f"method={m} is not email for uid={i}")
def require_plan(self):
u = self.require_user()
if not self.qcall.auth_db.model("UserRole").has_active_plan(uid=u):
raise sirepo.util.PlanExpired(f"uid={u} has no active plans")
def require_premium(self):
if not self.is_premium_user():
raise sirepo.util.Forbidden("not premium user")
def require_user(self):
"""Asserts whether user is logged in
Returns:
str: user id
"""
e = None
m = self._qcall_bound_method()
p = None
r = LOGIN_ROUTE_NAME
s = self._qcall_bound_state()
u = self._qcall_bound_user()
if u:
# Will raise an exception if dir not found
simulation_db.user_path(uid=u, check=True)
if s is None:
pass
elif s == _STATE_LOGGED_IN:
if m in _cfg.methods:
return self._assert_role_user()
if m in _cfg.deprecated_methods:
e = "deprecated"
else:
e = "invalid"
self.reset_state()
p = PKDict(reload_js=True)
e = f"auth_method={m} is {e}, forcing login: uid={u}"
elif s == _STATE_LOGGED_OUT:
e = "logged out uid={}".format(u)
if m in _cfg.deprecated_methods:
# Force login to this specific method so we can migrate to valid method
r = "loginWith"
p = PKDict({":method": m})
e = f"forced {r}={m} uid={u}"
elif s == _STATE_COMPLETE_REGISTRATION:
if m == METHOD_GUEST:
pkdc("guest completeRegistration={}", u)
self.complete_registration()
self.qcall.auth_db.commit()
return self._assert_role_user()
r = "completeRegistration"
e = "uid={} needs to complete registration".format(u)
else:
self.qcall.cookie.reset_state(f"state={s} uid={u} invalid, cannot continue")
p = PKDict(reload_js=True)
e = "invalid cookie state={} uid={}".format(s, u)
pkdc("SRException uid={} route={} params={} method={} error={}", u, r, p, m, e)
raise sirepo.util.SRException(
r, p, *(("user not logged in: {}", e) if e else ())
)
def reset_state(self):
self.qcall.cookie.unchecked_remove(_COOKIE_USER)
self.qcall.cookie.unchecked_remove(_COOKIE_METHOD)
self.qcall.cookie.set_value(_COOKIE_STATE, _STATE_LOGGED_OUT)
self._set_log_user()
def srunit_user(self):
"""Create a new guest user and log them in
**Only called from srunit**
"""
from pykern import pkunit
if not pkunit.is_test_run():
raise AssertionError("must be in pkunit test run")
return self._create_user(_METHOD_MODULES[METHOD_GUEST], want_login=True)
def unchecked_get_user(self, uid_or_user_name):
# support other user_name types
# POSIT: Uid's are from the base62 charset so an '@' implies an email.
if "@" in uid_or_user_name:
a = PKDict(user_name=uid_or_user_name)
m = self.qcall.auth_db.model(_METHOD_MODULES[METHOD_EMAIL].UserModel)
else:
a = PKDict(uid=simulation_db.assert_uid(uid_or_user_name))
m = self.qcall.auth_db.model("UserRegistration")
if u := m.unchecked_search_by(**a):
return u.uid
return None
def user_dir_not_found(self, user_dir, uid):
"""Called by sirepo.reply when user_dir is not found
Deletes any user records and resets auth state.
Args:
user_dir (str): directory not found
uid (str): user
"""
self._handle_user_dir_not_found(user_dir, uid)
return self.qcall.reply_redirect_for_app_root()
def user_display_name(self, uid):
return (
self.qcall.auth_db.model("UserRegistration").search_by(uid=uid).display_name
)
def user_if_logged_in(self, method):
"""Verify user is logged in and method matches
Args:
method (str): method must be logged in as
"""
if not self.is_logged_in():
return None
m = self._qcall_bound_method()
if m != method:
return None
return self._qcall_bound_user()
def user_name(self, uid, method=None):
"""Return user_name"""
m = method or self._qcall_bound_method()
t = getattr(_METHOD_MODULES[m], "UserModel", None)
if t:
return self.qcall.auth_db.model(t).search_by(uid=uid).user_name
elif m == METHOD_GUEST:
return f"{METHOD_GUEST}-{uid}"
raise AssertionError(f"user_name not found for uid={uid} with method={m}")
def user_registration(self, uid, display_name=None):
"""Get UserRegistration record or create one
Args:
uid (str): registrant
display_name (str): display_name of user
Returns:
auth.UserRegistration: record (potentially blank)
"""
res = self.qcall.auth_db.model("UserRegistration").unchecked_search_by(uid=uid)
if res:
if display_name is not None:
res.display_name = display_name
res.save()
else:
res = self.qcall.auth_db.model(
"UserRegistration",
created=pkcompat.utcnow(),
display_name=display_name,
uid=uid,
)
res.save()
return res
def _auth_state(self):
s = self._qcall_bound_state()
v = PKDict(
avatarUrl=None,
cookieName=_cookie_http_name,
displayName=None,
guestIsOnlyMethod=not non_guest_methods,
isGuestUser=False,
isLoggedIn=self.is_logged_in(s),
isModerated=sirepo.feature_config.cfg().is_registration_moderated,
jobRunModeMap=simulation_db.JOB_RUN_MODE_MAP,
max_message_bytes=sirepo.job.cfg().max_message_bytes,
method=self._qcall_bound_method(),
needCompleteRegistration=s == _STATE_COMPLETE_REGISTRATION,
parallelCoresMax=sirepo.mpi.cfg().cores,
roles=[],
userName=None,
uiWebSocket=sirepo.feature_config.cfg().ui_websocket,
visibleMethods=visible_methods,
)
if "sbatch" in v.jobRunModeMap:
v.sbatchQueueMaxes = sirepo.job.NERSC_QUEUE_MAX
if sirepo.feature_config.have_payments():
v.stripePublishableKey = sirepo.payments.cfg().stripe_publishable_key
u = self._qcall_bound_user()
if v.isLoggedIn:
if v.method == METHOD_GUEST:
v.displayName = _GUEST_USER_DISPLAY_NAME
v.isGuestUser = True
v.needCompleteRegistration = False
v.visibleMethods = non_guest_methods
else:
r = self.qcall.auth_db.model("UserRegistration").unchecked_search_by(
uid=u
)
if r:
v.displayName = r.display_name
v.roles = {
x.role: (x.expiration.timestamp() if x.expiration else None)
for x in self.qcall.auth_db.model("UserRole").get_roles_and_expiration(
u
)
}
self._plan(v)
self._method_auth_state(v, u)
if pkconfig.channel_in_internal_test():
# useful for testing/debugging
v.uid = u
pkdc("state={}", v)
return v
def _create_user(self, module, want_login):
u = simulation_db.user_create()
if want_login:
self._login_user(module, u)
self.qcall.auth_db.model("UserRole").add_roles(
roles=sirepo.auth_role.for_new_user(module.AUTH_METHOD),
uid=u,
)
return u
def _handle_user_dir_not_found(self, user_dir, uid):
for m in _METHOD_MODULES.values():
u = self._method_user_model(m, uid)
if u:
u.delete()
u = self.qcall.auth_db.model("UserRegistration").unchecked_search_by(uid=uid)
if u:
u.delete()
self.reset_state()
self.qcall.auth_db.commit()
pkdlog("user_dir={} uid={}", user_dir, uid)
def _login_user(self, module, uid):
"""Set up the cookie for logged in state
If a deprecated or non-visible method, just login. Otherwise, check the db
for registration.
Args:
module (module): what auth method
uid (str): which uid
"""
self.qcall.cookie.set_value(_COOKIE_USER, uid)
self.qcall.cookie.set_value(_COOKIE_METHOD, module.AUTH_METHOD)
s = _STATE_LOGGED_IN
if module.AUTH_METHOD_VISIBLE and module.AUTH_METHOD in _cfg.methods:
u = self.user_registration(uid)
if not u.display_name:
s = _STATE_COMPLETE_REGISTRATION
self.qcall.cookie.set_value(_COOKIE_STATE, s)
self._set_log_user()
def _method_auth_state(self, values, uid):
if values.method not in _METHOD_MODULES:
pkdlog(
'auth state method: "{}" not present in supported methods: {}',
values.method,
_METHOD_MODULES.keys(),
)
return
m = _METHOD_MODULES[values.method]
u = self._method_user_model(m, uid)
if not u:
return
values.userName = u.user_name
if hasattr(m, "avatar_uri"):
values.avatarUrl = m.avatar_uri(self.qcall, u, _AVATAR_SIZE)
def _method_user_model(self, module, uid):
if not hasattr(module, "UserModel"):
return None
return self.qcall.auth_db.model(module.UserModel).unchecked_search_by(uid=uid)
def _plan(self, data):
r = data.roles
if sirepo.auth_role.ROLE_PLAN_ENTERPRISE in r:
data.paymentPlan = _PAYMENT_PLAN_ENTERPRISE
data.upgradeToPlan = None
elif sirepo.auth_role.ROLE_PLAN_PREMIUM in r:
data.paymentPlan = _PAYMENT_PLAN_PREMIUM
data.upgradeToPlan = _PAYMENT_PLAN_ENTERPRISE
else:
data.paymentPlan = _PAYMENT_PLAN_BASIC
data.upgradeToPlan = _PAYMENT_PLAN_PREMIUM
def _qcall_bound_method(self):
return self._logged_in_method or self.qcall.cookie.unchecked_get_value(
_COOKIE_METHOD
)
def _qcall_bound_state(self):
if self._logged_in_user:
return _STATE_LOGGED_IN
return self.qcall.cookie.unchecked_get_value(_COOKIE_STATE)
def _qcall_bound_user(self):
return self._logged_in_user or self.qcall.cookie.unchecked_get_value(
_COOKIE_USER
)
def _set_log_user(self):
def _user():
u = self._qcall_bound_user()
if not u:
return ""
return self._qcall_bound_state() + "-" + u
self.qcall.sreq.set_log_user(_user())
def _validate_method(self, module):
if module.AUTH_METHOD in valid_methods:
return None
pkdlog("invalid auth method={}".format(module.AUTH_METHOD))
self.login_fail_redirect(module, "invalid-method")