Source code for sirepo.payments

"""Payment handling with Stripe

:copyright: Copyright (c) 2025 RadiaSoft LLC.  All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""

from pykern import pkconfig
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdlog, pkdp, pkdexc, pkdformat
import datetime
import sirepo.auth_role
import sirepo.cron
import sirepo.quest
import sirepo.srtime
import sirepo.util
import stripe

#: keep reference so auditor isn't garbage collected
_ROLE_AUDITOR_CRON = None
_STRIPE_ACTIVE_SUBSCRIPTION_STATUSES = {"trialing", "active", "past_due"}
_STRIPE_SIGNATURE_HEADER = "Stripe-Signature"
_STRIPE_SIREPO_UID_METADATA_KEY = "sirepo_uid"
_cfg = None


[docs] class API(sirepo.quest.API): @sirepo.quest.Spec("require_user") async def api_paymentCreateCheckoutSession(self): """Create a Stripe checkout session Returns: str: Stripe client secret """ def _return_url(): # Need to put braces in url for stripe but local_route escapes them. # POSIT: CHECKOUT_SESSION_ID is unique in our uri map. x = "CHECKOUT_SESSION_ID" return self.absolute_uri( sirepo.uri.local_route( self.parse_post().type, route_name="paymentFinalization", params=PKDict(session_id=x), ) ).replace(x, "{" + x + "}") u = self.auth.logged_in_user() p = self.body_as_dict().plan return self.reply_ok( PKDict( clientSecret=( await stripe.checkout.Session.create_async( customer_email=self.auth.user_name(u), ui_mode="embedded", line_items=[ PKDict( price=self._plan_to_price(p), quantity=1, ), ], mode="subscription", subscription_data=PKDict( metadata=PKDict({_STRIPE_SIREPO_UID_METADATA_KEY: u}), ), return_url=_return_url(), # TODO(e-carlin): needed? # automatic_tax=PKDict(enabled=True), ) ).client_secret, ), ) @sirepo.quest.Spec("require_user") async def api_paymentCheckoutSessionStatus(self): def _update_db(stripe_sub, session_id, uid, exp): pkdlog( "Stripe subscription for uid={} sessionId={} current_period_end={}", uid, session_id, exp, ) p = self._price_to_plan(stripe_sub["items"].data[0].price.id) self.auth_db.model("StripeSubscription").new( uid=uid, customer_id=stripe_sub.customer, checkout_session_id=session_id, subscription_id=stripe_sub.id, created=sirepo.srtime.utc_now(), revocation_reason=None, revoked=None, role=p, ).save() # We aren't 100% positive the user has fully paid at this # point. But, we are pretty sure. So, proactively give them the # role and _ROLE_AUDITOR_CRON will remove if it finds out they # didn't end up paying. if exp: # Renewals are calculated by Stripe so we trust them exp = datetime.datetime.fromtimestamp(exp) self.auth_db.model("UserRole").add_plan(p, uid, expiration=exp) def _uid(uid): u = self.auth.logged_in_user() if uid == u: return uid raise AssertionError(f"stripe_uid={uid} does not match logged_in_user={u}") i = self.body_as_dict().sessionId c = await stripe.checkout.Session.retrieve_async(i) if c.status == "complete": s = await stripe.Subscription.retrieve_async(c.subscription) _update_db( s, i, _uid(s.metadata[_STRIPE_SIREPO_UID_METADATA_KEY]), s.get("current_period_end"), ) return self.reply_ok(PKDict(sessionStatus=c.status)) @sirepo.quest.Spec("allow_visitor") async def api_paymentPlanRedirect(self, plan): raise sirepo.util.Redirect( sirepo.uri.local_route( sirepo.util.first_sim_type(), route_name="paymentCheckout", params=PKDict(plan=plan), ), ) @sirepo.quest.Spec("allow_visitor") async def api_stripeWebhook(self): # https://docs.stripe.com/billing/subscriptions/webhooks # TODO(e-carlin): need to add our api to stripe dashboard https://docs.stripe.com/webhooks#test-webhook e = stripe.Webhook.construct_event( self.sreq.body_as_bytes(), self.sreq.header_uget(_STRIPE_SIGNATURE_HEADER), cfg().stripe_webhook_secret, ) if e and e["type"] == "invoice.paid": s = await stripe.Subscription.retrieve_async( e["data"]["object"]["subscription"] ) if not len(s["items"].data) == 1: raise AssertionError( pkdformat("multiple subscription line items s={}", s) ) n = ( await stripe.Product.retrieve_async( s["items"].data[0]["price"]["product"] ) )["name"] # Be somewhat robust against stripe sending duplicate events. # The transaction will fail at commit time. # No await below here so we are sure only one call makes # it through. # https://docs.stripe.com/webhooks#handle-duplicate-events if self.auth_db.model("StripePayment").payment_exists( e["data"]["object"]["id"] ): return self.reply_ok() self.auth_db.model("StripePayment").new( amount_paid=e["data"]["object"]["amount_paid"], customer_id=e["data"]["object"]["customer"], invoice_id=e["data"]["object"]["id"], subscription_id=e["data"]["object"]["subscription"], subscription_name=n, uid=s.metadata[_STRIPE_SIREPO_UID_METADATA_KEY], ).save() return self.reply_ok() def _plan_to_price(self, plan): return { sirepo.auth_role.ROLE_PLAN_BASIC: cfg().stripe_plan_basic_price_id, sirepo.auth_role.ROLE_PLAN_PREMIUM: cfg().stripe_plan_premium_price_id, }[plan] def _price_to_plan(self, price): return { cfg().stripe_plan_basic_price_id: sirepo.auth_role.ROLE_PLAN_BASIC, cfg().stripe_plan_premium_price_id: sirepo.auth_role.ROLE_PLAN_PREMIUM, }[price]
[docs] def init_apis(*args, **kwargs): global _ROLE_AUDITOR_CRON _ROLE_AUDITOR_CRON = sirepo.cron.CronTask( cfg().role_auditor_cron_period, _auditor, None )
[docs] def cfg(): global _cfg if _cfg: return _cfg _cfg = pkconfig.init( role_auditor_cron_period=( 60 * 60 * 24, int, "Seconds to sleep between runs of auditor", ), stripe_secret_key=pkconfig.Required(str, "Stripe secret API key"), stripe_publishable_key=pkconfig.Required(str, "Stripe publishable API key"), stripe_plan_basic_price_id=pkconfig.Required( str, "Stripe price ID for basic plan" ), stripe_plan_premium_price_id=pkconfig.Required( str, "Stripe price ID for premium plan" ), # In dev stripe cli will output this stripe_webhook_secret=pkconfig.Required( str, "Stripe secret key for webhook security" ), ) stripe.api_key = _cfg.stripe_secret_key # Explicitly setting like this forces stripe to raise an error if # a sync call is ever called when there is a corresponding async # method available. stripe.default_http_client = stripe.HTTPXClient() return _cfg
async def _auditor(_): """Remove sirepo subscription/role for users with inactive Stripe subscription status. We proactively assign roles from api_paymentCheckoutSessionStatus. This auditor goes through all sirepo subscriptions/roles created by that API and revokes any that are not active in Stripe. """ async def _stripe_status_is_active(subscription_record): s = await stripe.Subscription.retrieve_async( subscription_record.subscription_id ) if s.metadata[_STRIPE_SIREPO_UID_METADATA_KEY] != subscription_record.uid: raise AssertionError( pkdformat( "subscription={} not bound to uid={}", s, subscription_record.uid, ) ) return s.status in _STRIPE_ACTIVE_SUBSCRIPTION_STATUSES with sirepo.quest.start() as qcall: for s in qcall.auth_db.model( "StripeSubscription" ).not_revoked_stripe_subscriptions(): if qcall.auth_db.model("UserRole").has_expired_role(s.role, uid=s.uid): continue if await _stripe_status_is_active( s, ): continue pkdlog("revoking uid={} role={}", s.uid, s.role) qcall.auth_db.model("UserRole").expire_role(s.role, uid=s.uid) qcall.auth_db.model( "StripeSubscription" ).revoke_due_to_inactive_stripe_status(s)