Source code for checkout.models

# -*- encoding: utf-8 -*-
import logging
import stripe

from datetime import date
from dateutil.relativedelta import relativedelta
from dateutil.rrule import MONTHLY, rrule
from decimal import Decimal
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.db import models, transaction
from django.urls import reverse
from django.utils import timezone
from reversion import revisions as reversion

from base.model_utils import TimeStampedModel
from base.singleton import SingletonModel
from base.url_utils import url_with_querystring
from mail.models import Mail, Notify
from mail.service import queue_mail_message, queue_mail_template
from mail.tasks import process_mail


CURRENCY = "GBP"
logger = logging.getLogger(__name__)


def _card_error(e):
    return (
        "CardError: param '{}' code '{}' http body '{}' "
        "http status '{}'".format(e.param, e.code, e.http_body, e.http_status)
    )


def _stripe_error(e):
    return "http body: '{}' http status: '{}'".format(
        e.http_body, e.http_status
    )


[docs]def as_pennies(total): return int(total * Decimal("100"))
[docs]def default_checkout_state(): return CheckoutState.objects.get(slug=CheckoutState.PENDING).pk
[docs]def expiry_date_as_str(item): d = item.get("expiry_date", None) return d.strftime("%Y%m%d") if d else ""
[docs]class CheckoutError(Exception): def __init__(self, value): Exception.__init__(self) self.value = value def __str__(self): return repr("%s, %s" % (self.__class__.__name__, self.value))
[docs]class CheckoutStateManager(models.Manager): @property def fail(self): return self.model.objects.get(slug=self.model.FAIL) @property def pending(self): return self.model.objects.get(slug=self.model.PENDING) @property def request(self): """The 'request' state is used for payment plans only. It allows the system to set the state to ``request`` before charging the account. """ return self.model.objects.get(slug=self.model.REQUEST) @property def success(self): return self.model.objects.get(slug=self.model.SUCCESS)
[docs]class CheckoutState(TimeStampedModel): FAIL = "fail" PENDING = "pending" # The 'request' state is used for payment plans only. REQUEST = "request" SUCCESS = "success" name = models.CharField(max_length=100) slug = models.SlugField(unique=True) objects = CheckoutStateManager() class Meta: ordering = ("name",) verbose_name = "Checkout state" verbose_name_plural = "Checkout states" def __str__(self): return "{}".format(self.name) @property def is_success(self): return self.slug == self.SUCCESS @property def is_pending(self): return self.slug == self.PENDING
reversion.register(CheckoutState)
[docs]class CheckoutActionManager(models.Manager): @property def card_refresh(self): return self.model.objects.get(slug=self.model.CARD_REFRESH) @property def charge(self): return self.model.objects.get(slug=self.model.CHARGE) @property def invoice(self): return self.model.objects.get(slug=self.model.INVOICE) @property def manual(self): return self.model.objects.get(slug=self.model.MANUAL) @property def payment(self): return self.model.objects.get(slug=self.model.PAYMENT) @property def payment_plan(self): return self.model.objects.get(slug=self.model.PAYMENT_PLAN)
[docs]class CheckoutAction(TimeStampedModel): CARD_REFRESH = "card_refresh" CHARGE = "charge" INVOICE = "invoice" MANUAL = "manual" PAYMENT = "payment" PAYMENT_PLAN = "payment_plan" name = models.CharField(max_length=100) slug = models.SlugField(unique=True) payment = models.BooleanField() objects = CheckoutActionManager() class Meta: ordering = ("name",) verbose_name = "Checkout action" verbose_name_plural = "Checkout action" def __str__(self): return "{}".format(self.name) @property def invoice(self): return self.slug == self.INVOICE @property def payment_plan(self): return self.slug == self.PAYMENT_PLAN
reversion.register(CheckoutAction)
[docs]class CustomerManager(models.Manager): def _create_customer(self, name, email, customer_id): obj = self.model(name=name, email=email, customer_id=customer_id) obj.save() return obj def _stripe_create(self, email, description, token): """Use the Stripe API to create a customer.""" try: stripe.api_key = settings.STRIPE_SECRET_KEY customer = stripe.Customer.create( email=email, description=description, card=token ) return customer.id except ( stripe.error.InvalidRequestError, stripe.error.StripeError, ) as e: raise CheckoutError( "Error creating Stripe customer '{}': {}".format( email, _stripe_error(e) ) ) from e def _stripe_get_card_expiry(self, customer_id): result = (0, 0) stripe.api_key = settings.STRIPE_SECRET_KEY customer = stripe.Customer.retrieve(customer_id) default_card = customer["default_source"] # find the details of the default card for card in customer["sources"]["data"]: if card["id"] == default_card: # find the expiry date of the default card result = (int(card["exp_year"]), int(card["exp_month"])) break return result def _stripe_update(self, customer_id, description, token): """Use the Stripe API to update a customer.""" try: stripe.api_key = settings.STRIPE_SECRET_KEY stripe_customer = stripe.Customer.retrieve(customer_id) stripe_customer.description = description stripe_customer.card = token stripe_customer.save() except stripe.error.StripeError as e: raise CheckoutError( "Error updating Stripe customer '{}': {}".format( customer_id, _stripe_error(e) ) ) from e
[docs] def get_customer(self, email): """Get the Stripe customer from the email address. Note: email addresses are case-sensitive. For more info, see https://code.djangoproject.com/ticket/17561. """ result = self.get_customers(email).first() if not result: raise self.model.DoesNotExist( "Cannot find a customer record for '{}'".format(email) ) return result
[docs] def get_customers(self, email): return self.model.objects.filter(email__iexact=email).order_by("-pk")
[docs] def init_customer(self, content_object, token): """Initialise Stripe customer using email, description and token. 1. Lookup existing customer record in the database. - Retrieve customer from Stripe and update description and token. 2. If the customer does not exist: - Create Stripe customer with email, description and token. - Create a customer record in the database. Return the customer database record and clear the card ``refresh`` flag. """ name = content_object.checkout_name email = content_object.checkout_email try: obj = self.get_customer(email) obj.name = name obj.refresh = False obj.save() self._stripe_update(obj.customer_id, name, token) except self.model.DoesNotExist: customer_id = self._stripe_create(email, name, token) obj = self._create_customer(name, email, customer_id) return obj
@property def refresh(self): """Customers with expiring cards.""" return self.model.objects.filter(refresh=True)
[docs] def refresh_credit_card(self, email): result = False try: customer = self.get_customer(email) result = customer.refresh except self.model.DoesNotExist: pass return result
[docs] def update_card_expiry(self, email): """Find the customer, get the expiry date from Stripe and update.""" try: obj = self.get_customer(email) year, month = self._stripe_get_card_expiry(obj.customer_id) if year and month: # last day of the month obj.expiry_date = date(year, month, 1) + relativedelta( months=+1, day=1, days=-1 ) # is the card expiring soon? is_expiring = obj.is_expiring if obj.refresh == is_expiring: pass else: with transaction.atomic(): obj.refresh = is_expiring # save the details obj.save() # email the customer queue_mail_template( obj, self.model.MAIL_TEMPLATE_CARD_EXPIRY, {obj.email: dict(name=obj.name)}, ) except Customer.DoesNotExist: pass
[docs]class Customer(TimeStampedModel): """Stripe Customer. Link the Stripe customer to an email address (and name). Note: It is expected that multiple users in our databases could have the same email address. If they have different names, then this table looks very confusing. Try checking the 'content_object' of the 'Checkout' model if you need to diagnose an issue. """ MAIL_TEMPLATE_CARD_EXPIRY = "customer_card_expiry" MAIL_TEMPLATE_CARD_REFRESH_REQUEST = "contact_card_refresh_request" MAIL_TEMPLATE_REMINDER = "customer_instalment_reminder" name = models.TextField() email = models.EmailField(unique=True) customer_id = models.TextField() expiry_date = models.DateField(blank=True, null=True) refresh = models.BooleanField( default=False, help_text="Should the customer refresh their card details?", ) objects = CustomerManager() class Meta: ordering = ("pk",) verbose_name = "Customer" verbose_name_plural = "Customers" def __str__(self): return "{} {}".format(self.email, self.customer_id)
[docs] def get_absolute_url(self): return url_with_querystring( reverse("checkout.customer"), email=self.email )
@property def checkout_actions(self): return [CheckoutAction.CARD_REFRESH] @property def checkout_can_charge(self): """We can always take a payment for this object!""" return True @property def checkout_description(self): result = "{}".format(self.name) return [result] @property def checkout_email(self): return self.email
[docs] def checkout_fail(self, checkout): pass
[docs] def checkout_fail_url(self, checkout_pk): return self.get_absolute_url()
@property def checkout_name(self): return "{}".format(self.name)
[docs] def checkout_success(self, checkout): pass
[docs] def checkout_success_url(self, checkout_pk): return self.get_absolute_url()
@property def checkout_total(self): return Decimal() @property def is_expiring(self): """Is the card expiring within the next month? If the ``expiry_date`` is ``None``, then it has *not* expired. The expiry date is set to the last day of the month e.g. for September 2015, the ``expiry_date`` will be 30/09/2015. """ result = False one_month = date.today() + relativedelta(months=+1) if self.expiry_date and self.expiry_date <= one_month: return True return result
reversion.register(Customer)
[docs]class CustomerPayment(TimeStampedModel): """Take a payment from a customer.""" user = models.ForeignKey( settings.AUTH_USER_MODEL, related_name="+", on_delete=models.CASCADE ) customer = models.ForeignKey(Customer, on_delete=models.CASCADE) description = models.CharField(max_length=100) total = models.DecimalField(max_digits=8, decimal_places=2) class Meta: ordering = ("pk",) verbose_name = "Customer payment" verbose_name_plural = "Customer payments" def __str__(self): return "{} {} {}".format( self.customer.email, self.description, self.total )
[docs] def get_absolute_url(self): return url_with_querystring( reverse("checkout.customer"), email=self.customer.email )
@property def checkout_actions(self): return [CheckoutAction.PAYMENT] @property def checkout_can_charge(self): return bool(self.total) @property def checkout_description(self): result = "{}".format(self.description) return [result] @property def checkout_email(self): return self.customer.email
[docs] def checkout_fail(self, checkout): pass
[docs] def checkout_fail_url(self, checkout_pk): return self.get_absolute_url()
@property def checkout_name(self): return "{}".format(self.customer.name)
[docs] def checkout_success(self, checkout): pass
[docs] def checkout_success_url(self, checkout_pk): return self.get_absolute_url()
@property def checkout_total(self): return self.total
reversion.register(CustomerPayment)
[docs]class CheckoutManager(models.Manager): def _notify_user(self, checkout, current_user): """Notify the user (unless they are a member of staff or anonymous).""" if current_user.is_staff or current_user.is_anonymous: pass else: checkout.notify()
[docs] def audit(self, content_type=None): return self.model.objects.all().order_by("-pk")
[docs] def audit_content_object(self, content_object): content_type = ContentType.objects.get_for_model(content_object) return self.model.objects.filter( content_type=content_type, object_id=content_object.pk ).order_by("-pk")
[docs] def audit_content_type(self, content_type): return self.model.objects.filter(content_type=content_type).order_by( "-pk" )
[docs] def audit_customers(self, customers): return self.model.objects.filter(customer__in=customers).order_by("-pk")
[docs] def create_checkout(self, action, content_object, user): """Create a checkout payment request.""" if action == CheckoutAction.objects.card_refresh: total = None else: total = content_object.checkout_total obj = self.model( checkout_date=timezone.now(), action=action, content_object=content_object, description=", ".join(content_object.checkout_description), total=total, ) # an anonymous user can create a checkout if user.is_authenticated: obj.user = user obj.save() return obj
[docs] def charge(self, content_object, current_user, payment_run_item=None): """Collect some money from a customer. You must be a member of staff to use this method. For payment plans, a background process will charge the card. In this case, the user will be anonymous. We should only attempt to collect money if the customer has already entered their card details. """ content_object.refresh_from_db() if not content_object.checkout_can_charge: raise CheckoutError("Cannot charge the card.") try: customer = Customer.objects.get_customer( content_object.checkout_email ) except Customer.DoesNotExist as e: raise CheckoutError( "Customer '{}' has not registered a card".format( content_object.checkout_email ) ) from e with transaction.atomic(): checkout = self.create_checkout( CheckoutAction.objects.charge, content_object, current_user ) checkout.customer = customer checkout.save() if payment_run_item: payment_run_item.checkout = checkout payment_run_item.save() try: checkout.charge(current_user) with transaction.atomic(): checkout.success() self._notify_user(checkout, current_user) except CheckoutError as e: logger.error(e) with transaction.atomic(): checkout.fail() self._notify_user(checkout, current_user) raise
[docs] def manual(self, content_object, current_user): """Mark a transaction as paid (manual). You must be a member of staff to use this method. """ content_object.refresh_from_db() if not current_user.is_staff: raise CheckoutError( "Only a member of staff can mark this transaction as paid." ) valid_state = ( CheckoutState.FAIL, CheckoutState.PENDING, CheckoutState.REQUEST, ) if content_object.state.slug not in valid_state: raise CheckoutError("Cannot mark this transaction as paid.") checkout = self.create_checkout( CheckoutAction.objects.manual, content_object, current_user ) checkout.save() try: with transaction.atomic(): checkout.success() except CheckoutError: with transaction.atomic(): checkout.fail()
[docs] def success(self): return self.audit().filter(state=CheckoutState.objects.success)
[docs] def for_content_object(self, obj): return self.model.objects.get( content_type=ContentType.objects.get_for_model(obj), object_id=obj.pk, )
[docs]class Checkout(TimeStampedModel): """Checkout. Create a 'Checkout' instance when you want to interact with Stripe e.g. take a payment, get card details to set-up a payment plan or refresh the details of an expired card. """ checkout_date = models.DateTimeField() action = models.ForeignKey(CheckoutAction, on_delete=models.CASCADE) customer = models.ForeignKey( Customer, blank=True, null=True, on_delete=models.CASCADE ) user = models.ForeignKey( settings.AUTH_USER_MODEL, related_name="+", blank=True, null=True, on_delete=models.CASCADE, help_text=( "User who created the checkout request " "(or blank if the the user is not logged in)" ), ) state = models.ForeignKey( CheckoutState, default=default_checkout_state, on_delete=models.CASCADE ) description = models.TextField() total = models.DecimalField( max_digits=8, decimal_places=2, blank=True, null=True ) # link to the object in the system which requested the checkout content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE) object_id = models.PositiveIntegerField() content_object = GenericForeignKey() objects = CheckoutManager() class Meta: ordering = ("pk",) verbose_name = "Checkout" verbose_name_plural = "Checkouts" def __str__(self): return "{}".format(self.content_object.checkout_email) def _charge(self): """Charge the card.""" if self.action.payment: self._charge_stripe() def _charge_stripe(self): """Create the charge on stripe's servers.""" stripe.api_key = settings.STRIPE_SECRET_KEY try: stripe.Charge.create( amount=as_pennies(self.total), currency=CURRENCY, customer=self.customer.customer_id, description=self.description, ) except stripe.error.CardError as e: raise CheckoutError( "Card error: '{}' checkout '{}', object '{}': {}".format( e.code, self.pk, self.content_object.pk, _card_error(e) ) ) from e except stripe.error.StripeError as e: raise CheckoutError( "Card error: checkout '{}', object '{}': {}".format( self.pk, self.content_object.pk, _stripe_error(e) ) ) from e def _success_or_fail(self, state): self.state = state self.save()
[docs] def charge(self, current_user): """Charge the user's card. Must be a member of staff or anonymous (used when running as a background task) to use this method. To take payments for the current user, use the ``charge_user`` method. """ if current_user.is_staff or current_user.is_anonymous: self._charge() else: raise CheckoutError( "Cannot process - payments can only " "be taken by a member of staff. " "Current: '{}', Customer: '{}'".format( current_user.email, self.customer.email ) )
[docs] def charge_user(self, current_user): """Charge the card of the current user. Use this method when the logged in user is performing the transaction. To take money from another user's card, you must be a member of staff and use the ``charge`` method. """ anonymous = not current_user.is_authenticated staff = current_user.is_staff if anonymous or staff or self.customer.email == current_user.email: self._charge() else: raise CheckoutError( "Cannot process - payments can only be taken " "for an anonymous user or the current user. " "Current: '{}', Customer: '{}'".format( current_user.email, self.customer.email ) )
@property def content_object_url(self): return self.content_object.get_absolute_url()
[docs] def fail(self): """Checkout failed - so update and notify admin.""" self._success_or_fail(CheckoutState.objects.fail) return self.content_object.checkout_fail(self)
@property def failed(self): """Did the checkout request fail?""" return self.state == CheckoutState.objects.fail @property def previous_address_data(self): try: if self.action.slug == "payment_plan": data = self.checkoutadditional return filter( None, ( data.address_1, data.address_2, data.address_3, data.town, data.county, data.postcode, data.country, ), ) except CheckoutAdditional.DoesNotExist: pass return [] @property def date_of_birth(self): try: data = self.checkoutadditional return data.date_of_birth except CheckoutAdditional.DoesNotExist: pass return None @property def invoice_data(self): try: if self.action.slug == "invoice": data = self.checkoutadditional return filter( None, ( data.contact_name, data.company_name, data.address_1, data.address_2, data.address_3, data.town, data.county, data.postcode, data.country, data.email, data.phone, ), ) except CheckoutAdditional.DoesNotExist: pass return [] @property def is_invoice(self): return self.action == CheckoutAction.objects.invoice @property def is_manual(self): return self.action == CheckoutAction.objects.manual @property def is_payment(self): return self.action == CheckoutAction.objects.payment @property def is_payment_plan(self): """Used in success templates.""" return self.action == CheckoutAction.objects.payment_plan
[docs] def notify(self, request=None): """Send notification of checkout status. Pass in a 'request' if you want the email to contain the URL of the checkout transaction. """ email_addresses = [n.email for n in Notify.objects.all()] if email_addresses: caption = self.action.name state = self.state.name subject = "{} - {} from {}".format( state.upper(), caption.capitalize(), self.content_object.checkout_name, ) message = "{} - {} - {} from {}, {}:".format( self.created.strftime("%d/%m/%Y %H:%M"), state.upper(), caption, self.content_object.checkout_name, self.content_object.checkout_email, ) message = message + "\n\n{}\n\n{}".format( self.description, request.build_absolute_uri(self.content_object_url) if request else "", ) if self.invoice_data: message = message + "\n\nInvoice: {}".format( ", ".join(self.invoice_data) ) if self.previous_address_data: message = message + "\n\nPrevious address: {}".format( ", ".join(self.previous_address_data) ) if self.date_of_birth: dob = self.date_of_birth.strftime("%d/%m/%Y") message = message + "\n\nDate of birth: {}".format(dob) queue_mail_message(self, email_addresses, subject, message) else: logger.error( "Cannot send email notification of checkout transaction. " "No email addresses set-up in 'enquiry.models.Notify'" )
[docs] def success(self): """Checkout successful. This method is called from within a transaction. See ``checkout.success`` in the model manager. """ self._success_or_fail(CheckoutState.objects.success) return self.content_object.checkout_success(self)
reversion.register(Checkout)
[docs]class CheckoutAdditionalManager(models.Manager):
[docs] def create_checkout_additional(self, checkout, **kwargs): obj = self.model(checkout=checkout, **kwargs) obj.save() return obj
[docs]class CheckoutAdditional(TimeStampedModel): """If a user decides to pay by invoice, there are the details. Links with the 'CheckoutForm' in ``checkout/forms.py``. Probably easier to put validation in the form if required. """ checkout = models.OneToOneField(Checkout, on_delete=models.CASCADE) # company company_name = models.CharField(max_length=100, blank=True) address_1 = models.CharField("Address", max_length=100, blank=True) address_2 = models.CharField("", max_length=100, blank=True) address_3 = models.CharField("", max_length=100, blank=True) town = models.CharField(max_length=100, blank=True) county = models.CharField(max_length=100, blank=True) postcode = models.CharField(max_length=20, blank=True) country = models.CharField(max_length=100, blank=True) # contact contact_name = models.CharField(max_length=100, blank=True) email = models.EmailField(blank=False) phone = models.CharField(max_length=50, blank=True) date_of_birth = models.DateField(blank=True, null=True) objects = CheckoutAdditionalManager() class Meta: ordering = ("email",) verbose_name = "Checkout Additional Information" verbose_name_plural = "Checkout Additional Information" def __str__(self): return "{}".format(self.email)
reversion.register(CheckoutAdditional)
[docs]class PaymentPlanManager(models.Manager):
[docs] def create_payment_plan(self, slug, name, deposit, count, interval): obj = self.model( slug=slug, name=name, deposit=deposit, count=count, interval=interval, ) obj.save() return obj
[docs] def current(self): """List of payment plan headers excluding 'deleted'.""" return self.model.objects.exclude(deleted=True)
[docs]class PaymentPlan(TimeStampedModel): """Definition of a payment plan.""" name = models.TextField() slug = models.SlugField(unique=True) deposit = models.IntegerField(help_text="Initial deposit as a percentage") count = models.IntegerField(help_text="Number of instalments") interval = models.IntegerField(help_text="Instalment interval in months") deleted = models.BooleanField(default=False) objects = PaymentPlanManager() class Meta: ordering = ("slug",) verbose_name = "Payment plan" verbose_name_plural = "Payment plan" def __str__(self): return "{}".format(self.slug)
[docs] def clean(self): if not self.count: raise ValidationError("Set at least one instalment.") if not self.deposit: raise ValidationError("Set an initial deposit.") if not self.interval: raise ValidationError( "Set the number of months between instalments." )
[docs] def save(self, *args, **kwargs): if self.can_update: super().save(*args, **kwargs) else: raise CheckoutError("Payment plan in use. Cannot be updated.")
@property def can_update(self): count = ObjectPaymentPlan.objects.filter(payment_plan=self).count() if count: return False else: return True
[docs] def deposit_amount(self, total): return (total * (self.deposit / Decimal("100"))).quantize( Decimal(".01") )
[docs] def instalments(self, deposit_date, total): """Calculate the instalment dates and values.""" # deposit deposit = self.deposit_amount(total) # list of dates first_interval = self.interval if deposit_date.day > 15: first_interval = first_interval + 1 start_date = deposit_date + relativedelta(months=+first_interval, day=1) instalment_dates = [ d.date() for d in rrule( MONTHLY, count=self.count, dtstart=start_date, interval=self.interval, ) ] # instalments instalment = ((total - deposit) / self.count).quantize(Decimal(".01")) # list of payment amounts values = [] check = deposit for d in instalment_dates: value = instalment values.append(value) check = check + value # make the total match values[-1] = values[-1] + (total - check) return list(zip(instalment_dates, values))
[docs] def example(self, deposit_date, total): result = [(deposit_date, self.deposit_amount(total))] return result + self.instalments(deposit_date, total)
reversion.register(PaymentPlan)
[docs]class ObjectPaymentPlanManager(models.Manager): def _payment_plans_by_instalment_state(self, states): values = ObjectPaymentPlanInstalment.objects.filter( state__slug__in=states ).values_list("object_payment_plan__pk", flat=True) # 'set' will remove duplicate 'values' return self.model.objects.filter(pk__in=(set(values))).exclude( deleted=True )
[docs] def charge_deposit(self, content_object, user): payment_plan = self.for_content_object(content_object) payment_plan.charge_deposit(user)
[docs] def create_object_payment_plan(self, content_object, payment_plan, total): """Create a payment plan for an object with the initial deposit record. This method must be called from within a transaction. """ obj = self.model( content_object=content_object, payment_plan=payment_plan, total=total, ) obj.save() ObjectPaymentPlanInstalment.objects.create_object_payment_plan_instalment( obj, 1, True, payment_plan.deposit_amount(total), date.today() ) return obj
@property def fail_or_request(self): return self._payment_plans_by_instalment_state( (CheckoutState.FAIL, CheckoutState.REQUEST) )
[docs] def for_content_object(self, obj): """Return the ``ObjectPaymentPlan`` instance for ``obj``.""" return self.model.objects.get( content_type=ContentType.objects.get_for_model(obj), object_id=obj.pk, )
@property def outstanding_payment_plans(self): """List of outstanding payment plans. Used to refresh card expiry dates. """ return self._payment_plans_by_instalment_state( (CheckoutState.FAIL, CheckoutState.PENDING, CheckoutState.REQUEST) ) @property def report_card_expiry_dates(self): """Outstanding payment plans showing expiry dates in order.""" emails = [] result = [] payment_plans = self.outstanding_payment_plans for item in payment_plans: emails.append(item.content_object.checkout_email) # get the expiry date for all the customers (as a 'dict') customers = dict( Customer.objects.filter(email__in=emails).values_list( "email", "expiry_date" ) ) for item in payment_plans: expiry_date = customers.get(item.content_object.checkout_email) result.append( dict(expiry_date=expiry_date, object_payment_plan=item) ) return sorted(result, key=expiry_date_as_str)
[docs] def refresh_card_expiry_dates(self): """Refresh the card expiry dates for outstanding payment plans.""" for plan in self.outstanding_payment_plans: Customer.objects.update_card_expiry( plan.content_object.checkout_email )
[docs]class ObjectPaymentPlan(TimeStampedModel): """Payment plan for an object.""" # link to the object in the system which requested the payment plan content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE) object_id = models.PositiveIntegerField() content_object = GenericForeignKey() # payment plan payment_plan = models.ForeignKey(PaymentPlan, on_delete=models.CASCADE) total = models.DecimalField(max_digits=8, decimal_places=2) # is this object deleted? deleted = models.BooleanField(default=False) objects = ObjectPaymentPlanManager() class Meta: ordering = ("created",) unique_together = ("object_id", "content_type") verbose_name = "Object payment plan" verbose_name_plural = "Object payment plans" def __str__(self): return "{} created {}".format(self.payment_plan.name, self.created) def _check_create_instalments(self): """Check the current records to make sure we can create instalments.""" instalments = self.objectpaymentplaninstalment_set.all() count = instalments.count() if not count: # a payment plan should always have a deposit record raise CheckoutError( "no deposit/instalment record set-up for " "payment plan: '{}'".format(self.pk) ) if count == 1: # check the first payment is the deposit first_instalment = instalments.first() if not first_instalment.deposit: raise CheckoutError( "no deposit record for " "payment plan: '{}'".format(self.pk) ) else: # cannot create instalments if already created! raise CheckoutError( "instalments already created for this " "payment plan: '{}'".format(self.pk) )
[docs] def create_instalments(self): """The deposit has been paid, so create the instalments.""" self._check_create_instalments() instalments = self.payment_plan.instalments( timezone.now().date(), self.total ) count = 1 for due, amount in instalments: count = count + 1 ObjectPaymentPlanInstalment.objects.create_object_payment_plan_instalment( self, count, False, amount, due )
[docs] def charge_deposit(self, user): self._check_create_instalments() deposit = self.objectpaymentplaninstalment_set.first() Checkout.objects.charge(deposit, user)
[docs] def delete(self): self.deleted = True self.save()
@property def instalment_count(self): return self.objectpaymentplaninstalment_set.exclude( deposit=True ).count()
[docs] def instalments_due(self, due_date=None): """Check if previous instalments are due. .. note:: The ``due_date`` parameter will exclude instalments due *before* this date. """ if due_date is None: due_date = date.today() qs = self.objectpaymentplaninstalment_set.filter( due__lt=due_date ).exclude(state__slug=CheckoutState.SUCCESS) return qs
@property def payment_count(self): return self.objectpaymentplaninstalment_set.count() @property def payments(self): return self.objectpaymentplaninstalment_set.all().order_by("count")
reversion.register(ObjectPaymentPlan)
[docs]class ObjectPaymentPlanInstalmentManager(models.Manager): # @property # def checkout_list(self): # """All 'checkout' transactions for object payment plan instalments. # Use to audit checkout failures so we can block access. # """ # return Checkout.objects.filter( # content_type=ContentType.objects.get_for_model(self.model), # ) def _due(self, date_gt, date_lt): return ( self.model.objects.filter( due__gte=date_gt, due__lte=date_lt, state__slug__in=(CheckoutState.FAIL, CheckoutState.PENDING), ) .exclude(deposit=True) .exclude(retry_count__gt=self.model.RETRY_COUNT) .exclude(object_payment_plan__deleted=True) )
[docs] def create_object_payment_plan_instalment( self, object_payment_plan, count, deposit, amount, due ): obj = self.model( object_payment_plan=object_payment_plan, count=count, deposit=deposit, amount=amount, due=due, ) obj.save() return obj
@property def due(self): """Lock the records while we try and take the payment. TODO Do we need to check that a payment is not already linked to this record? """ date_lt = date.today() date_gt = date_lt + relativedelta( days=-ObjectPaymentPlanInstalment.RETRY_COUNT ) return self._due(date_gt, date_lt)
[docs] def reminder(self): """List of instalments which will be due soon. This information will be used to send a reminder email to the customer so they can make sure they have money in their account. """ date_gt = date.today() date_lt = date_gt + relativedelta( days=+ObjectPaymentPlanInstalment.REMINDER_DAYS ) return self._due(date_gt, date_lt)
[docs] def reminder_emails(self): """ 1. Which reminder emails are due. 2. Remove email addresses which have already been sent a reminder. """ result = {} for instalment in self.reminder().order_by("-due"): result[instalment.checkout_email] = instalment.pk # check to see if we already sent a reminder for this email address date_lt = timezone.now() date_gt = date_lt + relativedelta( days=-ObjectPaymentPlanInstalment.REMINDER_CHECK ) previous_reminders = Mail.objects.emails_for_template( date_gt, date_lt, Customer.MAIL_TEMPLATE_REMINDER ) # remove email addresses which have already been sent a reminder for email in previous_reminders: result.pop(email, None) return result
[docs] def send_payment_reminder_emails(self): count = 0 for email, instalment_pk in self.reminder_emails().items(): instalment = self.model.objects.get(pk=instalment_pk) context = { email: { "due": instalment.due.strftime("%a %d %b %Y"), "name": instalment.checkout_name, } } queue_mail_template( instalment, Customer.MAIL_TEMPLATE_REMINDER, context ) count = count + 1 process_mail.delay() return count
[docs]class ObjectPaymentPlanInstalment(TimeStampedModel): """Payments due for an object. The deposit record gets created first. It has the ``deposit`` field set to ``True``. The instalment records are created after the deposit has been collected. Instalment records have the ``deposit`` field set to ``False``. """ REMINDER_CHECK = 21 REMINDER_DAYS = 7 RETRY_COUNT = 4 object_payment_plan = models.ForeignKey( ObjectPaymentPlan, on_delete=models.CASCADE ) count = models.IntegerField() state = models.ForeignKey( CheckoutState, default=default_checkout_state, on_delete=models.CASCADE ) deposit = models.BooleanField(help_text="Is this the initial payment") amount = models.DecimalField(max_digits=8, decimal_places=2) due = models.DateField() retry_count = models.IntegerField(blank=True, null=True) objects = ObjectPaymentPlanInstalmentManager() class Meta: unique_together = ( ("object_payment_plan", "due"), ("object_payment_plan", "count"), ) verbose_name = "Payments for an object" verbose_name_plural = "Payments for an object" def __str__(self): return "{} {} {}".format( self.object_payment_plan.payment_plan.name, self.due, self.amount )
[docs] def get_absolute_url(self): """TODO Update this to display the payment plan.""" return reverse( "checkout.object.payment.plan", args=[self.object_payment_plan.pk] )
@property def can_mark_paid(self): result = False if not self.object_payment_plan.deleted: if not self.state.slug == CheckoutState.SUCCESS: result = True return result @property def checkout_actions(self): """Payments are normally charged directly.""" return [CheckoutAction.PAYMENT] @property def checkout_audit(self): return Checkout.objects.audit_content_object(self) @property def checkout_can_charge(self): """Check we can take the payment.""" result = False slug = self.state.slug if self.deposit: check = slug in (CheckoutState.FAIL, CheckoutState.PENDING) else: check = slug in (CheckoutState.PENDING, CheckoutState.REQUEST) if check and not self.object_payment_plan.deleted: result = True return result @property def checkout_description(self): result = [] if self.deposit: result.append("deposit") else: # instalments to display as '3 of 6' rather than '4 of 7' #1389 result.append( "{} of {} instalments".format( self.current_instalment, self.object_payment_plan.instalment_count, ) ) return result @property def checkout_email(self): return self.object_payment_plan.content_object.checkout_email
[docs] def checkout_fail(self, checkout): """Update the object to record the payment failure. Called from within a transaction so you can update the model. """ self.state = CheckoutState.objects.fail self.save() self.object_payment_plan.content_object.instalment_fail( checkout, self.due )
[docs] def checkout_fail_url(self, checkout_pk): """No UI, so no URL.""" return self.checkout_success_url(checkout_pk)
@property def checkout_name(self): return self.object_payment_plan.content_object.checkout_name
[docs] def checkout_success(self, checkout): """Update the object to record the payment success. Called from within a transaction and you can update the model. """ self.state = checkout.state self.save() if self.deposit: self.object_payment_plan.create_instalments() self.object_payment_plan.content_object.instalment_success(checkout)
[docs] def checkout_success_url(self, checkout_pk): return reverse( "checkout.object.payment.plan", args=[self.object_payment_plan.pk] )
@property def checkout_total(self): return self.amount @property def current_instalment(self): if self.deposit: raise CheckoutError( "Cannot ask the deposit record for it's instalment count" ) if self.count > 1: return self.count - 1 else: raise CheckoutError( "The 'count' value for an instalment should be greater than 1" )
[docs] def set_retry_payment(self): slug = self.state.slug if self.object_payment_plan.deleted: raise CheckoutError( "Cannot retry payments for deleted payment plans." ) elif slug in (CheckoutState.FAIL, CheckoutState.REQUEST): self.state = CheckoutState.objects.pending self.retry_count = 0 self.save() else: raise CheckoutError("Can only retry failed or requested payments.")
reversion.register(ObjectPaymentPlanInstalment)
[docs]class PaymentRunManager(models.Manager):
[docs] def create_payment_run(self): obj = self.model(created=timezone.now()) obj.save() return obj
[docs] def notify(self, payment_run, error_count): """Send notification of payment run""" email_addresses = [n.email for n in Notify.objects.all()] if email_addresses: subject = "Payment plan activity update - {}".format( timezone.now().strftime("%d/%m/%Y %H:%M") ) message = "Processed {} payment transactions.\n\n".format( payment_run.item_count() ) if error_count: message = message + ( "{} failed transaction(s).\n\n".format(error_count) ) message = message + ( "To view the transactions, log into your dashboard " "and click:\nSettings, Checkout - Payment Plans - Audit" ) queue_mail_message(payment_run, email_addresses, subject, message) # will also trigger sending of emails to customers transaction.on_commit(lambda: process_mail.delay()) else: logger.error( "Cannot send email notification of payment transactions. " "No email addresses set-up in 'enquiry.models.Notify'" )
[docs] def process_payments(self): """Process pending payments. We set the status to 'request' before asking for the money. This is because we can't put the payment request into a transaction. If we are not careful, we could have a situation where the payment succeeds and we don't manage to set the state to 'success'. In the code below, if the payment fails the record will be left in the 'request' state and so we won't ask for the money again. """ error_count = 0 payment_run = None pks = [] qs = ObjectPaymentPlanInstalment.objects.due if qs.count(): with transaction.atomic(): payment_run = self.create_payment_run() for instalment in qs: item = PaymentRunItem.objects.create_payment_run_item( payment_run, instalment ) pks.append(item.pk) for pk in pks: item = PaymentRunItem.objects.get(pk=pk) with transaction.atomic(): # make sure the payment is still pending instalment = ObjectPaymentPlanInstalment.objects.select_for_update( nowait=True ).get( pk=item.instalment.pk ) valid_states = (CheckoutState.FAIL, CheckoutState.PENDING) if instalment.state.slug not in valid_states: raise CheckoutError( "Instalment '{}' must be in the failed or pending " "state: '{}'".format( instalment.pk, instalment.state.slug ) ) # we are ready to request payment instalment.state = CheckoutState.objects.request retry_count = instalment.retry_count or 0 instalment.retry_count = retry_count + 1 instalment.save() # request payment try: Checkout.objects.charge(instalment, AnonymousUser(), item) except CheckoutError as e: logger.error(e) error_count = error_count + 1 if payment_run: self.notify(payment_run, error_count) return len(pks), error_count
[docs]class PaymentRun(models.Model): created = models.DateTimeField() objects = PaymentRunManager() class Meta: ordering = ("created",) verbose_name = "Payment Run" verbose_name_plural = "Payment Runs" def __str__(self): return "Payment Run {} created {}".format( self.pk, self.created.strftime("%d/%m/%Y %H:%M") )
[docs] def item_count(self): return self.paymentrunitem_set.count()
[docs]class PaymentRunItemManager(models.Manager):
[docs] def create_payment_run_item(self, payment_run, instalment): obj = self.model( created=timezone.now(), payment_run=payment_run, instalment=instalment, ) obj.save() return obj
[docs]class PaymentRunItem(models.Model): created = models.DateTimeField() modified = models.DateTimeField(auto_now=True) payment_run = models.ForeignKey(PaymentRun, on_delete=models.CASCADE) instalment = models.ForeignKey( ObjectPaymentPlanInstalment, on_delete=models.CASCADE ) checkout = models.ForeignKey( Checkout, blank=True, null=True, on_delete=models.CASCADE ) objects = PaymentRunItemManager() class Meta: ordering = ("payment_run__pk", "created") unique_together = ("payment_run", "instalment") verbose_name = "Payment Run Item" verbose_name_plural = "Payment Run Items" def __str__(self): return "{} created {} for instalment {}".format( self.payment_run.pk, self.payment_run.created.strftime("%d/%m/%Y %H:%M"), self.instalment.pk, )
[docs]class CheckoutSettingsManager(models.Manager): @property def settings(self): try: return self.model.objects.get() except self.model.DoesNotExist: raise CheckoutError( "Checkout settings have not been set-up in admin" )
[docs]class CheckoutSettings(SingletonModel): default_payment_plan = models.ForeignKey( PaymentPlan, on_delete=models.CASCADE ) objects = CheckoutSettingsManager() class Meta: verbose_name = "Checkout Settings" def __str__(self): return "Default Payment Plan: {}".format(self.default_payment_plan.name)
reversion.register(CheckoutSettings) # class ObjectPaymentPlanInstalmentCheckoutAudit(TimeStampedModel): # """Keep an audit of checkout status.""" # # object_payment_plan_instalment = models.ForeignKey( # ObjectPaymentPlanInstalment # ) # state = models.ForeignKey( # CheckoutState, # default=default_checkout_state, # #blank=True, null=True # ) # # ChargeAudit.objects.create_charge_audit( # content_object, # current_user, # checkout, # )