Files
consumocuidado-server/core/views.py

313 lines
11 KiB
Python

import csv
import logging
import json
import io
import datetime
import calendar
from dateutil.relativedelta import relativedelta
from django.shortcuts import render
from django.http import HttpResponse
from django.contrib.auth import get_user_model
from django.utils.http import urlsafe_base64_decode
from django.utils.encoding import force_text
from django.db import IntegrityError
from django.contrib.gis.geos import Point, GEOSGeometry
from django.shortcuts import redirect
from django.conf import settings
from django.template.loader import render_to_string
from django.core.mail import EmailMessage
from rest_framework import status
from rest_framework import viewsets
from rest_framework.response import Response
from rest_framework.permissions import IsAdminUser, IsAuthenticated, AllowAny
from rest_framework.generics import UpdateAPIView
from rest_framework.decorators import api_view, permission_classes
from companies.models import Company
from companies.serializers import CompanySerializer
from products.models import Product
from geo.models import City, Region
from stats.models import StatsLog
from . import models
from . import serializers as core_serializers
from . import utils
from back_latienda.permissions import CustomUserPermissions, YourOwnUserPermissions
User = get_user_model()
# Create your views here.
logging.basicConfig(
filename='logs/csv-load.log',
filemode='w',
format='%(levelname)s:%(message)s',
level=logging.INFO,
)
class CustomUserViewSet(viewsets.ModelViewSet):
model = User
model_name = 'custom_user'
queryset = User.objects.all()
permission_classes = [CustomUserPermissions,]
read_serializer_class = core_serializers.CustomUserReadSerializer
write_serializer_class = core_serializers.CustomUserWriteSerializer
def get_serializer_class(self):
if self.action=='create':
return core_serializers.CustomUserWriteSerializer
elif self.action == 'update' and self.request.user.is_staff is False:
return core_serializers.UpdateUserSerializer
elif self.request.user.is_staff is True:
return core_serializers.CustomUserAdminSerializer
return core_serializers.CustomUserSerializer
def get_permissions(self):
if self.action in ['retrieve', 'update', 'partial_update', 'destroy'] and self.request.user.is_anonymous is False:
return [YourOwnUserPermissions(), ]
return super(CustomUserViewSet, self).get_permissions()
def create(self, request):
"""
Create Instance
"""
try:
serializer_class = self.get_serializer_class()
serializer = serializer_class(
data=request.data,
)
if serializer.is_valid():
# save model instance data
password = serializer.validated_data.pop('password')
instance = self.model.objects.create_user(**serializer.validated_data)
instance.set_password(password)
instance.save()
# send verification email
utils.send_verification_email(request, instance)
return Response(self.read_serializer_class(
instance, many=False, context={'request': request}).data,
status=status.HTTP_201_CREATED)
else:
return Response(
serializer.errors, status=status.HTTP_406_NOT_ACCEPTABLE)
except Exception as e:
return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class ChangeUserPasswordView(UpdateAPIView):
model = models.CustomUser
queryset = model.objects.all()
permission_classes = (YourOwnUserPermissions,)
serializer_class = core_serializers.ChangePasswordSerializer
@api_view(['POST',])
@permission_classes([AllowAny])
def create_company_user(request):
"""
Create non-validated company and associated managing user
"""
if 'user' not in request.data:
return Response({"error": "Missing parameter: user"}, status=406)
if 'company' not in request.data:
return Response({"error": "Missing parameter: company"}, status=406)
# create company
company_data = request.data['company']
company_serializer = CompanySerializer(
data=company_data,
)
if company_serializer.is_valid():
# save model instance data
new_company = Company.objects.create(**company_serializer.validated_data)
else:
return Response({"error": company_serializer.errors}, status=406)
# create user
user_data = request.data['user']
user_data['role'] = 'COOP_MANAGER'
user_serializer = core_serializers.CustomUserWriteSerializer(
data=user_data,
)
if user_serializer.is_valid():
# save model instance data
password = user_serializer.validated_data.pop('password')
new_user = User.objects.create_user(**user_serializer.validated_data)
new_user.set_password(password)
new_user.company = new_company
new_user.save()
# send verification email
utils.send_verification_email(request, new_user)
else:
return Response({"error": user_serializer.errors}, status=406)
return Response(status=status.HTTP_201_CREATED)
@api_view(['GET',])
@permission_classes([IsAuthenticated,])
def my_user(request):
try:
instance = User.objects.get(email=request.user.email)
user_serializer = core_serializers.CustomUserReadSerializer(instance)
return Response(data=user_serializer.data)
except Exception as e:
return Response({'error': {str(type(e))}}, status=500)
@api_view(['POST',])
@permission_classes([IsAdminUser,])
def load_coop_managers(request):
"""Read CSV file being received
Parse it to create users and related companies
"""
try:
csv_file = request.FILES['csv_file']
if csv_file.name.endswith('.csv') is not True:
logging.error(f"File {csv_file.name} is not a CSV file")
return Response({"errors":{"details": "File is not CSV type"}})
logging.info(f"Reading contents of {csv_file.name}")
decoded_file = csv_file.read().decode('utf-8').splitlines()
csv_reader = csv.DictReader(decoded_file, delimiter=',')
coop_count, user_count = utils.coop_loader(csv_reader, request)
return Response({'details': f"Created {coop_count} Companies, {user_count} Managing Users"})
except Exception as e:
return Response({"errors": {"details": f'{type(e)}: {e}'}})
@api_view(['GET',])
@permission_classes([AllowAny,])
def activate_user(request, uidb64, token):
try:
uid = force_text(urlsafe_base64_decode(uidb64))
user = User.objects.get(pk=uid)
except (TypeError, ValueError, OverflowError, User.DoesNotExist):
user = None
if user is not None and utils.account_activation_token.check_token(user, token):
# activate user
user.is_active = True
user.email_verified = True
user.save()
if settings.ACTIVATION_REDIRECT:
return redirect(settings.ACTIVATION_REDIRECT)
return Response(f"Tu cuenta de usuario {user.email} ha sido activada")
else:
return Response({"error": f"Tu token de verificacion no coincide con ningún usuario registrado"}, status=status.HTTP_406_NOT_ACCEPTABLE)
@api_view(['POST'])
@permission_classes([AllowAny,])
def forgotten_password(request):
"""Set new password for registered user and send email
"""
if 'email' not in request.data:
return Response({"error": "Missing parameter: email"}, status=400)
email = request.data['email']
user = User.objects.get(email=email)
if user:
try:
password = utils.generate_password(12)
user.set_password(password)
user.save()
except:
return Response({'error': f"Could not set new password [{str(type(e))}]: {str(e)}"}, status=500)
try:
message = render_to_string('forgotten_password.html', {
'user': user,
'password': password,
})
subject = "[latienda.coop] Contraseña restablecida"
email = EmailMessage(subject, message, to=[email])
email.content_subtype = "html"
email.send()
logging.info(f"Email sent to {email}")
except Exception as e:
return Response({'error': f"Could not send emails [{str(type(e))}]: {str(e)}"}, status=500)
else:
return Response({'error': f"This email has no user related to it"}, status=404)
return Response()
@api_view(['GET',])
@permission_classes([IsAdminUser,])
def admin_stats(request):
company_count = Company.objects.count()
product_count = Product.objects.count()
companies_per_region = {}
products_per_region = {}
for region in Region.objects.all():
count = Company.objects.filter(geo__within=region.geo).count()
companies_per_region[region.name] = count
count = Product.objects.filter(company__geo__within=region.geo).count()
products_per_region[region.name] = count
today = datetime.date.today()
# companies timeline: count companies at increments of 4 weeks
companies_timeline = []
# products timeline: count products at increments of 4 weeks
products_timeline = []
# users timeline: count users at increments of 4 weeks
users_timeline = []
# contact timeline: count statlogs from contact at increments of 4 weeks
contact_timeline = []
# shopping timeline: count statlogs from shopping at increments of 4 weeks
shopping_timeline = []
for i in reversed(range(12)):
before = datetime.date(today.year, today.month, 1, ) - relativedelta(months=i)
after = datetime.date(today.year, today.month, 1) - relativedelta(months=i-1)
# companies
companies_timeline.append({
'month': before.month,
'value': Company.objects.filter(created__range=[before,after]).count()
})
# products
products_timeline.append({
'month': before.month,
'value': Product.objects.filter(created__range=[before,after]).count()
})
# users
users_timeline.append({
'month': before.month,
'value': User.objects.filter(created__range=[before,after]).count()
})
# contact
contact_timeline.append({
'month': before.month,
'value': StatsLog.objects.filter(contact=True, created__range=[before,after]).count()
})
# shopping
shopping_timeline.append({
'month': before.month,
'value': StatsLog.objects.filter(shop=True, created__range=[before,after]).count()
})
data = {
'company_count': company_count,
'product_count': product_count,
'companies_per_region': companies_per_region,
'products_per_region': products_per_region,
'companies_timeline': companies_timeline,
'products_timeline': products_timeline,
'users_timeline': users_timeline,
'contact_timeline': contact_timeline,
'shopping_timeline': shopping_timeline,
}
return Response(data=data)