224 lines
7.8 KiB
Python
224 lines
7.8 KiB
Python
import csv
|
|
import logging
|
|
import json
|
|
import io
|
|
import datetime
|
|
|
|
from django.shortcuts import render
|
|
from django.http import HttpResponse
|
|
from django.contrib.auth import get_user_model
|
|
from django.utils.http import urlsafe_base64_decode
|
|
from django.utils.encoding import force_text
|
|
from django.db import IntegrityError
|
|
from django.contrib.gis.geos import Point, GEOSGeometry
|
|
from django.shortcuts import redirect
|
|
from django.conf import settings
|
|
|
|
from rest_framework import status
|
|
from rest_framework import viewsets
|
|
from rest_framework.response import Response
|
|
from rest_framework.permissions import IsAdminUser, IsAuthenticated, AllowAny
|
|
from rest_framework.generics import UpdateAPIView
|
|
from rest_framework.decorators import api_view, permission_classes
|
|
|
|
from companies.models import Company
|
|
from companies.serializers import CompanySerializer
|
|
from products.models import Product
|
|
from geo.models import City, Region
|
|
|
|
from . import models
|
|
from . import serializers as core_serializers
|
|
from . import utils
|
|
|
|
from back_latienda.permissions import CustomUserPermissions, YourOwnUserPermissions
|
|
|
|
|
|
# User model class for this project, obtained through Django's
# get_user_model() helper rather than by importing a model class directly.
User = get_user_model()

# Root-logger configuration, applied as a side effect of importing this
# module: records at INFO and above are written to logs/csv-load.log as
# "LEVEL:message" lines. File mode "w" truncates the log file whenever this
# configuration takes effect.
logging.basicConfig(
    level=logging.INFO,
    format='%(levelname)s:%(message)s',
    filemode='w',
    filename='logs/csv-load.log',
)
|
|
|
|
|
|
class CustomUserViewSet(viewsets.ModelViewSet):
    """Model viewset exposing the user accounts.

    Both the serializer and the permission set are chosen per request,
    depending on the current action and on who is calling; see
    ``get_serializer_class`` and ``get_permissions``.
    """

    model = User
    model_name = 'custom_user'
    queryset = User.objects.all()
    permission_classes = [CustomUserPermissions]
    read_serializer_class = core_serializers.CustomUserReadSerializer
    write_serializer_class = core_serializers.CustomUserWriteSerializer

    def get_serializer_class(self):
        """Pick the serializer for the current action and requester.

        * ``create``                      -> ``CustomUserWriteSerializer``
        * requester has ``is_staff``      -> ``CustomUserAdminSerializer``
        * ``update`` by a non-staff user  -> ``UpdateUserSerializer``
        * anything else                   -> ``CustomUserSerializer``
        """
        if self.action == 'create':
            return core_serializers.CustomUserWriteSerializer

        staff_flag = self.request.user.is_staff
        if staff_flag is True:
            return core_serializers.CustomUserAdminSerializer
        # NOTE(review): only the full 'update' action is matched here, so a
        # 'partial_update' from a non-staff user falls through to
        # CustomUserSerializer - confirm that this is intended.
        if staff_flag is False and self.action == 'update':
            return core_serializers.UpdateUserSerializer
        return core_serializers.CustomUserSerializer

    def get_permissions(self):
        """Return the permission instances to enforce for this request.

        Object-level actions requested by a non-anonymous user are checked
        with ``YourOwnUserPermissions``; every other case uses the
        permissions declared in ``permission_classes``.
        """
        object_actions = ('retrieve', 'update', 'partial_update', 'destroy')
        if (self.action not in object_actions
                or self.request.user.is_anonymous is not False):
            return super().get_permissions()
        return [YourOwnUserPermissions()]

    def create(self, request):
        """Create a user account and send its verification e-mail.

        Responds with 201 and the read-serializer representation of the new
        user, with 406 and the serializer errors when validation fails, and
        with 500 and the exception text when anything raises along the way.
        """
        try:
            serializer = self.get_serializer_class()(data=request.data)
            if not serializer.is_valid():
                return Response(
                    serializer.errors,
                    status=status.HTTP_406_NOT_ACCEPTABLE,
                )

            # The password is taken out of the validated payload and applied
            # through set_password() once the account row exists.
            fields = serializer.validated_data
            raw_password = fields.pop('password')
            instance = self.model.objects.create_user(**fields)
            instance.set_password(raw_password)
            instance.save()

            utils.send_verification_email(request, instance)

            payload = self.read_serializer_class(
                instance,
                many=False,
                context={'request': request},
            ).data
            return Response(payload, status=status.HTTP_201_CREATED)
        except Exception as e:
            return Response(
                str(e),
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
|
|
|
|
|
|
class ChangeUserPasswordView(UpdateAPIView):
    """Update-only endpoint driven by ``ChangePasswordSerializer``.

    Operates on ``CustomUser`` rows and is guarded by
    ``YourOwnUserPermissions``.
    """

    model = models.CustomUser
    serializer_class = core_serializers.ChangePasswordSerializer
    permission_classes = (YourOwnUserPermissions,)
    queryset = models.CustomUser.objects.all()
|
|
|
|
|
|
@api_view(['POST',])
@permission_classes([AllowAny])
def create_company_user(request):
    """
    Create non-validated company and associated managing user

    ``request.data`` must contain a ``company`` entry (validated with
    ``CompanySerializer``) and a ``user`` entry (validated with
    ``CustomUserWriteSerializer`` after ``role`` is forced to
    ``'COOP_MANAGER'`` and ``company`` is set to the new company's id).

    The company and the user are written inside one database transaction,
    so a rejected user payload, or an error while creating the user, does
    not leave an orphan company behind.

    Responses:
        201 with an empty body when both records were created.
        406 with ``{"error": ...}`` when a parameter is missing or either
        payload fails validation.
    """
    # Imported locally so that this view does not depend on changes to the
    # module-level import block.
    from django.db import transaction

    for required in ('user', 'company'):
        if required not in request.data:
            return Response(
                {"error": f"Missing parameter: {required}"}, status=406)

    # Validate the company payload before anything is written.
    company_serializer = CompanySerializer(data=request.data['company'])
    if not company_serializer.is_valid():
        return Response({"error": company_serializer.errors}, status=406)

    user_data = request.data['user']

    with transaction.atomic():
        new_company = Company.objects.create(
            **company_serializer.validated_data)

        # The user payload can only be validated once the company exists,
        # because it carries the new company's id.
        user_data['role'] = 'COOP_MANAGER'
        user_data['company'] = new_company.id
        user_serializer = core_serializers.CustomUserWriteSerializer(
            data=user_data,
        )
        if not user_serializer.is_valid():
            # Undo the company insert on the way out of the atomic block.
            transaction.set_rollback(True)
            return Response({"error": user_serializer.errors}, status=406)

        password = user_serializer.validated_data.pop('password')
        new_user = User.objects.create_user(**user_serializer.validated_data)
        new_user.set_password(password)
        new_user.save()

    # Sent after the transaction has been left, so both records are kept
    # even if sending the e-mail raises.
    utils.send_verification_email(request, new_user)

    return Response(status=status.HTTP_201_CREATED)
|
|
|
|
|
|
@api_view(['GET',])
@permission_classes([IsAuthenticated,])
def my_user(request):
    """Return the profile of the user making the request.

    The user row is looked up again by the requester's e-mail address and
    serialized with ``CustomUserReadSerializer``.

    Responses:
        200 with the serialized user.
        500 with ``{"error": "<exception class>"}`` when the lookup or the
        serialization raises.
    """
    try:
        instance = User.objects.get(email=request.user.email)
        user_serializer = core_serializers.CustomUserReadSerializer(instance)
        return Response(data=user_serializer.data)
    except Exception as e:
        # The error value is a plain string. It was previously wrapped in
        # braces, which built a one-element set - not a JSON type.
        return Response({'error': str(type(e))}, status=500)
|
|
|
|
|
|
@api_view(['POST',])
@permission_classes([IsAdminUser,])
def load_coop_managers(request):
    """Read CSV file being received

    Parse it to create users and related companies

    The upload is taken from ``request.FILES['csv_file']`` and its name must
    end in ``.csv``. The content is decoded as UTF-8 (a leading byte-order
    mark is dropped) and handed to ``utils.coop_loader`` as a
    ``csv.DictReader``.

    Responses:
        ``{"details": ...}`` with the number of companies and managing users
        reported by ``utils.coop_loader``.
        ``{"errors": {"details": ...}}`` when the file is not a CSV or any
        step raises.

    NOTE(review): the error payloads are returned without an explicit
    status, i.e. with the default success code. Kept as is because clients
    may rely on it - confirm whether a 4xx/5xx status is wanted.
    """
    try:
        csv_file = request.FILES['csv_file']
        if not csv_file.name.endswith('.csv'):
            logging.error(f"File {csv_file.name} is not a CSV file")
            return Response({"errors":{"details": "File is not CSV type"}})

        logging.info(f"Reading contents of {csv_file.name}")
        # 'utf-8-sig' also accepts files without a BOM, and strips one when
        # present so that it does not end up in the first column name.
        decoded_text = csv_file.read().decode('utf-8-sig')
        # The csv module must see the original line endings (newline='') to
        # parse quoted fields that contain line breaks correctly.
        csv_reader = csv.DictReader(
            io.StringIO(decoded_text, newline=''), delimiter=',')
        coop_count, user_count = utils.coop_loader(csv_reader, request)

        return Response({'details': f"Created {coop_count} Companies, {user_count} Managing Users"})
    except Exception as e:
        return Response({"errors": {"details": f'{type(e)}: {e}'}})
|
|
|
|
|
|
@api_view(['GET',])
@permission_classes([AllowAny,])
def activate_user(request, uidb64, token):
    """Activate the account referenced by an e-mail verification link.

    ``uidb64`` is the url-safe base64 encoded primary key of the user and
    ``token`` is checked with ``utils.account_activation_token``.

    On success the user is flagged active and saved; the caller is then
    redirected to ``settings.ACTIVATION_REDIRECT`` when that setting is
    truthy, or receives a confirmation message otherwise. An unknown user or
    a token that does not check out yields a 406 error payload.
    """
    try:
        decoded_pk = force_text(urlsafe_base64_decode(uidb64))
        account = User.objects.get(pk=decoded_pk)
    except (TypeError, ValueError, OverflowError, User.DoesNotExist):
        account = None

    # The token is only checked when a user was actually found.
    token_matches = (
        account is not None
        and utils.account_activation_token.check_token(account, token)
    )
    if not token_matches:
        return Response(
            {"error": "Tu token de verificacion no coincide con ningún usuario registrado"},
            status=status.HTTP_406_NOT_ACCEPTABLE,
        )

    account.is_active = True
    account.save()

    if settings.ACTIVATION_REDIRECT:
        return redirect(settings.ACTIVATION_REDIRECT)
    return Response(f"Tu cuenta de usuario {account.email} ha sido activada")
|
|
|
|
|
|
@api_view(['GET',])
@permission_classes([IsAdminUser,])
def admin_stats(request):
    """Return overall and per-region counts of companies and products.

    A company counts towards a region when its ``geo`` lies within the
    region's ``geo``; a product counts towards a region through the ``geo``
    of its company. The per-region mappings are keyed by region name.
    """
    totals = {
        'company_count': Company.objects.count(),
        'product_count': Product.objects.count(),
    }

    # Fetch the regions once; both mappings below walk the same list.
    regions = list(Region.objects.all())
    per_region = {
        'companies_per_region': {
            region.name: Company.objects.filter(
                geo__within=region.geo).count()
            for region in regions
        },
        'products_per_region': {
            region.name: Product.objects.filter(
                company__geo__within=region.geo).count()
            for region in regions
        },
    }

    return Response(data={**totals, **per_region})
|