# -*- encoding: utf-8 -*-
import logging
import stripe
from datetime import date
from dateutil.relativedelta import relativedelta
from dateutil.rrule import MONTHLY, rrule
from decimal import Decimal
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.db import models, transaction
from django.urls import reverse
from django.utils import timezone
from reversion import revisions as reversion
from base.model_utils import TimeStampedModel
from base.singleton import SingletonModel
from base.url_utils import url_with_querystring
from mail.models import Mail, Notify
from mail.service import queue_mail_message, queue_mail_template
from mail.tasks import process_mail
CURRENCY = "GBP"
logger = logging.getLogger(__name__)
def _card_error(e):
return (
"CardError: param '{}' code '{}' http body '{}' "
"http status '{}'".format(e.param, e.code, e.http_body, e.http_status)
)
def _stripe_error(e):
return "http body: '{}' http status: '{}'".format(
e.http_body, e.http_status
)
[docs]def as_pennies(total):
return int(total * Decimal("100"))
[docs]def default_checkout_state():
return CheckoutState.objects.get(slug=CheckoutState.PENDING).pk
[docs]def expiry_date_as_str(item):
d = item.get("expiry_date", None)
return d.strftime("%Y%m%d") if d else ""
[docs]class CheckoutError(Exception):
def __init__(self, value):
Exception.__init__(self)
self.value = value
def __str__(self):
return repr("%s, %s" % (self.__class__.__name__, self.value))
[docs]class CheckoutStateManager(models.Manager):
@property
def fail(self):
return self.model.objects.get(slug=self.model.FAIL)
@property
def pending(self):
return self.model.objects.get(slug=self.model.PENDING)
@property
def request(self):
"""The 'request' state is used for payment plans only.
It allows the system to set the state to ``request`` before charging
the account.
"""
return self.model.objects.get(slug=self.model.REQUEST)
@property
def success(self):
return self.model.objects.get(slug=self.model.SUCCESS)
[docs]class CheckoutState(TimeStampedModel):
FAIL = "fail"
PENDING = "pending"
# The 'request' state is used for payment plans only.
REQUEST = "request"
SUCCESS = "success"
name = models.CharField(max_length=100)
slug = models.SlugField(unique=True)
objects = CheckoutStateManager()
class Meta:
ordering = ("name",)
verbose_name = "Checkout state"
verbose_name_plural = "Checkout states"
def __str__(self):
return "{}".format(self.name)
@property
def is_success(self):
return self.slug == self.SUCCESS
@property
def is_pending(self):
return self.slug == self.PENDING
reversion.register(CheckoutState)
[docs]class CheckoutActionManager(models.Manager):
@property
def card_refresh(self):
return self.model.objects.get(slug=self.model.CARD_REFRESH)
@property
def charge(self):
return self.model.objects.get(slug=self.model.CHARGE)
@property
def invoice(self):
return self.model.objects.get(slug=self.model.INVOICE)
@property
def manual(self):
return self.model.objects.get(slug=self.model.MANUAL)
@property
def payment(self):
return self.model.objects.get(slug=self.model.PAYMENT)
@property
def payment_plan(self):
return self.model.objects.get(slug=self.model.PAYMENT_PLAN)
[docs]class CheckoutAction(TimeStampedModel):
CARD_REFRESH = "card_refresh"
CHARGE = "charge"
INVOICE = "invoice"
MANUAL = "manual"
PAYMENT = "payment"
PAYMENT_PLAN = "payment_plan"
name = models.CharField(max_length=100)
slug = models.SlugField(unique=True)
payment = models.BooleanField()
objects = CheckoutActionManager()
class Meta:
ordering = ("name",)
verbose_name = "Checkout action"
verbose_name_plural = "Checkout action"
def __str__(self):
return "{}".format(self.name)
@property
def invoice(self):
return self.slug == self.INVOICE
@property
def payment_plan(self):
return self.slug == self.PAYMENT_PLAN
reversion.register(CheckoutAction)
[docs]class CustomerManager(models.Manager):
def _create_customer(self, name, email, customer_id):
obj = self.model(name=name, email=email, customer_id=customer_id)
obj.save()
return obj
def _stripe_create(self, email, description, token):
"""Use the Stripe API to create a customer."""
try:
stripe.api_key = settings.STRIPE_SECRET_KEY
customer = stripe.Customer.create(
email=email, description=description, card=token
)
return customer.id
except (
stripe.error.InvalidRequestError,
stripe.error.StripeError,
) as e:
raise CheckoutError(
"Error creating Stripe customer '{}': {}".format(
email, _stripe_error(e)
)
) from e
def _stripe_get_card_expiry(self, customer_id):
result = (0, 0)
stripe.api_key = settings.STRIPE_SECRET_KEY
customer = stripe.Customer.retrieve(customer_id)
default_card = customer["default_source"]
# find the details of the default card
for card in customer["sources"]["data"]:
if card["id"] == default_card:
# find the expiry date of the default card
result = (int(card["exp_year"]), int(card["exp_month"]))
break
return result
def _stripe_update(self, customer_id, description, token):
"""Use the Stripe API to update a customer."""
try:
stripe.api_key = settings.STRIPE_SECRET_KEY
stripe_customer = stripe.Customer.retrieve(customer_id)
stripe_customer.description = description
stripe_customer.card = token
stripe_customer.save()
except stripe.error.StripeError as e:
raise CheckoutError(
"Error updating Stripe customer '{}': {}".format(
customer_id, _stripe_error(e)
)
) from e
[docs] def get_customer(self, email):
"""Get the Stripe customer from the email address.
Note: email addresses are case-sensitive. For more info, see
https://code.djangoproject.com/ticket/17561.
"""
result = self.get_customers(email).first()
if not result:
raise self.model.DoesNotExist(
"Cannot find a customer record for '{}'".format(email)
)
return result
[docs] def get_customers(self, email):
return self.model.objects.filter(email__iexact=email).order_by("-pk")
[docs] def init_customer(self, content_object, token):
"""Initialise Stripe customer using email, description and token.
1. Lookup existing customer record in the database.
- Retrieve customer from Stripe and update description and token.
2. If the customer does not exist:
- Create Stripe customer with email, description and token.
- Create a customer record in the database.
Return the customer database record and clear the card ``refresh``
flag.
"""
name = content_object.checkout_name
email = content_object.checkout_email
try:
obj = self.get_customer(email)
obj.name = name
obj.refresh = False
obj.save()
self._stripe_update(obj.customer_id, name, token)
except self.model.DoesNotExist:
customer_id = self._stripe_create(email, name, token)
obj = self._create_customer(name, email, customer_id)
return obj
@property
def refresh(self):
"""Customers with expiring cards."""
return self.model.objects.filter(refresh=True)
[docs] def refresh_credit_card(self, email):
result = False
try:
customer = self.get_customer(email)
result = customer.refresh
except self.model.DoesNotExist:
pass
return result
[docs] def update_card_expiry(self, email):
"""Find the customer, get the expiry date from Stripe and update."""
try:
obj = self.get_customer(email)
year, month = self._stripe_get_card_expiry(obj.customer_id)
if year and month:
# last day of the month
obj.expiry_date = date(year, month, 1) + relativedelta(
months=+1, day=1, days=-1
)
# is the card expiring soon?
is_expiring = obj.is_expiring
if obj.refresh == is_expiring:
pass
else:
with transaction.atomic():
obj.refresh = is_expiring
# save the details
obj.save()
# email the customer
queue_mail_template(
obj,
self.model.MAIL_TEMPLATE_CARD_EXPIRY,
{obj.email: dict(name=obj.name)},
)
except Customer.DoesNotExist:
pass
[docs]class Customer(TimeStampedModel):
"""Stripe Customer.
Link the Stripe customer to an email address (and name).
Note: It is expected that multiple users in our databases could have the
same email address. If they have different names, then this table looks
very confusing. Try checking the 'content_object' of the 'Checkout' model
if you need to diagnose an issue.
"""
MAIL_TEMPLATE_CARD_EXPIRY = "customer_card_expiry"
MAIL_TEMPLATE_CARD_REFRESH_REQUEST = "contact_card_refresh_request"
MAIL_TEMPLATE_REMINDER = "customer_instalment_reminder"
name = models.TextField()
email = models.EmailField(unique=True)
customer_id = models.TextField()
expiry_date = models.DateField(blank=True, null=True)
refresh = models.BooleanField(
default=False,
help_text="Should the customer refresh their card details?",
)
objects = CustomerManager()
class Meta:
ordering = ("pk",)
verbose_name = "Customer"
verbose_name_plural = "Customers"
def __str__(self):
return "{} {}".format(self.email, self.customer_id)
[docs] def get_absolute_url(self):
return url_with_querystring(
reverse("checkout.customer"), email=self.email
)
@property
def checkout_actions(self):
return [CheckoutAction.CARD_REFRESH]
@property
def checkout_can_charge(self):
"""We can always take a payment for this object!"""
return True
@property
def checkout_description(self):
result = "{}".format(self.name)
return [result]
@property
def checkout_email(self):
return self.email
[docs] def checkout_fail(self, checkout):
pass
[docs] def checkout_fail_url(self, checkout_pk):
return self.get_absolute_url()
@property
def checkout_name(self):
return "{}".format(self.name)
[docs] def checkout_success(self, checkout):
pass
[docs] def checkout_success_url(self, checkout_pk):
return self.get_absolute_url()
@property
def checkout_total(self):
return Decimal()
@property
def is_expiring(self):
"""Is the card expiring within the next month?
If the ``expiry_date`` is ``None``, then it has *not* expired.
The expiry date is set to the last day of the month e.g. for September
2015, the ``expiry_date`` will be 30/09/2015.
"""
result = False
one_month = date.today() + relativedelta(months=+1)
if self.expiry_date and self.expiry_date <= one_month:
return True
return result
reversion.register(Customer)
[docs]class CustomerPayment(TimeStampedModel):
"""Take a payment from a customer."""
user = models.ForeignKey(
settings.AUTH_USER_MODEL, related_name="+", on_delete=models.CASCADE
)
customer = models.ForeignKey(Customer, on_delete=models.CASCADE)
description = models.CharField(max_length=100)
total = models.DecimalField(max_digits=8, decimal_places=2)
class Meta:
ordering = ("pk",)
verbose_name = "Customer payment"
verbose_name_plural = "Customer payments"
def __str__(self):
return "{} {} {}".format(
self.customer.email, self.description, self.total
)
[docs] def get_absolute_url(self):
return url_with_querystring(
reverse("checkout.customer"), email=self.customer.email
)
@property
def checkout_actions(self):
return [CheckoutAction.PAYMENT]
@property
def checkout_can_charge(self):
return bool(self.total)
@property
def checkout_description(self):
result = "{}".format(self.description)
return [result]
@property
def checkout_email(self):
return self.customer.email
[docs] def checkout_fail(self, checkout):
pass
[docs] def checkout_fail_url(self, checkout_pk):
return self.get_absolute_url()
@property
def checkout_name(self):
return "{}".format(self.customer.name)
[docs] def checkout_success(self, checkout):
pass
[docs] def checkout_success_url(self, checkout_pk):
return self.get_absolute_url()
@property
def checkout_total(self):
return self.total
reversion.register(CustomerPayment)
[docs]class CheckoutManager(models.Manager):
def _notify_user(self, checkout, current_user):
"""Notify the user (unless they are a member of staff or anonymous)."""
if current_user.is_staff or current_user.is_anonymous:
pass
else:
checkout.notify()
[docs] def audit(self, content_type=None):
return self.model.objects.all().order_by("-pk")
[docs] def audit_content_object(self, content_object):
content_type = ContentType.objects.get_for_model(content_object)
return self.model.objects.filter(
content_type=content_type, object_id=content_object.pk
).order_by("-pk")
[docs] def audit_content_type(self, content_type):
return self.model.objects.filter(content_type=content_type).order_by(
"-pk"
)
[docs] def audit_customers(self, customers):
return self.model.objects.filter(customer__in=customers).order_by("-pk")
[docs] def create_checkout(self, action, content_object, user):
"""Create a checkout payment request."""
if action == CheckoutAction.objects.card_refresh:
total = None
else:
total = content_object.checkout_total
obj = self.model(
checkout_date=timezone.now(),
action=action,
content_object=content_object,
description=", ".join(content_object.checkout_description),
total=total,
)
# an anonymous user can create a checkout
if user.is_authenticated:
obj.user = user
obj.save()
return obj
[docs] def charge(self, content_object, current_user, payment_run_item=None):
"""Collect some money from a customer.
You must be a member of staff to use this method. For payment plans,
a background process will charge the card. In this case, the user will
be anonymous.
We should only attempt to collect money if the customer has already
entered their card details.
"""
content_object.refresh_from_db()
if not content_object.checkout_can_charge:
raise CheckoutError("Cannot charge the card.")
try:
customer = Customer.objects.get_customer(
content_object.checkout_email
)
except Customer.DoesNotExist as e:
raise CheckoutError(
"Customer '{}' has not registered a card".format(
content_object.checkout_email
)
) from e
with transaction.atomic():
checkout = self.create_checkout(
CheckoutAction.objects.charge, content_object, current_user
)
checkout.customer = customer
checkout.save()
if payment_run_item:
payment_run_item.checkout = checkout
payment_run_item.save()
try:
checkout.charge(current_user)
with transaction.atomic():
checkout.success()
self._notify_user(checkout, current_user)
except CheckoutError as e:
logger.error(e)
with transaction.atomic():
checkout.fail()
self._notify_user(checkout, current_user)
raise
[docs] def manual(self, content_object, current_user):
"""Mark a transaction as paid (manual).
You must be a member of staff to use this method.
"""
content_object.refresh_from_db()
if not current_user.is_staff:
raise CheckoutError(
"Only a member of staff can mark this transaction as paid."
)
valid_state = (
CheckoutState.FAIL,
CheckoutState.PENDING,
CheckoutState.REQUEST,
)
if content_object.state.slug not in valid_state:
raise CheckoutError("Cannot mark this transaction as paid.")
checkout = self.create_checkout(
CheckoutAction.objects.manual, content_object, current_user
)
checkout.save()
try:
with transaction.atomic():
checkout.success()
except CheckoutError:
with transaction.atomic():
checkout.fail()
[docs] def success(self):
return self.audit().filter(state=CheckoutState.objects.success)
[docs] def for_content_object(self, obj):
return self.model.objects.get(
content_type=ContentType.objects.get_for_model(obj),
object_id=obj.pk,
)
[docs]class Checkout(TimeStampedModel):
"""Checkout.
Create a 'Checkout' instance when you want to interact with Stripe e.g.
take a payment, get card details to set-up a payment plan or refresh the
details of an expired card.
"""
checkout_date = models.DateTimeField()
action = models.ForeignKey(CheckoutAction, on_delete=models.CASCADE)
customer = models.ForeignKey(
Customer, blank=True, null=True, on_delete=models.CASCADE
)
user = models.ForeignKey(
settings.AUTH_USER_MODEL,
related_name="+",
blank=True,
null=True,
on_delete=models.CASCADE,
help_text=(
"User who created the checkout request "
"(or blank if the the user is not logged in)"
),
)
state = models.ForeignKey(
CheckoutState, default=default_checkout_state, on_delete=models.CASCADE
)
description = models.TextField()
total = models.DecimalField(
max_digits=8, decimal_places=2, blank=True, null=True
)
# link to the object in the system which requested the checkout
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
object_id = models.PositiveIntegerField()
content_object = GenericForeignKey()
objects = CheckoutManager()
class Meta:
ordering = ("pk",)
verbose_name = "Checkout"
verbose_name_plural = "Checkouts"
def __str__(self):
return "{}".format(self.content_object.checkout_email)
def _charge(self):
"""Charge the card."""
if self.action.payment:
self._charge_stripe()
def _charge_stripe(self):
"""Create the charge on stripe's servers."""
stripe.api_key = settings.STRIPE_SECRET_KEY
try:
stripe.Charge.create(
amount=as_pennies(self.total),
currency=CURRENCY,
customer=self.customer.customer_id,
description=self.description,
)
except stripe.error.CardError as e:
raise CheckoutError(
"Card error: '{}' checkout '{}', object '{}': {}".format(
e.code, self.pk, self.content_object.pk, _card_error(e)
)
) from e
except stripe.error.StripeError as e:
raise CheckoutError(
"Card error: checkout '{}', object '{}': {}".format(
self.pk, self.content_object.pk, _stripe_error(e)
)
) from e
def _success_or_fail(self, state):
self.state = state
self.save()
[docs] def charge(self, current_user):
"""Charge the user's card.
Must be a member of staff or anonymous (used when running as a
background task) to use this method.
To take payments for the current user, use the ``charge_user`` method.
"""
if current_user.is_staff or current_user.is_anonymous:
self._charge()
else:
raise CheckoutError(
"Cannot process - payments can only "
"be taken by a member of staff. "
"Current: '{}', Customer: '{}'".format(
current_user.email, self.customer.email
)
)
[docs] def charge_user(self, current_user):
"""Charge the card of the current user.
Use this method when the logged in user is performing the transaction.
To take money from another user's card, you must be a member of staff
and use the ``charge`` method.
"""
anonymous = not current_user.is_authenticated
staff = current_user.is_staff
if anonymous or staff or self.customer.email == current_user.email:
self._charge()
else:
raise CheckoutError(
"Cannot process - payments can only be taken "
"for an anonymous user or the current user. "
"Current: '{}', Customer: '{}'".format(
current_user.email, self.customer.email
)
)
@property
def content_object_url(self):
return self.content_object.get_absolute_url()
[docs] def fail(self):
"""Checkout failed - so update and notify admin."""
self._success_or_fail(CheckoutState.objects.fail)
return self.content_object.checkout_fail(self)
@property
def failed(self):
"""Did the checkout request fail?"""
return self.state == CheckoutState.objects.fail
@property
def previous_address_data(self):
try:
if self.action.slug == "payment_plan":
data = self.checkoutadditional
return filter(
None,
(
data.address_1,
data.address_2,
data.address_3,
data.town,
data.county,
data.postcode,
data.country,
),
)
except CheckoutAdditional.DoesNotExist:
pass
return []
@property
def date_of_birth(self):
try:
data = self.checkoutadditional
return data.date_of_birth
except CheckoutAdditional.DoesNotExist:
pass
return None
@property
def invoice_data(self):
try:
if self.action.slug == "invoice":
data = self.checkoutadditional
return filter(
None,
(
data.contact_name,
data.company_name,
data.address_1,
data.address_2,
data.address_3,
data.town,
data.county,
data.postcode,
data.country,
data.email,
data.phone,
),
)
except CheckoutAdditional.DoesNotExist:
pass
return []
@property
def is_invoice(self):
return self.action == CheckoutAction.objects.invoice
@property
def is_manual(self):
return self.action == CheckoutAction.objects.manual
@property
def is_payment(self):
return self.action == CheckoutAction.objects.payment
@property
def is_payment_plan(self):
"""Used in success templates."""
return self.action == CheckoutAction.objects.payment_plan
[docs] def notify(self, request=None):
"""Send notification of checkout status.
Pass in a 'request' if you want the email to contain the URL of the
checkout transaction.
"""
email_addresses = [n.email for n in Notify.objects.all()]
if email_addresses:
caption = self.action.name
state = self.state.name
subject = "{} - {} from {}".format(
state.upper(),
caption.capitalize(),
self.content_object.checkout_name,
)
message = "{} - {} - {} from {}, {}:".format(
self.created.strftime("%d/%m/%Y %H:%M"),
state.upper(),
caption,
self.content_object.checkout_name,
self.content_object.checkout_email,
)
message = message + "\n\n{}\n\n{}".format(
self.description,
request.build_absolute_uri(self.content_object_url)
if request
else "",
)
if self.invoice_data:
message = message + "\n\nInvoice: {}".format(
", ".join(self.invoice_data)
)
if self.previous_address_data:
message = message + "\n\nPrevious address: {}".format(
", ".join(self.previous_address_data)
)
if self.date_of_birth:
dob = self.date_of_birth.strftime("%d/%m/%Y")
message = message + "\n\nDate of birth: {}".format(dob)
queue_mail_message(self, email_addresses, subject, message)
else:
logger.error(
"Cannot send email notification of checkout transaction. "
"No email addresses set-up in 'enquiry.models.Notify'"
)
[docs] def success(self):
"""Checkout successful.
This method is called from within a transaction.
See ``checkout.success`` in the model manager.
"""
self._success_or_fail(CheckoutState.objects.success)
return self.content_object.checkout_success(self)
reversion.register(Checkout)
[docs]class CheckoutAdditionalManager(models.Manager):
[docs] def create_checkout_additional(self, checkout, **kwargs):
obj = self.model(checkout=checkout, **kwargs)
obj.save()
return obj
[docs]class CheckoutAdditional(TimeStampedModel):
"""If a user decides to pay by invoice, there are the details.
Links with the 'CheckoutForm' in ``checkout/forms.py``. Probably easier to
put validation in the form if required.
"""
checkout = models.OneToOneField(Checkout, on_delete=models.CASCADE)
# company
company_name = models.CharField(max_length=100, blank=True)
address_1 = models.CharField("Address", max_length=100, blank=True)
address_2 = models.CharField("", max_length=100, blank=True)
address_3 = models.CharField("", max_length=100, blank=True)
town = models.CharField(max_length=100, blank=True)
county = models.CharField(max_length=100, blank=True)
postcode = models.CharField(max_length=20, blank=True)
country = models.CharField(max_length=100, blank=True)
# contact
contact_name = models.CharField(max_length=100, blank=True)
email = models.EmailField(blank=False)
phone = models.CharField(max_length=50, blank=True)
date_of_birth = models.DateField(blank=True, null=True)
objects = CheckoutAdditionalManager()
class Meta:
ordering = ("email",)
verbose_name = "Checkout Additional Information"
verbose_name_plural = "Checkout Additional Information"
def __str__(self):
return "{}".format(self.email)
reversion.register(CheckoutAdditional)
[docs]class PaymentPlanManager(models.Manager):
[docs] def create_payment_plan(self, slug, name, deposit, count, interval):
obj = self.model(
slug=slug,
name=name,
deposit=deposit,
count=count,
interval=interval,
)
obj.save()
return obj
[docs] def current(self):
"""List of payment plan headers excluding 'deleted'."""
return self.model.objects.exclude(deleted=True)
[docs]class PaymentPlan(TimeStampedModel):
"""Definition of a payment plan."""
name = models.TextField()
slug = models.SlugField(unique=True)
deposit = models.IntegerField(help_text="Initial deposit as a percentage")
count = models.IntegerField(help_text="Number of instalments")
interval = models.IntegerField(help_text="Instalment interval in months")
deleted = models.BooleanField(default=False)
objects = PaymentPlanManager()
class Meta:
ordering = ("slug",)
verbose_name = "Payment plan"
verbose_name_plural = "Payment plan"
def __str__(self):
return "{}".format(self.slug)
[docs] def clean(self):
if not self.count:
raise ValidationError("Set at least one instalment.")
if not self.deposit:
raise ValidationError("Set an initial deposit.")
if not self.interval:
raise ValidationError(
"Set the number of months between instalments."
)
[docs] def save(self, *args, **kwargs):
if self.can_update:
super().save(*args, **kwargs)
else:
raise CheckoutError("Payment plan in use. Cannot be updated.")
@property
def can_update(self):
count = ObjectPaymentPlan.objects.filter(payment_plan=self).count()
if count:
return False
else:
return True
[docs] def deposit_amount(self, total):
return (total * (self.deposit / Decimal("100"))).quantize(
Decimal(".01")
)
[docs] def instalments(self, deposit_date, total):
"""Calculate the instalment dates and values."""
# deposit
deposit = self.deposit_amount(total)
# list of dates
first_interval = self.interval
if deposit_date.day > 15:
first_interval = first_interval + 1
start_date = deposit_date + relativedelta(months=+first_interval, day=1)
instalment_dates = [
d.date()
for d in rrule(
MONTHLY,
count=self.count,
dtstart=start_date,
interval=self.interval,
)
]
# instalments
instalment = ((total - deposit) / self.count).quantize(Decimal(".01"))
# list of payment amounts
values = []
check = deposit
for d in instalment_dates:
value = instalment
values.append(value)
check = check + value
# make the total match
values[-1] = values[-1] + (total - check)
return list(zip(instalment_dates, values))
[docs] def example(self, deposit_date, total):
result = [(deposit_date, self.deposit_amount(total))]
return result + self.instalments(deposit_date, total)
reversion.register(PaymentPlan)
[docs]class ObjectPaymentPlanManager(models.Manager):
def _payment_plans_by_instalment_state(self, states):
values = ObjectPaymentPlanInstalment.objects.filter(
state__slug__in=states
).values_list("object_payment_plan__pk", flat=True)
# 'set' will remove duplicate 'values'
return self.model.objects.filter(pk__in=(set(values))).exclude(
deleted=True
)
[docs] def charge_deposit(self, content_object, user):
payment_plan = self.for_content_object(content_object)
payment_plan.charge_deposit(user)
[docs] def create_object_payment_plan(self, content_object, payment_plan, total):
"""Create a payment plan for an object with the initial deposit record.
This method must be called from within a transaction.
"""
obj = self.model(
content_object=content_object,
payment_plan=payment_plan,
total=total,
)
obj.save()
ObjectPaymentPlanInstalment.objects.create_object_payment_plan_instalment(
obj, 1, True, payment_plan.deposit_amount(total), date.today()
)
return obj
@property
def fail_or_request(self):
return self._payment_plans_by_instalment_state(
(CheckoutState.FAIL, CheckoutState.REQUEST)
)
[docs] def for_content_object(self, obj):
"""Return the ``ObjectPaymentPlan`` instance for ``obj``."""
return self.model.objects.get(
content_type=ContentType.objects.get_for_model(obj),
object_id=obj.pk,
)
@property
def outstanding_payment_plans(self):
"""List of outstanding payment plans.
Used to refresh card expiry dates.
"""
return self._payment_plans_by_instalment_state(
(CheckoutState.FAIL, CheckoutState.PENDING, CheckoutState.REQUEST)
)
@property
def report_card_expiry_dates(self):
"""Outstanding payment plans showing expiry dates in order."""
emails = []
result = []
payment_plans = self.outstanding_payment_plans
for item in payment_plans:
emails.append(item.content_object.checkout_email)
# get the expiry date for all the customers (as a 'dict')
customers = dict(
Customer.objects.filter(email__in=emails).values_list(
"email", "expiry_date"
)
)
for item in payment_plans:
expiry_date = customers.get(item.content_object.checkout_email)
result.append(
dict(expiry_date=expiry_date, object_payment_plan=item)
)
return sorted(result, key=expiry_date_as_str)
[docs] def refresh_card_expiry_dates(self):
"""Refresh the card expiry dates for outstanding payment plans."""
for plan in self.outstanding_payment_plans:
Customer.objects.update_card_expiry(
plan.content_object.checkout_email
)
[docs]class ObjectPaymentPlan(TimeStampedModel):
"""Payment plan for an object."""
# link to the object in the system which requested the payment plan
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
object_id = models.PositiveIntegerField()
content_object = GenericForeignKey()
# payment plan
payment_plan = models.ForeignKey(PaymentPlan, on_delete=models.CASCADE)
total = models.DecimalField(max_digits=8, decimal_places=2)
# is this object deleted?
deleted = models.BooleanField(default=False)
objects = ObjectPaymentPlanManager()
class Meta:
ordering = ("created",)
unique_together = ("object_id", "content_type")
verbose_name = "Object payment plan"
verbose_name_plural = "Object payment plans"
def __str__(self):
return "{} created {}".format(self.payment_plan.name, self.created)
def _check_create_instalments(self):
"""Check the current records to make sure we can create instalments."""
instalments = self.objectpaymentplaninstalment_set.all()
count = instalments.count()
if not count:
# a payment plan should always have a deposit record
raise CheckoutError(
"no deposit/instalment record set-up for "
"payment plan: '{}'".format(self.pk)
)
if count == 1:
# check the first payment is the deposit
first_instalment = instalments.first()
if not first_instalment.deposit:
raise CheckoutError(
"no deposit record for "
"payment plan: '{}'".format(self.pk)
)
else:
# cannot create instalments if already created!
raise CheckoutError(
"instalments already created for this "
"payment plan: '{}'".format(self.pk)
)
[docs] def create_instalments(self):
"""The deposit has been paid, so create the instalments."""
self._check_create_instalments()
instalments = self.payment_plan.instalments(
timezone.now().date(), self.total
)
count = 1
for due, amount in instalments:
count = count + 1
ObjectPaymentPlanInstalment.objects.create_object_payment_plan_instalment(
self, count, False, amount, due
)
[docs] def charge_deposit(self, user):
self._check_create_instalments()
deposit = self.objectpaymentplaninstalment_set.first()
Checkout.objects.charge(deposit, user)
[docs] def delete(self):
self.deleted = True
self.save()
@property
def instalment_count(self):
return self.objectpaymentplaninstalment_set.exclude(
deposit=True
).count()
[docs] def instalments_due(self, due_date=None):
"""Check if previous instalments are due.
.. note:: The ``due_date`` parameter will exclude instalments due
*before* this date.
"""
if due_date is None:
due_date = date.today()
qs = self.objectpaymentplaninstalment_set.filter(
due__lt=due_date
).exclude(state__slug=CheckoutState.SUCCESS)
return qs
@property
def payment_count(self):
return self.objectpaymentplaninstalment_set.count()
@property
def payments(self):
return self.objectpaymentplaninstalment_set.all().order_by("count")
reversion.register(ObjectPaymentPlan)
[docs]class ObjectPaymentPlanInstalmentManager(models.Manager):
# @property
# def checkout_list(self):
# """All 'checkout' transactions for object payment plan instalments.
# Use to audit checkout failures so we can block access.
# """
# return Checkout.objects.filter(
# content_type=ContentType.objects.get_for_model(self.model),
# )
def _due(self, date_gt, date_lt):
return (
self.model.objects.filter(
due__gte=date_gt,
due__lte=date_lt,
state__slug__in=(CheckoutState.FAIL, CheckoutState.PENDING),
)
.exclude(deposit=True)
.exclude(retry_count__gt=self.model.RETRY_COUNT)
.exclude(object_payment_plan__deleted=True)
)
[docs] def create_object_payment_plan_instalment(
self, object_payment_plan, count, deposit, amount, due
):
obj = self.model(
object_payment_plan=object_payment_plan,
count=count,
deposit=deposit,
amount=amount,
due=due,
)
obj.save()
return obj
@property
def due(self):
"""Lock the records while we try and take the payment.
TODO Do we need to check that a payment is not already linked to this
record?
"""
date_lt = date.today()
date_gt = date_lt + relativedelta(
days=-ObjectPaymentPlanInstalment.RETRY_COUNT
)
return self._due(date_gt, date_lt)
[docs] def reminder(self):
"""List of instalments which will be due soon.
This information will be used to send a reminder email to the customer
so they can make sure they have money in their account.
"""
date_gt = date.today()
date_lt = date_gt + relativedelta(
days=+ObjectPaymentPlanInstalment.REMINDER_DAYS
)
return self._due(date_gt, date_lt)
[docs] def reminder_emails(self):
"""
1. Which reminder emails are due.
2. Remove email addresses which have already been sent a reminder.
"""
result = {}
for instalment in self.reminder().order_by("-due"):
result[instalment.checkout_email] = instalment.pk
# check to see if we already sent a reminder for this email address
date_lt = timezone.now()
date_gt = date_lt + relativedelta(
days=-ObjectPaymentPlanInstalment.REMINDER_CHECK
)
previous_reminders = Mail.objects.emails_for_template(
date_gt, date_lt, Customer.MAIL_TEMPLATE_REMINDER
)
# remove email addresses which have already been sent a reminder
for email in previous_reminders:
result.pop(email, None)
return result
[docs] def send_payment_reminder_emails(self):
count = 0
for email, instalment_pk in self.reminder_emails().items():
instalment = self.model.objects.get(pk=instalment_pk)
context = {
email: {
"due": instalment.due.strftime("%a %d %b %Y"),
"name": instalment.checkout_name,
}
}
queue_mail_template(
instalment, Customer.MAIL_TEMPLATE_REMINDER, context
)
count = count + 1
process_mail.delay()
return count
[docs]class ObjectPaymentPlanInstalment(TimeStampedModel):
"""Payments due for an object.
The deposit record gets created first. It has the ``deposit`` field set to
``True``.
The instalment records are created after the deposit has been collected.
Instalment records have the ``deposit`` field set to ``False``.
"""
REMINDER_CHECK = 21
REMINDER_DAYS = 7
RETRY_COUNT = 4
object_payment_plan = models.ForeignKey(
ObjectPaymentPlan, on_delete=models.CASCADE
)
count = models.IntegerField()
state = models.ForeignKey(
CheckoutState, default=default_checkout_state, on_delete=models.CASCADE
)
deposit = models.BooleanField(help_text="Is this the initial payment")
amount = models.DecimalField(max_digits=8, decimal_places=2)
due = models.DateField()
retry_count = models.IntegerField(blank=True, null=True)
objects = ObjectPaymentPlanInstalmentManager()
class Meta:
unique_together = (
("object_payment_plan", "due"),
("object_payment_plan", "count"),
)
verbose_name = "Payments for an object"
verbose_name_plural = "Payments for an object"
def __str__(self):
return "{} {} {}".format(
self.object_payment_plan.payment_plan.name, self.due, self.amount
)
[docs] def get_absolute_url(self):
"""TODO Update this to display the payment plan."""
return reverse(
"checkout.object.payment.plan", args=[self.object_payment_plan.pk]
)
@property
def can_mark_paid(self):
result = False
if not self.object_payment_plan.deleted:
if not self.state.slug == CheckoutState.SUCCESS:
result = True
return result
@property
def checkout_actions(self):
"""Payments are normally charged directly."""
return [CheckoutAction.PAYMENT]
@property
def checkout_audit(self):
return Checkout.objects.audit_content_object(self)
@property
def checkout_can_charge(self):
"""Check we can take the payment."""
result = False
slug = self.state.slug
if self.deposit:
check = slug in (CheckoutState.FAIL, CheckoutState.PENDING)
else:
check = slug in (CheckoutState.PENDING, CheckoutState.REQUEST)
if check and not self.object_payment_plan.deleted:
result = True
return result
@property
def checkout_description(self):
result = []
if self.deposit:
result.append("deposit")
else:
# instalments to display as '3 of 6' rather than '4 of 7' #1389
result.append(
"{} of {} instalments".format(
self.current_instalment,
self.object_payment_plan.instalment_count,
)
)
return result
@property
def checkout_email(self):
return self.object_payment_plan.content_object.checkout_email
[docs] def checkout_fail(self, checkout):
"""Update the object to record the payment failure.
Called from within a transaction so you can update the model.
"""
self.state = CheckoutState.objects.fail
self.save()
self.object_payment_plan.content_object.instalment_fail(
checkout, self.due
)
[docs] def checkout_fail_url(self, checkout_pk):
"""No UI, so no URL."""
return self.checkout_success_url(checkout_pk)
@property
def checkout_name(self):
return self.object_payment_plan.content_object.checkout_name
[docs] def checkout_success(self, checkout):
"""Update the object to record the payment success.
Called from within a transaction and you can update the model.
"""
self.state = checkout.state
self.save()
if self.deposit:
self.object_payment_plan.create_instalments()
self.object_payment_plan.content_object.instalment_success(checkout)
[docs] def checkout_success_url(self, checkout_pk):
return reverse(
"checkout.object.payment.plan", args=[self.object_payment_plan.pk]
)
@property
def checkout_total(self):
return self.amount
@property
def current_instalment(self):
if self.deposit:
raise CheckoutError(
"Cannot ask the deposit record for it's instalment count"
)
if self.count > 1:
return self.count - 1
else:
raise CheckoutError(
"The 'count' value for an instalment should be greater than 1"
)
[docs] def set_retry_payment(self):
slug = self.state.slug
if self.object_payment_plan.deleted:
raise CheckoutError(
"Cannot retry payments for deleted payment plans."
)
elif slug in (CheckoutState.FAIL, CheckoutState.REQUEST):
self.state = CheckoutState.objects.pending
self.retry_count = 0
self.save()
else:
raise CheckoutError("Can only retry failed or requested payments.")
reversion.register(ObjectPaymentPlanInstalment)
[docs]class PaymentRunManager(models.Manager):
[docs] def create_payment_run(self):
obj = self.model(created=timezone.now())
obj.save()
return obj
[docs] def notify(self, payment_run, error_count):
"""Send notification of payment run"""
email_addresses = [n.email for n in Notify.objects.all()]
if email_addresses:
subject = "Payment plan activity update - {}".format(
timezone.now().strftime("%d/%m/%Y %H:%M")
)
message = "Processed {} payment transactions.\n\n".format(
payment_run.item_count()
)
if error_count:
message = message + (
"{} failed transaction(s).\n\n".format(error_count)
)
message = message + (
"To view the transactions, log into your dashboard "
"and click:\nSettings, Checkout - Payment Plans - Audit"
)
queue_mail_message(payment_run, email_addresses, subject, message)
# will also trigger sending of emails to customers
transaction.on_commit(lambda: process_mail.delay())
else:
logger.error(
"Cannot send email notification of payment transactions. "
"No email addresses set-up in 'enquiry.models.Notify'"
)
[docs] def process_payments(self):
"""Process pending payments.
We set the status to 'request' before asking for the money. This is
because we can't put the payment request into a transaction. If we are
not careful, we could have a situation where the payment succeeds and
we don't manage to set the state to 'success'. In the code below, if
the payment fails the record will be left in the 'request' state and
so we won't ask for the money again.
"""
error_count = 0
payment_run = None
pks = []
qs = ObjectPaymentPlanInstalment.objects.due
if qs.count():
with transaction.atomic():
payment_run = self.create_payment_run()
for instalment in qs:
item = PaymentRunItem.objects.create_payment_run_item(
payment_run, instalment
)
pks.append(item.pk)
for pk in pks:
item = PaymentRunItem.objects.get(pk=pk)
with transaction.atomic():
# make sure the payment is still pending
instalment = ObjectPaymentPlanInstalment.objects.select_for_update(
nowait=True
).get(
pk=item.instalment.pk
)
valid_states = (CheckoutState.FAIL, CheckoutState.PENDING)
if instalment.state.slug not in valid_states:
raise CheckoutError(
"Instalment '{}' must be in the failed or pending "
"state: '{}'".format(
instalment.pk, instalment.state.slug
)
)
# we are ready to request payment
instalment.state = CheckoutState.objects.request
retry_count = instalment.retry_count or 0
instalment.retry_count = retry_count + 1
instalment.save()
# request payment
try:
Checkout.objects.charge(instalment, AnonymousUser(), item)
except CheckoutError as e:
logger.error(e)
error_count = error_count + 1
if payment_run:
self.notify(payment_run, error_count)
return len(pks), error_count
[docs]class PaymentRun(models.Model):
created = models.DateTimeField()
objects = PaymentRunManager()
class Meta:
ordering = ("created",)
verbose_name = "Payment Run"
verbose_name_plural = "Payment Runs"
def __str__(self):
return "Payment Run {} created {}".format(
self.pk, self.created.strftime("%d/%m/%Y %H:%M")
)
[docs] def item_count(self):
return self.paymentrunitem_set.count()
[docs]class PaymentRunItemManager(models.Manager):
[docs] def create_payment_run_item(self, payment_run, instalment):
obj = self.model(
created=timezone.now(),
payment_run=payment_run,
instalment=instalment,
)
obj.save()
return obj
[docs]class PaymentRunItem(models.Model):
created = models.DateTimeField()
modified = models.DateTimeField(auto_now=True)
payment_run = models.ForeignKey(PaymentRun, on_delete=models.CASCADE)
instalment = models.ForeignKey(
ObjectPaymentPlanInstalment, on_delete=models.CASCADE
)
checkout = models.ForeignKey(
Checkout, blank=True, null=True, on_delete=models.CASCADE
)
objects = PaymentRunItemManager()
class Meta:
ordering = ("payment_run__pk", "created")
unique_together = ("payment_run", "instalment")
verbose_name = "Payment Run Item"
verbose_name_plural = "Payment Run Items"
def __str__(self):
return "{} created {} for instalment {}".format(
self.payment_run.pk,
self.payment_run.created.strftime("%d/%m/%Y %H:%M"),
self.instalment.pk,
)
[docs]class CheckoutSettingsManager(models.Manager):
@property
def settings(self):
try:
return self.model.objects.get()
except self.model.DoesNotExist:
raise CheckoutError(
"Checkout settings have not been set-up in admin"
)
[docs]class CheckoutSettings(SingletonModel):
default_payment_plan = models.ForeignKey(
PaymentPlan, on_delete=models.CASCADE
)
objects = CheckoutSettingsManager()
class Meta:
verbose_name = "Checkout Settings"
def __str__(self):
return "Default Payment Plan: {}".format(self.default_payment_plan.name)
reversion.register(CheckoutSettings)
# class ObjectPaymentPlanInstalmentCheckoutAudit(TimeStampedModel):
# """Keep an audit of checkout status."""
#
# object_payment_plan_instalment = models.ForeignKey(
# ObjectPaymentPlanInstalment
# )
# state = models.ForeignKey(
# CheckoutState,
# default=default_checkout_state,
# #blank=True, null=True
# )
#
# ChargeAudit.objects.create_charge_audit(
# content_object,
# current_user,
# checkout,
# )