Files
2025-10-10 12:57:44 +00:00

515 lines
19 KiB
Python

import logging
import csv
import datetime
import json
import random
from django.db.models import Q
from django.core import serializers
from django.core.validators import validate_email
from django.contrib.auth import get_user_model
from django.contrib.gis.geos import Point
from django.template.loader import render_to_string
from django.core.mail import EmailMessage
from django.db.models import Max, Min
# Create your views here.
from rest_framework import status
from rest_framework import viewsets
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticatedOrReadOnly, IsAdminUser, IsAuthenticated, AllowAny
from rest_framework.decorators import api_view, permission_classes, action
from rest_framework.filters import OrderingFilter
from django_filters.rest_framework import DjangoFilterBackend
import requests
from companies.serializers import CompanySerializer
from history.models import HistorySync
from dal import autocomplete
from products.models import Product, CategoryTag
from products.serializers import ProductSerializer, TagFilterSerializer, SearchResultSerializer
from companies.models import Company
from stats.models import StatsLog
from back_latienda.permissions import IsProductOwner, IsSiteAdmin, ReadOnly
from .utils import extract_search_filters, ranked_product_search, product_loader, get_related_products
from utils.woocommerce import migrate_shop_products
from utils.tag_serializers import TaggitSerializer
from utils.tag_filters import ProductTagFilter, ProductOrderFilter
# Resolve the project's user model once, at import time, via Django's
# get_user_model() helper; the views below query it as `User`.
User = get_user_model()

# Configure the root logger as a side effect of importing this module:
# records at INFO level and above are written to logs/product-load.log using
# a "LEVEL:message" layout, and filemode='w' opens that file for overwrite.
# NOTE(review): the relative path is resolved against the process working
# directory, and the logs/ directory presumably has to exist already -- confirm
# against the deployment setup.
logging.basicConfig(
    filename='logs/product-load.log',
    filemode='w',
    format='%(levelname)s:%(message)s',
    level=logging.INFO,
)
class ProductViewSet(viewsets.ModelViewSet):
    # Model viewset over active products, newest first. Access is governed by
    # IsAuthenticatedOrReadOnly and IsProductOwner; list filtering is handled
    # by ProductTagFilter through the django-filter and ordering backends.
    permission_classes = [IsAuthenticatedOrReadOnly, IsProductOwner]
    serializer_class = ProductSerializer
    queryset = Product.objects.filter(active=True).order_by('-created')
    filterset_class = ProductTagFilter
    filter_backends = [DjangoFilterBackend, OrderingFilter]

    def perform_create(self, serializer):
        # Stamp the new product with the requesting user and that user's company.
        owner = self.request.user
        serializer.save(creator=owner, company=owner.company)

    @action(detail=True, methods=['GET'])
    def related(self, request, pk=None):
        """Return the products related to the one addressed by the URL
        """
        # TODO: find the most similar products
        similar = get_related_products(self.get_object())
        payload = self.serializer_class(similar, many=True, context={'request': request})
        return Response(data=payload.data)
class MyProductsViewSet(viewsets.ModelViewSet):
    """ Lets an authenticated user work with the products of their own company
    """
    permission_classes = [IsAuthenticated, IsProductOwner]
    serializer_class = ProductSerializer
    model = Product

    def get_queryset(self):
        # Scope the listing to the requester's company, newest first.
        own_company = self.request.user.company
        company_products = self.model.objects.filter(company=own_company)
        return company_products.order_by('-created')
class AdminProductsViewSet(viewsets.ModelViewSet):
    """ Gives users with role 'SITE_ADMIN' access to every product instance
    """
    permission_classes = [IsSiteAdmin]
    serializer_class = ProductSerializer
    queryset = Product.objects.all()

    def perform_create(self, serializer):
        # Record the requesting admin as the creator of the new product.
        requester = self.request.user
        serializer.save(creator=requester)
def get_decent_products():
    """Return a queryset of active products that have a non-empty image."""
    missing_image = Q(image__isnull=True) | Q(image='')
    active_products = Product.objects.filter(active=True)
    return active_products.exclude(missing_image)
def get_random_sample_list_of_decent_products_in_category(category_name, sample_size):
    """Serialize a random sample of decent products from a category tree.

    The pool is every decent product (see get_decent_products) tagged with
    `category_name` or one of its descendants. At most `sample_size` products
    are drawn; when the pool is smaller, the whole pool is returned.
    """
    category_tree = get_category_and_descendants(category_name)
    decent = get_decent_products()
    candidate_ids = list(
        decent.filter(category__in=category_tree).values_list('id', flat=True)
    )
    # random.sample() refuses to draw more items than the population holds.
    draw_count = min(len(candidate_ids), sample_size)
    picked_ids = random.sample(candidate_ids, draw_count)
    return ProductSerializer(decent.filter(id__in=picked_ids), many=True).data
def get_latest_products(number):
    """Serialize the `number` most recently created decent products."""
    newest_first = get_decent_products().order_by('-created')
    return ProductSerializer(newest_first[:number], many=True).data
def get_latest_companies(number):
    """Serialize the `number` most recently created validated companies."""
    validated = Company.objects.filter(is_validated=True)
    newest_first = validated.order_by('-created')
    return CompanySerializer(newest_first[:number], many=True).data
def get_popular_categories(number):
    """Return the `number` official level-1 categories with the highest count.

    Each entry is a dict with the category's name, its image URL ('' when it
    has no image) and a count that adds up the `count` attribute of the
    category itself and of all its descendants.
    """
    top_level = list(CategoryTag.objects.filter(level=1, official=True).values('name'))
    tallies = []
    for row in top_level:
        family = get_category_and_descendants(row['name'])
        # The first element is the category itself; descendants follow.
        root = family[0]
        tallies.append({
            "name": root.name,
            "image": root.image.url if root.image else '',
            "count": sum(member.count for member in family),
        })
    tallies.sort(key=lambda entry: entry['count'], reverse=True)
    return tallies[:number]
@api_view(['GET'])  # include allowed methods
def initial(request):
    """
    Build the landing page payload: product cards for a fixed set of
    categories, plus the latest products, latest companies and most
    popular categories
    """
    featured_categories = ('Libros', 'Cosméticos', 'Plantas')
    cards_per_category = 4
    category_cards = {
        name: get_random_sample_list_of_decent_products_in_category(name, cards_per_category)
        for name in featured_categories
    }
    return Response(data={
        "cards": category_cards,
        "products": get_latest_products(12),
        "companies": get_latest_companies(12),
        "categories": get_popular_categories(12),
    })
@api_view(['POST',])
@permission_classes([IsAuthenticated,])
def load_coop_products(request):
    """Create products for the user's company from an uploaded CSV file.

    The file is read from the multipart field 'csv_file'. Its name must end in
    '.csv' and its header row must match the expected column list exactly.

    Responses:
    - 200: text message with the number of products registered
    - 400: no file was uploaded, or the header row (including an empty file)
      does not match the expected columns
    - 406: the user has no company, or the file name does not end in '.csv'
    - 500: any unexpected error while reading or loading the file
    """
    # check company linked to user
    if request.user.company is None:
        return Response({"errors":{"details": "Your user has no company to add products to"}}, status=status.HTTP_406_NOT_ACCEPTABLE)

    expected_fieldnames = ['sku','nombre-producto','descripcion','imagen','url','precio','gastos-envio','cond-envio','descuento','stock','tags','categoria','identificadores']
    try:
        csv_file = request.FILES.get('csv_file')
        if csv_file is None:
            # A missing upload is a client error, not a server failure.
            return Response({"errors": {"details": "No CSV file was uploaded under 'csv_file'"}}, status=status.HTTP_400_BAD_REQUEST)
        if not csv_file.name.endswith('.csv'):
            logging.error(f"File {csv_file.name} is not a CSV file")
            return Response({"errors":{"details": "File is not CSV type"}}, status=status.HTTP_406_NOT_ACCEPTABLE)

        logging.info(f"Reading contents of {csv_file.name}")
        # 'utf-8-sig' drops the byte-order mark some spreadsheet tools prepend,
        # which would otherwise be glued to the first column name.
        decoded_file = csv_file.read().decode('utf-8-sig').splitlines()

        # Parse the header with the csv module so quoting is handled the same
        # way DictReader handles it below; an empty upload yields no header.
        fieldnames = next(csv.reader(decoded_file[:1], delimiter=','), [])
        if fieldnames != expected_fieldnames:
            return Response({"errors": {"details": f"The CSV fields are expected to be {','.join(expected_fieldnames)}"}}, status=status.HTTP_400_BAD_REQUEST)

        csv_reader = csv.DictReader(decoded_file, delimiter=',')
        count = product_loader(csv_reader, request.user)
        if count is None:
            return Response({"errors": {"details": "Authenticated user is not related to any company"}}, status=status.HTTP_406_NOT_ACCEPTABLE)
        return Response(f"{count} products registered for {request.user.company.company_name}")
    except Exception as e:
        # Keep the traceback in the log; the client only gets the message.
        logging.exception("Could not load products from the uploaded CSV file")
        return Response({"errors": {"details": str(e)}}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def get_category_and_descendants(category_name):
    """Return the category whose label matches, followed by its descendants.

    The lookup is a case-insensitive exact match on the tag label and uses
    the first match. When no category matches, an empty list is returned,
    so callers can filter on the result without a None sneaking into it.
    """
    cat = CategoryTag.objects.filter(label__iexact=category_name).first()
    if cat is None:
        # Unknown category: nothing to expand.
        return []
    # category tag first, then its children
    return [cat, *cat.get_descendants()]
def _parse_bool_flag(raw_value):
    """Map the query-string literals 'true'/'false' to booleans; anything else is None."""
    return {'true': True, 'false': False}.get(raw_value)


@api_view(['GET',]) # include allowed methods
def product_search(request):
    """
    Search products, either by free text or by filters alone.

    Query params:
    - q: search text [OPTIONAL]; when empty, all active products are
      filtered with the remaining params instead
    - limit: max number of returned instances [OPTIONAL]
    - offset: where to start counting results [OPTIONAL, only applied
      together with limit]
    - shipping_cost: true/false
    - discount: true/false
    - category: string, may be repeated
    - tags: string
    - order: string (newest/oldest); text searches default to rank order
    - price_min: int
    - price_max: int
    - latitude / longitude: narrow the results to the surroundings of a
      point (only used when q is empty)

    In the response:
    - filters
    - count: number of results before pagination
    - products
    - prices: dict with 'min' and 'max'
    """
    # capture query params
    q = request.GET.get('q', None)
    limit = request.GET.get('limit', None)
    offset = request.GET.get('offset', None)
    shipping_cost = _parse_bool_flag(request.GET.get('shipping_cost', None))
    discount = _parse_bool_flag(request.GET.get('discount', None))
    categories = request.query_params.getlist('category') or None
    tags = request.GET.get('tags', None)
    price_min = request.GET.get('price_min', None)
    price_max = request.GET.get('price_max', None)
    order = request.GET.get('order', '')
    latitude = request.GET.get('latitude', None)
    longitude = request.GET.get('longitude', None)

    # text-search hits are collected here; it stays empty when q is empty
    result_set = set()
    # values for response
    prices = {
        'min': None,
        'max': None,
    }

    if not q:
        # filter entire queryset
        products_qs = Product.objects.filter(active=True)
        # filter by category
        if categories is not None:
            descendants = []
            for entry in categories:
                descendants.extend(get_category_and_descendants(entry))
            products_qs = products_qs.filter(category__in=descendants)
        # filter by tags
        if tags is not None:
            products_qs = products_qs.filter(tags__name__icontains=tags)
        # filter by shipping cost
        if shipping_cost is True:
            # only instances with shipping costs
            products_qs = products_qs.filter(
                Q(shipping_cost__isnull=False) &
                Q(shipping_cost__gte=1)
            )
        elif shipping_cost is False:
            # only instances without shipping costs
            products_qs = products_qs.filter(Q(shipping_cost=None) | Q(shipping_cost=0.00))
        # filter by discount
        if discount is True:
            # only instances with a discount
            products_qs = products_qs.filter(
                Q(discount__isnull=False) &
                Q(discount__gte=1)
            )
        elif discount is False:
            # only instances without a discount
            products_qs = products_qs.filter(Q(discount=None) | Q(discount=0.00))
        if latitude is not None and longitude is not None:
            coordinates = (float(longitude), float(latitude))
            # (n km / 40,000 km * 360 degrees) = radius length degrees
            # Radiuses: 10km, 50km, 200km
            radiuses = [0.09, 0.45, 1.8]
            # Go from the widest radius to the narrowest, keeping a narrower
            # result only while it still holds at least 10 products.
            for radius in reversed(radiuses):
                products_geo_filtered = products_qs.filter(company__geo__dwithin=(Point(coordinates), radius))
                # count() asks the database for the number only, instead of
                # loading every row just to measure the result.
                if products_geo_filtered.count() >= 10:
                    products_qs = products_geo_filtered
        # filter by price
        if price_min is not None:
            products_qs = products_qs.filter(price__gte=price_min)
        if price_max is not None:
            products_qs = products_qs.filter(price__lte=price_max)
        # get min_price and max_price in a single query
        price_bounds = products_qs.aggregate(Min('price'), Max('price'))
        prices['min'] = price_bounds['price__min']
        prices['max'] = price_bounds['price__max']
        # serialize and list data
        serializer = ProductSerializer(products_qs, many=True, context={'request': request})
        result_list = [dict(i) for i in serializer.data]
    else:
        # split query string into single words; split() without arguments
        # skips the empty chunks that repeated spaces would produce
        for chunk in q.split():
            product_set, min_price, max_price = ranked_product_search(chunk, shipping_cost, discount, categories, tags, price_min, price_max)
            # update price values
            if product_set:
                chunk_min = min_price['price__min']
                chunk_max = max_price['price__max']
                # a chunk whose products carry no price reports None; skip it
                # rather than comparing None against a number
                if chunk_min is not None and (prices['min'] is None or chunk_min < prices['min']):
                    prices['min'] = chunk_min
                if chunk_max is not None and (prices['max'] is None or chunk_max > prices['max']):
                    prices['max'] = chunk_max
            # add to result set
            result_set.update(product_set)
        # Serialize the accumulated hits of every word, not just the hits of
        # the last one, so the product list agrees with the filters and
        # prices, which are computed over the same accumulated set.
        serializer = SearchResultSerializer(result_set, many=True, context={'request': request})
        result_list = [dict(i) for i in serializer.data]

    # extract filters from result_set
    filters = extract_search_filters(result_set)

    # order the results
    if order == 'newest':
        # order results by created, most recent first
        result_list = sorted(result_list, key=lambda x: x['created'], reverse=True)
    elif order == 'oldest':
        # order results by created, oldest first
        result_list = sorted(result_list, key=lambda x: x['created'], reverse=False)
    elif q:
        # order results by RANK
        result_list = sorted(result_list, key=lambda x: x['rank'], reverse=True)

    total_results = len(result_list)
    # RESULTS PAGINATION
    if limit is not None and offset is not None:
        limit = int(limit)
        offset = int(offset)
        result_list = result_list[offset:(limit+offset)]
    elif limit is not None:
        limit = int(limit)
        result_list = result_list[:limit]
    return Response(data={"filters": filters, "count": total_results, "products": result_list, 'prices': prices})
class CategoryTagAutocomplete(autocomplete.Select2QuerySetView):
    # Autocomplete source for category tags; anonymous users get no results.

    def get_queryset(self):
        # The tag model is looked up through the Product.category field.
        tag_manager = Product.category.tag_model.objects
        # Return nothing if not auth
        if self.request.user.is_anonymous:
            return tag_manager.none()
        term = self.q
        if not term:
            return tag_manager.all()
        return tag_manager.all().filter(name__icontains=term)
@api_view(['POST'])
@permission_classes([AllowAny,])
def purchase_email(request):
    """Notify coop manager and user about item purchase

    Expected body parameters: telephone, company (id), product (id) and
    comment; anonymous users must also send email. Authenticated users are
    contacted at the email address of their account.

    Responses:
    - 200: both emails were sent and the contact was recorded in StatsLog
    - 406: a parameter is missing or invalid, or the company has no
      COOP_MANAGER user
    - 500: rendering or sending the emails failed
    """
    data = request.data
    # check data
    if request.user.is_anonymous and 'email' not in data:
        return Response({"error": "Anonymous users must include an email parameter value"}, status=status.HTTP_406_NOT_ACCEPTABLE)
    # Explicit membership test: an assert would be stripped when Python runs
    # with -O, silently disabling the validation.
    required_params = ('telephone', 'company', 'product', 'comment')
    if any(param not in data for param in required_params):
        return Response({"error": "Required parameters for anonymous user: telephone, company, product, comment"}, status=status.HTTP_406_NOT_ACCEPTABLE)

    if request.user.is_anonymous:
        user_email = data.get('email')
    else:
        user_email = request.user.email
    telephone = data.get('telephone')
    comment = data.get('comment')

    # validate email; any failure of the validator (including a non-string
    # value) counts as an invalid address
    try:
        validate_email(user_email)
    except Exception:
        return Response({"error": "Value for email is not valid"}, status=status.HTTP_406_NOT_ACCEPTABLE)

    # get company; an id of the wrong type is treated like an unknown id
    try:
        company = Company.objects.filter(id=data['company']).first()
    except (TypeError, ValueError):
        company = None
    if not company:
        return Response({"error": "Invalid value for company"}, status=status.HTTP_406_NOT_ACCEPTABLE)

    # get managing user
    managing_user = User.objects.filter(company=company, role='COOP_MANAGER').first()
    if not managing_user:
        return Response({"error": "No managing user found for company"}, status=status.HTTP_406_NOT_ACCEPTABLE)

    # get product; it must belong to the given company
    try:
        product = Product.objects.filter(id=data['product'], company=company).first()
    except (TypeError, ValueError):
        product = None
    if not product:
        return Response({"error": "Invalid value for product"}, status=status.HTTP_406_NOT_ACCEPTABLE)

    # Fall back to the manager's address for this notification only; the
    # company instance is not saved.
    if company.email is None:
        company.email = managing_user.email

    try:
        # send email to company
        company_message = render_to_string('purchase_notification_v2.html', {
            'company': company,
            'user': request.user,
            'product': product,
            'telephone': telephone,
            'comment': comment,
        })
        subject = "[latienda.coop] Solicitud de compra"
        email = EmailMessage(subject, company_message, to=[company.email])
        email.content_subtype = "html"
        email.send()
        logging.info(f"Email sent to {company}")
        # send confirmation email to user
        user_message = render_to_string('purchase_contact_confirmation_v2.html', {
            'company': company,
            'product': product,
            'company_message': company_message,
            'manager': managing_user,
        })
        subject = 'Confirmación de petición de compra'
        email = EmailMessage(subject, user_message, to=[user_email])
        email.content_subtype = "html"
        email.send()
        logging.info(f"Purchase Contact confirmation email sent to {user_email}")
    except Exception as e:
        return Response({'error': f"Could not send emails [{str(type(e))}]: {str(e)}"}, status=500)

    # create statslog instance to register interaction
    stats_data = {
        'action_object': product,
        'user': request.user if request.user.is_authenticated else None,
        'anonymous': request.user.is_anonymous,
        'contact': True,
        'shop': company.shop,
    }
    StatsLog.objects.create(**stats_data)
    # response
    return Response()
@api_view(['GET'])
@permission_classes([AllowAny])
def all_categories(request):
    """Return the name of every CategoryTag as a flat list."""
    category_names = [tag.name for tag in CategoryTag.objects.all()]
    return Response(data=category_names)
@api_view(['POST'])
@permission_classes([IsAuthenticated,])
def sync_shop(request):
    """Import the products of an external shop and email the outcome.

    Body parameters: url, key and secret, passed on to
    migrate_shop_products together with the requesting user. The user then
    receives either the 'sync_failed.html' or the 'sync_success.html'
    message, depending on the "error" entry of the migration result.
    """
    url = request.data.get('url')
    key = request.data.get('key')
    secret = request.data.get('secret')

    # The request payload carries the shop's API key and secret, so it is
    # deliberately not written to the output or the log.
    logging.info("Starting migration...")
    response = migrate_shop_products(url, key, secret, request.user)
    new_products_count = len(response['new_products'])
    logging.info(f"Products created: {new_products_count}")

    if response["error"]:
        logging.error(f"Shop synchronization failed: {response['error']}")
        message = render_to_string('sync_failed.html', {})
    else:
        message = render_to_string('sync_success.html', {
            'new_products': new_products_count
        })

    # send the status email to the requesting user
    subject = "Estado de la sincronización"
    email = EmailMessage(subject, message, to=[request.user.email])
    email.content_subtype = "html"
    email.send()
    return Response()