Files
consumocuidado-server/products/views.py
2021-03-12 13:45:36 +00:00

334 lines
12 KiB
Python

import logging
import csv
import datetime
import json
from django.db.models import Q
from django.core import serializers
from django.core.validators import validate_email
from django.contrib.auth import get_user_model
from django.template.loader import render_to_string
from django.core.mail import EmailMessage
# Create your views here.
from rest_framework import status
from rest_framework import viewsets
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticatedOrReadOnly, IsAdminUser, IsAuthenticated, AllowAny
from rest_framework.decorators import api_view, permission_classes, action
from rest_framework.filters import OrderingFilter
from django_filters.rest_framework import DjangoFilterBackend
import requests
from history.models import HistorySync
from dal import autocomplete
from products.models import Product, CategoryTag
from products.serializers import ProductSerializer, TagFilterSerializer, SearchResultSerializer
from companies.models import Company
from stats.models import StatsLog
from back_latienda.permissions import IsCreator, IsSiteAdmin, ReadOnly
from .utils import extract_search_filters, ranked_product_search, product_loader, get_related_products
from utils.tag_serializers import TaggitSerializer
from utils.tag_filters import ProductTagFilter, ProductOrderFilter
User = get_user_model()
logging.basicConfig(
filename='logs/product-load.log',
filemode='w',
format='%(levelname)s:%(message)s',
level=logging.INFO,
)
class ProductViewSet(viewsets.ModelViewSet):
queryset = Product.objects.filter(active=True).order_by('-created')
serializer_class = ProductSerializer
permission_classes = [IsAuthenticatedOrReadOnly, IsCreator]
filter_backends = [DjangoFilterBackend, OrderingFilter]
filterset_class = ProductTagFilter
def perform_create(self, serializer):
serializer.save(creator=self.request.user, company=self.request.user.company)
@action(detail=True, methods=['GET',])
def related(self, request, pk=None):
"""Find instances similar to the one referenced
"""
# TODO: find the most similar products
product = self.get_object()
qs = get_related_products(product)
serializer = self.serializer_class(qs, many=True)
return Response(data=serializer.data)
class MyProductsViewSet(viewsets.ModelViewSet):
""" Allows user to get all products where user is product creator
"""
model = Product
serializer_class = ProductSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
return self.model.objects.filter(company=self.request.user.company).order_by('-created')
class AdminProductsViewSet(viewsets.ModelViewSet):
""" Allows user with role 'SITE_ADMIN' to access all product instances
"""
queryset = Product.objects.all()
serializer_class = ProductSerializer
permission_classes = [IsSiteAdmin]
def perform_create(self, serializer):
serializer.save(creator=self.request.user)
@api_view(['POST',])
@permission_classes([IsAuthenticated,])
def load_coop_products(request):
"""Read CSV file being received
Parse it to create products for related Company
Authenticated user must have a related company
"""
# check company linked to user
if request.user.company is None:
return Response({"errors":{"details": "Your user has no company to add products to"}}, status=status.HTTP_406_NOT_ACCEPTABLE)
try:
csv_file = request.FILES['csv_file']
if csv_file.name.endswith('.csv') is not True:
logging.error(f"File {csv_file.name} is not a CSV file")
return Response({"errors":{"details": "File is not CSV type"}}, status=status.HTTP_406_NOT_ACCEPTABLE)
logging.info(f"Reading contents of {csv_file.name}")
decoded_file = csv_file.read().decode('utf-8').splitlines()
csv_reader = csv.DictReader(decoded_file, delimiter=',')
count = product_loader(csv_reader, request.user)
if count is None:
return Response({"errors": {"details": "Authenticated user is not related to any company"}}, status=status.HTTP_406_NOT_ACCEPTABLE)
return Response(f"{count} products registered for {request.user.company.company_name}")
except Exception as e:
return Response({"errors": {"details": str(e)}}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET',]) # include allowed methods
def product_search(request):
"""
Takes a string of data, return relevant products
Params:
- q: used for search [MANDATORY]
- limit: max number of returned instances [OPTIONAL]
- offset: where to start counting results [OPTIONAL]
- shipping_cost: true/false
- discount: true/false
- category: string
- tags: string
- order: string (newest/oldest)
- price_min: int
- price_max: int
In the response:
- filters
- count
- products
- price_min
- price_max
"""
# capture query params
q = request.GET.get('q', None)
limit = request.GET.get('limit', None)
offset = request.GET.get('offset', None)
shipping_cost = request.GET.get('shipping_cost', None)
if shipping_cost is not None:
if shipping_cost == 'true':
shipping_cost = True
elif shipping_cost == 'false':
shipping_cost = False
else:
shipping_cost = None
discount = request.GET.get('discount', None)
if discount is not None:
if discount == 'true':
discount = True
elif discount == 'false':
discount = False
else:
discount = None
category = request.GET.get('category', None)
tags = request.GET.get('tags', None)
price_min = request.GET.get('price_min', None)
price_max = request.GET.get('price_max', None)
order = request.GET.get('order', '')
if q is None:
return Response({"errors": {"details": "No query string to parse"}})
try:
# we collect our results here
result_set = set()
# values for response
prices = {
'min': None,
'max': None,
}
if q == '':
# filter entire queryset
products_qs = Product.objects.filter(active=True)
if tags:
products_qs = Product.objects.filter(tags=tags)
if category:
products_qs = Product.objects.filter(category=category)
# serialize and list data
serializer = ProductSerializer(products_qs, many=True)
result_list = [dict(i) for i in serializer.data]
else:
# split query string into single words
chunks = q.split(' ')
for chunk in chunks:
product_set, min_price, max_price = ranked_product_search(chunk, shipping_cost, discount, category, tags, price_min, price_max)
# update price values
if product_set:
if prices['min'] is None or min_price['price__min'] < prices['min']:
prices['min'] = min_price['price__min']
if prices['max'] is None or max_price['price__max'] > prices['max']:
prices['max'] = max_price['price__max']
# add to result set
result_set.update(product_set)
# serialize and list data
serializer = SearchResultSerializer(product_set, many=True)
result_list = [dict(i) for i in serializer.data]
# extract filters from result_set
filters = extract_search_filters(result_set)
# order the results
if order == 'newest':
# order results by created
result_list = sorted(result_list, key= lambda x:x['created'], reverse=True)
elif order == 'oldest':
# order results by created
result_list = sorted(result_list, key= lambda x:x['created'], reverse=False)
elif q != '':
# order results by RANK
result_list = sorted(result_list, key= lambda x:x['rank'], reverse=True)
total_results = len(result_list)
# RESULTS PAGINATION
if limit is not None and offset is not None:
limit = int(limit)
offset = int(offset)
result_list = result_list[offset:(limit+offset)]
elif limit is not None:
limit = int(limit)
result_list = result_list[:limit]
return Response(data={"filters": filters, "count": total_results, "products": result_list, 'prices': prices})
except Exception as e:
return Response({"errors": {"details": str(e)}}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class CategoryTagAutocomplete(autocomplete.Select2QuerySetView):
def get_queryset(self):
# Get the tag model dynamically
Tag = Product.category.tag_model
# Return nothing if not auth
if self.request.user.is_anonymous:
return Tag.objects.none()
qs = Tag.objects.all()
if self.q:
qs = qs.filter(name__icontains=self.q)
return qs # [x.label for x in qs]
@api_view(['POST'])
@permission_classes([AllowAny,])
def purchase_email(request):
"""Notify coop manager and user about item purchase
"""
data = json.loads(request.body)
# check data
if request.user.is_anonymous and 'email' not in data:
return Response({"error": "Anonymous users must include an email parameter value"}, status=status.HTTP_406_NOT_ACCEPTABLE)
try:
for param in ('telephone', 'company', 'product', 'comment'):
assert(param in data.keys())
except:
return Response({"error": "Required parameters for anonymous user: telephone, company, product, comment"}, status=status.HTTP_406_NOT_ACCEPTABLE)
if request.user.is_anonymous:
user_email = data.get('email')
else:
user_email = request.user.email
telephone = data.get('telephone')
# validate email
try:
validate_email(user_email)
except:
return Response({"error": "Value for email is not valid"}, status=status.HTTP_406_NOT_ACCEPTABLE)
# get comment
comment = data.get('comment')
# get company
company = Company.objects.filter(id=data['company']).first()
if not company:
return Response({"error": "Invalid value for company"}, status=status.HTTP_406_NOT_ACCEPTABLE)
# get managing user
managing_user = User.objects.filter(company=company, role='COOP_MANAGER').first()
if not managing_user:
return Response({"error": "No managing user found for company"}, status=status.HTTP_406_NOT_ACCEPTABLE)
# get product
product = Product.objects.filter(id=data['product'], company=company).first()
if not product:
return Response({"error": "Invalid value for product"}, status=status.HTTP_406_NOT_ACCEPTABLE)
# check company.email
if company.email is None:
return Response({"error": "Related compay has no contact email address"}, status=status.HTTP_406_NOT_ACCEPTABLE)
try:
# send email to company
company_message = render_to_string('purchase_notification.html', {
'company': company,
'user': request.user,
'product': product,
'telephone': data['telephone'],
'comment': comment,
})
subject = "[latienda.coop] Solicitud de compra"
email = EmailMessage(subject, company_message, to=[company.email])
email.content_subtype = "html"
email.send()
logging.info(f"Email sent to {company}")
# send confirmation email to user
user_message = render_to_string('purchase_contact_confirmation.html', {
'company': company,
'product': product,
'company_message': company_message,
'manager': managing_user,
})
subject = 'Confirmación de petición de compra'
email = EmailMessage(subject, user_message, to=[user_email])
email.content_subtype = "html"
email.send()
logging.info(f"Purchase Contact confirmation email sent to {user_email}")
except Exception as e:
return Response({'error': f"Could not send emails [{str(type(e))}]: {str(e)}"}, status=500)
# create statslog instance to register interaction
stats_data = {
'action_object': product,
'user': request.user if request.user.is_authenticated else None,
'anonymous': request.user.is_anonymous,
'contact': True,
'shop': company.shop,
}
StatsLog.objects.create(**stats_data)
# response
return Response()