import logging
import csv
import datetime
import json

from django.db.models import Q
from django.core import serializers
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from django.contrib.auth import get_user_model
from django.template.loader import render_to_string
from django.core.mail import EmailMessage

# Create your views here.
from rest_framework import status
from rest_framework import viewsets
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticatedOrReadOnly, IsAdminUser, IsAuthenticated, AllowAny
from rest_framework.decorators import api_view, permission_classes, action
from rest_framework.filters import OrderingFilter
from django_filters.rest_framework import DjangoFilterBackend
import requests

from history.models import HistorySync
from dal import autocomplete

from products.models import Product, CategoryTag
from products.serializers import ProductSerializer, TagFilterSerializer, SearchResultSerializer
from companies.models import Company
from stats.models import StatsLog
from back_latienda.permissions import IsCreator, IsSiteAdmin, ReadOnly
from .utils import extract_search_filters, ranked_product_search, product_loader, get_related_products
from utils.tag_serializers import TaggitSerializer
from utils.tag_filters import ProductTagFilter, ProductOrderFilter


User = get_user_model()

# NOTE: configures the root logger at import time; filemode 'w' truncates the
# log file every time this module is first imported.
logging.basicConfig(
    filename='logs/product-load.log',
    filemode='w',
    format='%(levelname)s:%(message)s',
    level=logging.INFO,
)


class ProductViewSet(viewsets.ModelViewSet):
    """CRUD endpoints over active products, newest first."""

    queryset = Product.objects.filter(active=True).order_by('-created')
    serializer_class = ProductSerializer
    permission_classes = [IsAuthenticatedOrReadOnly, IsCreator]
    filter_backends = [DjangoFilterBackend, OrderingFilter]
    filterset_class = ProductTagFilter

    def perform_create(self, serializer):
        """Save the new product with the requesting user as creator and that user's company."""
        serializer.save(creator=self.request.user, company=self.request.user.company)

    @action(detail=True, methods=['GET',])
    def related(self, request, pk=None):
        """Return the serialized products that `get_related_products` yields for this instance."""
        # TODO: find the most similar products
        product = self.get_object()
        qs = get_related_products(product)
        serializer = self.serializer_class(qs, many=True)
        return Response(data=serializer.data)


class MyProductsViewSet(viewsets.ModelViewSet):
    """
    Allows an authenticated user to access the products that belong to the
    user's company, newest first.
    """

    model = Product
    serializer_class = ProductSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Limit the queryset to products of the requesting user's company."""
        return self.model.objects.filter(company=self.request.user.company).order_by('-created')


class AdminProductsViewSet(viewsets.ModelViewSet):
    """
    Gives access to every product instance (active or not); access is gated
    by the `IsSiteAdmin` permission class.
    """

    queryset = Product.objects.all()
    serializer_class = ProductSerializer
    permission_classes = [IsSiteAdmin]

    def perform_create(self, serializer):
        """Save the new product with the requesting user as creator."""
        serializer.save(creator=self.request.user)


@api_view(['POST',])
@permission_classes([IsAuthenticated,])
def load_coop_products(request):
    """Create products for the user's company from an uploaded CSV file.

    The file is read from `request.FILES['csv_file']`, parsed with
    `csv.DictReader` and handed to `product_loader` together with the user.

    Responses:
    - 406 when the user has no company, the file name does not end in
      '.csv', or `product_loader` returns None.
    - 500 with the exception text for any other failure (including a
      missing 'csv_file' upload).
    - 200 with a summary string on success.
    """
    # check company linked to user
    if request.user.company is None:
        return Response({"errors":{"details": "Your user has no company to add products to"}}, status=status.HTTP_406_NOT_ACCEPTABLE)

    try:
        csv_file = request.FILES['csv_file']
        if not csv_file.name.endswith('.csv'):
            logging.error(f"File {csv_file.name} is not a CSV file")
            return Response({"errors":{"details": "File is not CSV type"}}, status=status.HTTP_406_NOT_ACCEPTABLE)

        logging.info(f"Reading contents of {csv_file.name}")
        # 'utf-8-sig' decodes plain UTF-8 unchanged but strips a leading BOM,
        # which would otherwise become part of the first column header.
        decoded_file = csv_file.read().decode('utf-8-sig').splitlines()
        csv_reader = csv.DictReader(decoded_file, delimiter=',')

        count = product_loader(csv_reader, request.user)
        if count is None:
            return Response({"errors": {"details": "Authenticated user is not related to any company"}}, status=status.HTTP_406_NOT_ACCEPTABLE)
        return Response(f"{count} products registered for {request.user.company.company_name}")
    except Exception as e:
        return Response({"errors": {"details": str(e)}}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


def _parse_bool_param(raw):
    """Map the query-string literals 'true'/'false' to booleans; anything else (or None) gives None."""
    return {'true': True, 'false': False}.get(raw)


@api_view(['GET',])  # include allowed methods
def product_search(request):
    """
    Takes a string of data, return relevant products

    Params:
        - q: used for search [MANDATORY]
        - limit: max number of returned instances [OPTIONAL]
        - offset: where to start counting results [OPTIONAL, only applied together with limit]
        - shipping_cost: true/false
        - discount: true/false
        - category[]: string, may be repeated
        - tags: string
        - order: string (newest/oldest)
        - price_min: int
        - price_max: int

    In the response:
        - filters
        - count: number of results before pagination
        - products
        - prices: dict with 'min' and 'max'

    Any exception raised while searching is reported as a 500 response
    carrying the exception text.
    """
    # capture query params
    params = request.GET
    q = params.get('q', None)
    limit = params.get('limit', None)
    offset = params.get('offset', None)
    shipping_cost = _parse_bool_param(params.get('shipping_cost', None))
    discount = _parse_bool_param(params.get('discount', None))
    # categories arrive as repeated `category[]` parameters
    category = request.query_params.getlist('category[]') or None
    tags = params.get('tags', None)
    price_min = params.get('price_min', None)
    price_max = params.get('price_max', None)
    order = params.get('order', '')

    if q is None:
        # NOTE(review): this error is returned with the default 200 status;
        # kept as-is because clients may rely on it — consider a 400.
        return Response({"errors": {"details": "No query string to parse"}})

    try:
        # we collect our results here
        result_set = set()
        # values for response
        prices = {
            'min': None,
            'max': None,
        }

        if q == '':
            # filter entire queryset; each filter narrows the previous one so
            # inactive products never leak in and tags/category combine
            products_qs = Product.objects.filter(active=True)
            if tags:
                products_qs = products_qs.filter(tags=tags)
            if category:
                products_qs = products_qs.filter(category__name__in=category)
            # serialize and list data
            serializer = ProductSerializer(products_qs, many=True)
            result_list = [dict(i) for i in serializer.data]
        else:
            # split query string into single words
            for chunk in q.split(' '):
                product_set, min_price, max_price = ranked_product_search(
                    chunk, shipping_cost, discount, category, tags, price_min, price_max,
                )
                # update price values
                if product_set:
                    if prices['min'] is None or min_price['price__min'] < prices['min']:
                        prices['min'] = min_price['price__min']
                    if prices['max'] is None or max_price['price__max'] > prices['max']:
                        prices['max'] = max_price['price__max']
                # add to result set
                result_set.update(product_set)
            # serialize the accumulated results of every word, not just the
            # matches of the last one
            serializer = SearchResultSerializer(result_set, many=True)
            result_list = [dict(i) for i in serializer.data]

        # extract filters from result_set
        filters = extract_search_filters(result_set)

        # order the results
        if order == 'newest':
            result_list = sorted(result_list, key=lambda x: x['created'], reverse=True)
        elif order == 'oldest':
            result_list = sorted(result_list, key=lambda x: x['created'], reverse=False)
        elif q != '':
            # default for real searches: order results by RANK
            result_list = sorted(result_list, key=lambda x: x['rank'], reverse=True)

        total_results = len(result_list)

        # RESULTS PAGINATION
        if limit is not None and offset is not None:
            limit = int(limit)
            offset = int(offset)
            result_list = result_list[offset:(limit+offset)]
        elif limit is not None:
            limit = int(limit)
            result_list = result_list[:limit]

        return Response(data={"filters": filters, "count": total_results, "products": result_list, 'prices': prices})
    except Exception as e:
        return Response({"errors": {"details": str(e)}}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


class CategoryTagAutocomplete(autocomplete.Select2QuerySetView):
    """Autocomplete source for category tags; anonymous users get no results."""

    def get_queryset(self):
        """Return category tags whose name contains `self.q` (all tags when `self.q` is empty)."""
        # Get the tag model dynamically
        Tag = Product.category.tag_model

        # Return nothing if not auth
        if self.request.user.is_anonymous:
            return Tag.objects.none()

        if self.q:
            return Tag.objects.all().filter(name__icontains=self.q)
        return Tag.objects.all()


@api_view(['POST'])
@permission_classes([AllowAny,])
def purchase_email(request):
    """Notify coop manager and user about item purchase

    Expects a JSON object in the request body with the keys 'telephone',
    'company', 'product' and 'comment'; anonymous users must also send
    'email'. Sends one email to the company and a confirmation to the user,
    then records a StatsLog entry.

    Responses:
    - 406 when the body or any parameter is invalid.
    - 500 when sending either email fails.
    - 200 with an empty body on success.
    """
    try:
        data = json.loads(request.body)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses
        return Response({"error": "Request body is not valid JSON"}, status=status.HTTP_406_NOT_ACCEPTABLE)
    if not isinstance(data, dict):
        return Response({"error": "Request body must be a JSON object"}, status=status.HTTP_406_NOT_ACCEPTABLE)

    # check data
    if request.user.is_anonymous and 'email' not in data:
        return Response({"error": "Anonymous users must include an email parameter value"}, status=status.HTTP_406_NOT_ACCEPTABLE)

    # explicit check instead of `assert`, which is stripped when Python runs with -O
    required_params = ('telephone', 'company', 'product', 'comment')
    if any(param not in data for param in required_params):
        return Response({"error": "Required parameters for anonymous user: telephone, company, product, comment"}, status=status.HTTP_406_NOT_ACCEPTABLE)

    if request.user.is_anonymous:
        user_email = data.get('email')
    else:
        user_email = request.user.email
    telephone = data.get('telephone')

    # validate email
    try:
        validate_email(user_email)
    except ValidationError:
        return Response({"error": "Value for email is not valid"}, status=status.HTTP_406_NOT_ACCEPTABLE)

    # get comment
    comment = data.get('comment')

    # get company
    company = Company.objects.filter(id=data['company']).first()
    if not company:
        return Response({"error": "Invalid value for company"}, status=status.HTTP_406_NOT_ACCEPTABLE)

    # get managing user
    managing_user = User.objects.filter(company=company, role='COOP_MANAGER').first()
    if not managing_user:
        return Response({"error": "No managing user found for company"}, status=status.HTTP_406_NOT_ACCEPTABLE)

    # get product
    product = Product.objects.filter(id=data['product'], company=company).first()
    if not product:
        return Response({"error": "Invalid value for product"}, status=status.HTTP_406_NOT_ACCEPTABLE)

    # check company.email: fall back to the manager's address when it is
    # missing or blank (in memory only, the company is not saved)
    if not company.email:
        company.email = managing_user.email

    try:
        # send email to company
        company_message = render_to_string('purchase_notification_v2.html', {
            'company': company,
            'user': request.user,
            'product': product,
            'telephone': telephone,
            'comment': comment,
        })
        subject = "[latienda.coop] Solicitud de compra"
        email = EmailMessage(subject, company_message, to=[company.email])
        email.content_subtype = "html"
        email.send()
        logging.info(f"Email sent to {company}")

        # send confirmation email to user
        user_message = render_to_string('purchase_contact_confirmation.html', {
            'company': company,
            'product': product,
            'company_message': company_message,
            'manager': managing_user,
        })
        subject = 'Confirmación de petición de compra'
        email = EmailMessage(subject, user_message, to=[user_email])
        email.content_subtype = "html"
        email.send()
        logging.info(f"Purchase Contact confirmation email sent to {user_email}")
    except Exception as e:
        logging.exception("Could not send purchase emails")
        return Response({'error': f"Could not send emails [{str(type(e))}]: {str(e)}"}, status=500)

    # create statslog instance to register interaction
    stats_data = {
        'action_object': product,
        'user': request.user if request.user.is_authenticated else None,
        'anonymous': request.user.is_anonymous,
        'contact': True,
        'shop': company.shop,
    }
    StatsLog.objects.create(**stats_data)

    # response
    return Response()