Files
consumocuidado-server/products/views.py

317 lines
11 KiB
Python

import logging
import csv
import datetime
import json
from django.db.models import Q
from django.core import serializers
from django.contrib.auth import get_user_model
# Create your views here.
from rest_framework import status
from rest_framework import viewsets
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticatedOrReadOnly, IsAdminUser, IsAuthenticated
from rest_framework.decorators import api_view, permission_classes, action
from rest_framework.filters import OrderingFilter
from django_filters.rest_framework import DjangoFilterBackend
import requests
from history.models import HistorySync
from dal import autocomplete
from products.models import Product, CategoryTag
from products.serializers import ProductSerializer, TagFilterSerializer, SearchResultSerializer
from companies.models import Company
from back_latienda.permissions import IsCreator, IsSiteAdmin, ReadOnly
from .utils import extract_search_filters, find_related_products_v6, product_loader, find_related_products_v7
from utils.tag_serializers import TaggitSerializer
from utils.tag_filters import ProductTagFilter, ProductOrderFilter
User = get_user_model()
logging.basicConfig(
filename='logs/product-load.log',
filemode='w',
format='%(levelname)s:%(message)s',
level=logging.INFO,
)
class ProductViewSet(viewsets.ModelViewSet):
queryset = Product.objects.filter(active=True).order_by('-created')
serializer_class = ProductSerializer
permission_classes = [IsAuthenticatedOrReadOnly, IsCreator]
filter_backends = [DjangoFilterBackend, OrderingFilter]
filterset_class = ProductTagFilter
def perform_create(self, serializer):
serializer.save(creator=self.request.user, company=self.request.user.company)
@action(detail=True, methods=['GET',])
def related(self, request, pk=None):
"""Find instances similar to the one referenced
"""
# TODO: find the most similar products
product = self.get_object()
qs = find_related_products_v7(product.description, product.tags.all(), product.attributes.all(), product.category)
serializer = self.serializer_class(qs, many=True)
return Response(data=serializer.data)
class MyProductsViewSet(viewsets.ModelViewSet):
""" Allows user to get all products where user is product creator
"""
model = Product
serializer_class = ProductSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
return self.model.objects.filter(creator=self.request.user)
def perform_create(self, serializer):
serializer.save(creator=self.request.user)
class AdminProductsViewSet(viewsets.ModelViewSet):
""" Allows user with role 'SITE_ADMIN' to access all product instances
"""
queryset = Product.objects.all()
serializer_class = ProductSerializer
permission_classes = [IsSiteAdmin]
def perform_create(self, serializer):
serializer.save(creator=self.request.user)
@api_view(['POST',])
@permission_classes([IsAuthenticated,])
def load_coop_products(request):
"""Read CSV file being received
Parse it to create products for related Company
Authenticated user must have a related company
"""
# check company linked to user
if request.user.company is None:
return Response({"errors":{"details": "Your user has no company to add products to"}}, status=status.HTTP_406_NOT_ACCEPTABLE)
try:
csv_file = request.FILES['csv_file']
if csv_file.name.endswith('.csv') is not True:
logging.error(f"File {csv_file.name} is not a CSV file")
return Response({"errors":{"details": "File is not CSV type"}}, status=status.HTTP_406_NOT_ACCEPTABLE)
logging.info(f"Reading contents of {csv_file.name}")
decoded_file = csv_file.read().decode('utf-8').splitlines()
csv_reader = csv.DictReader(decoded_file, delimiter=',')
count = product_loader(csv_reader, request.user)
if count is None:
return Response({"errors": {"details": "Authenticated user is not related to any company"}}, status=status.HTTP_406_NOT_ACCEPTABLE)
return Response(f"{count} products registered for {request.user.company.company_name}")
except Exception as e:
return Response({"errors": {"details": str(e)}}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['GET',]) # include allowed methods
def product_search(request):
"""
Takes a string of data, return relevant products
Params:
- q: used for search [MANDATORY]
- limit: max number of returned instances [OPTIONAL]
- offset: where to start counting results [OPTIONAL]
- shipping_cost: true/false
- discount: true/false
- category: string
- tags: string
- order: string (newest/oldest)
- price_min: int
- price_max: int
In the response:
- filters
- count
- products
- price_min
- price_max
"""
# capture query params
q = request.GET.get('q', None)
limit = request.GET.get('limit', None)
offset = request.GET.get('offset', None)
shipping_cost = request.GET.get('shipping_cost', None)
if shipping_cost is not None:
if shipping_cost == 'true':
shipping_cost = True
elif shipping_cost == 'false':
shipping_cost = False
else:
shipping_cost = None
discount = request.GET.get('discount', None)
if discount is not None:
if discount == 'true':
discount = True
elif discount == 'false':
discount = False
else:
discount = None
category = request.GET.get('category', None)
tags = request.GET.get('tags', None)
price_min = request.GET.get('price_min', None)
price_max = request.GET.get('price_max', None)
order = request.GET.get('order', '')
if q is None:
return Response({"errors": {"details": "No query string to parse"}})
try:
# we collect our results here
result_set = set()
# values for response
prices = {
'min': None,
'max': None,
}
if q == '':
# filter entire queryset
products_qs = Product.objects.filter(active=True)
if tags:
products_qs = Product.objects.filter(tags=tags)
if category:
products_qs = Product.objects.filter(category=category)
# serialize and list data
serializer = ProductSerializer(products_qs, many=True)
result_list = [dict(i) for i in serializer.data]
else:
# split query string into single words
chunks = q.split(' ')
for chunk in chunks:
product_set, min_price, max_price = find_related_products_v6(chunk, shipping_cost, discount, category, tags, price_min, price_max)
# update price values
if product_set:
if prices['min'] is None or min_price['price__min'] < prices['min']:
prices['min'] = min_price['price__min']
if prices['max'] is None or max_price['price__max'] > prices['max']:
prices['max'] = max_price['price__max']
# add to result set
result_set.update(product_set)
# serialize and list data
serializer = SearchResultSerializer(product_set, many=True)
result_list = [dict(i) for i in serializer.data]
# extract filters from result_set
filters = extract_search_filters(result_set)
# order the results
if order == 'newest':
# order results by created
result_list = sorted(result_list, key= lambda x:x['created'], reverse=True)
elif order == 'oldest':
# order results by created
result_list = sorted(result_list, key= lambda x:x['created'], reverse=False)
elif q != '':
# order results by RANK
result_list = sorted(result_list, key= lambda x:x['rank'], reverse=True)
total_results = len(result_list)
# RESULTS PAGINATION
if limit is not None and offset is not None:
limit = int(limit)
offset = int(offset)
result_list = result_list[offset:(limit+offset)]
elif limit is not None:
limit = int(limit)
result_list = result_list[:limit]
return Response(data={"filters": filters, "count": total_results, "products": result_list, 'prices': prices})
except Exception as e:
return Response({"errors": {"details": str(e)}}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class CategoryTagAutocomplete(autocomplete.Select2QuerySetView):
def get_queryset(self):
# Get the tag model dynamically
Tag = Product.category.tag_model
# Return nothing if not auth
if self.request.user.is_anonymous:
return Tag.objects.none()
qs = Tag.objects.all()
if self.q:
qs = qs.filter(name__icontains=self.q)
return qs # [x.label for x in qs]
@api_view(['POST'])
def purchase_email(request):
"""Notify coop manager and user about item purchase
"""
data = json.loads(request.body)
# check data
try:
for param in ('email', 'telephone', 'company', 'product', 'comment'):
assert(param in data.keys())
except:
return Response({"error": "Required parameters for anonymous user: email, telephone"}, status=status.HTTP_406_NOT_ACCEPTABLE)
if request.user.is_anonymous:
email = data.get('email')
else:
email = request.user.email
telephone = data.get('telephone')
# get company
company = Company.objects.filter(id=data['company']).first()
if not company:
return Response({"error": "Invalid value for company"}, status=status.HTTP_406_NOT_ACCEPTABLE)
# get company manager
manager = User.objects.filter(company=company).first()
if not manager and manager.role != 'COOP_MANAGER':
return Response({"error": "Company has no managing user"}, status=status.HTTP_406_NOT_ACCEPTABLE)
# get product
product = Product.objects.filter(id=data['product']).first()
if not product:
return Response({"error": "Invalid value for product"}, status=status.HTTP_406_NOT_ACCEPTABLE)
# send email to company manager
manager_message = render_to_string('purchase_notification.html', {
'company': company,
'user': request.user,
'product': product,
'telephone': data['telephone'],
})
subject = "Contacto de usuario sobre venta"
email = EmailMessage(subject, manager_message, to=[manager.email])
email.send()
logging.info(f"Email sent to {manager.email} as manager of {company}")
# send confirmation email to user
user_message = render_to_string('purchase_contact_confirmation.html', {
'company': company,
'product': product,
})
subject = 'Confirmación de contacto con vendedor'
email = EmailMessage(subject, message, to=[request.user.email])
email.send()
logging.info(f"Purchase Contact confirmation email sent to {request.user.email}")
# create statslog instance to register interaction
stats_data = {
'action_object': instance,
'user': None,
'anonymous': True,
'ip_address': client_ip,
'geo': g.geos(client_ip),
'contact': True,
'shop': instance.shop,
}
StatsLog.objects.create(**stats_data)
# response
return Response()