import csv import logging import json import io import datetime import calendar from dateutil.relativedelta import relativedelta from django.shortcuts import render from django.http import HttpResponse from django.contrib.auth import get_user_model from django.utils.http import urlsafe_base64_decode from django.utils.encoding import force_text from django.db import IntegrityError from django.contrib.gis.geos import Point, GEOSGeometry from django.shortcuts import redirect from django.conf import settings from rest_framework import status from rest_framework import viewsets from rest_framework.response import Response from rest_framework.permissions import IsAdminUser, IsAuthenticated, AllowAny from rest_framework.generics import UpdateAPIView from rest_framework.decorators import api_view, permission_classes from companies.models import Company from companies.serializers import CompanySerializer from products.models import Product from geo.models import City, Region from stats.models import StatsLog from . import models from . import serializers as core_serializers from . import utils from back_latienda.permissions import CustomUserPermissions, YourOwnUserPermissions User = get_user_model() # Create your views here. 
# Module-wide logging setup: INFO and above goes to logs/csv-load.log, and the
# file is opened in 'w' mode, so it is truncated each time this runs.
logging.basicConfig(
    filename='logs/csv-load.log',
    filemode='w',
    format='%(levelname)s:%(message)s',
    level=logging.INFO,
)


# Model viewset for the project's user model.
#
# Serializer and permission classes are picked per request, depending on the
# current action and on the requesting user (see the two getters below).
class CustomUserViewSet(viewsets.ModelViewSet):
    model = User
    model_name = 'custom_user'
    queryset = User.objects.all()
    permission_classes = [CustomUserPermissions]
    # Serializer used by create() to render the freshly created instance.
    read_serializer_class = core_serializers.CustomUserReadSerializer
    write_serializer_class = core_serializers.CustomUserWriteSerializer

    def get_serializer_class(self):
        """Return the serializer class matching the action and the caller.

        Precedence, first match wins:
        1. ``create``                      -> CustomUserWriteSerializer
        2. ``update`` by a non-staff user  -> UpdateUserSerializer
        3. any other action by staff       -> CustomUserAdminSerializer
        4. everything else                 -> CustomUserSerializer
        """
        if self.action == 'create':
            return core_serializers.CustomUserWriteSerializer

        staff_flag = self.request.user.is_staff
        # NOTE(review): only 'update' is matched here; 'partial_update' by a
        # non-staff user falls through to CustomUserSerializer. Confirm that
        # this is intended.
        if self.action == 'update' and staff_flag is False:
            return core_serializers.UpdateUserSerializer
        if staff_flag is True:
            return core_serializers.CustomUserAdminSerializer
        return core_serializers.CustomUserSerializer

    def get_permissions(self):
        """Return the permission instances to enforce for this request.

        Non-anonymous callers hitting a single-object action are checked with
        ``YourOwnUserPermissions``; every other case defers to the parent
        implementation.
        """
        single_object_actions = ['retrieve', 'update', 'partial_update', 'destroy']
        if (self.action in single_object_actions
                and self.request.user.is_anonymous is False):
            return [YourOwnUserPermissions()]
        return super().get_permissions()

    def create(self, request):
        """Create a user from the request payload and send a verification email.

        Responds with 201 and the read-serialized user on success, 406 with
        the serializer errors when validation fails, and 500 with the
        exception text when anything else goes wrong.
        """
        try:
            serializer = self.get_serializer_class()(data=request.data)
            if not serializer.is_valid():
                return Response(
                    serializer.errors,
                    status=status.HTTP_406_NOT_ACCEPTABLE)

            # The password is removed from the validated data and applied
            # through set_password() after the user has been created.
            raw_password = serializer.validated_data.pop('password')
            instance = self.model.objects.create_user(**serializer.validated_data)
            instance.set_password(raw_password)
            instance.save()

            utils.send_verification_email(request, instance)

            body = self.read_serializer_class(
                instance, many=False, context={'request': request}).data
            return Response(body, status=status.HTTP_201_CREATED)
        except Exception as e:
            return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)


# Update-only endpoint that runs ChangePasswordSerializer against a CustomUser,
# guarded by YourOwnUserPermissions.
class ChangeUserPasswordView(UpdateAPIView):
    model = models.CustomUser
    queryset = model.objects.all()
    permission_classes = (YourOwnUserPermissions,)
    serializer_class = core_serializers.ChangePasswordSerializer
@api_view(['POST',])
@permission_classes([AllowAny])
def create_company_user(request):
    """
    Create a non-validated company and its associated managing user.

    ``request.data`` must contain a ``company`` entry and a ``user`` entry.
    The user is always created with role ``COOP_MANAGER`` and linked to the
    new company, then a verification email is sent.

    Responds with 201 (empty body) on success, or 406 with an ``error`` entry
    when a parameter is missing or either payload fails validation.
    """
    # Imported here so this view stays self-contained with respect to the
    # module-level import block.
    from django.db import transaction

    if 'user' not in request.data:
        return Response({"error": "Missing parameter: user"}, status=406)
    if 'company' not in request.data:
        return Response({"error": "Missing parameter: company"}, status=406)

    company_serializer = CompanySerializer(
        data=request.data['company'],
    )
    if not company_serializer.is_valid():
        return Response({"error": company_serializer.errors}, status=406)

    # The company has to exist before the user payload can be validated (the
    # user references it by id). Both writes share one transaction so that a
    # rejected or failing user creation does not leave an orphaned company.
    with transaction.atomic():
        new_company = Company.objects.create(**company_serializer.validated_data)

        user_data = request.data['user']
        user_data['role'] = 'COOP_MANAGER'
        user_data['company'] = new_company.id
        user_serializer = core_serializers.CustomUserWriteSerializer(
            data=user_data,
        )
        if not user_serializer.is_valid():
            # No exception is raised on this path, so the rollback has to be
            # requested explicitly before leaving the atomic block.
            transaction.set_rollback(True)
            return Response({"error": user_serializer.errors}, status=406)

        password = user_serializer.validated_data.pop('password')
        new_user = User.objects.create_user(**user_serializer.validated_data)
        new_user.set_password(password)
        new_user.save()

    # Sent after the transaction block, once both records have been written.
    utils.send_verification_email(request, new_user)
    return Response(status=status.HTTP_201_CREATED)


@api_view(['GET',])
@permission_classes([IsAuthenticated,])
def my_user(request):
    """
    Return the read-serialized record of the authenticated user.

    The user is looked up by the email of ``request.user``. Any failure is
    reported as a 500 whose ``error`` entry names the exception type.
    """
    try:
        instance = User.objects.get(email=request.user.email)
        user_serializer = core_serializers.CustomUserReadSerializer(instance)
        return Response(data=user_serializer.data)
    except Exception as e:
        # Plain string payload; this used to be an accidental set literal.
        return Response({'error': str(type(e))}, status=500)


@api_view(['POST',])
@permission_classes([IsAdminUser,])
def load_coop_managers(request):
    """Read the CSV file being received.

    Parse it to create users and related companies.

    The upload is expected under ``request.FILES['csv_file']`` with a name
    ending in ``.csv``. Its content is decoded as UTF-8 and handed, as a
    ``csv.DictReader``, to ``utils.coop_loader``, which returns the number of
    companies and users created.
    """
    # NOTE(review): neither error response below sets an explicit status code.
    # Left unchanged because callers may rely on it; confirm whether an error
    # status is wanted here.
    try:
        csv_file = request.FILES['csv_file']
        if not csv_file.name.endswith('.csv'):
            logging.error(f"File {csv_file.name} is not a CSV file")
            return Response({"errors": {"details": "File is not CSV type"}})

        logging.info(f"Reading contents of {csv_file.name}")
        decoded_file = csv_file.read().decode('utf-8').splitlines()
        csv_reader = csv.DictReader(decoded_file, delimiter=',')
        coop_count, user_count = utils.coop_loader(csv_reader, request)
        return Response({'details': f"Created {coop_count} Companies, {user_count} Managing Users"})
    except Exception as e:
        return Response({"errors": {"details": f'{type(e)}: {e}'}})


@api_view(['GET',])
@permission_classes([AllowAny,])
def activate_user(request, uidb64, token):
    """
    Activate the user identified by ``uidb64`` when ``token`` checks out.

    ``uidb64`` is the URL-safe base64 encoding of the user's primary key. On
    success the user is marked active and the caller is redirected to
    ``settings.ACTIVATION_REDIRECT`` when that setting is truthy, otherwise a
    confirmation message is returned. An undecodable id, an unknown user or a
    token mismatch yields a 406.
    """
    try:
        uid = force_text(urlsafe_base64_decode(uidb64))
        user = User.objects.get(pk=uid)
    except (TypeError, ValueError, OverflowError, User.DoesNotExist):
        user = None

    if user is not None and utils.account_activation_token.check_token(user, token):
        # activate user
        user.is_active = True
        user.save()
        if settings.ACTIVATION_REDIRECT:
            return redirect(settings.ACTIVATION_REDIRECT)
        return Response(f"Tu cuenta de usuario {user.email} ha sido activada")

    return Response(
        {"error": "Tu token de verificacion no coincide con ningún usuario registrado"},
        status=status.HTTP_406_NOT_ACCEPTABLE)


@api_view(['GET',])
@permission_classes([IsAdminUser,])
def admin_stats(request):
    """
    Return aggregate statistics for the admin dashboard.

    The payload contains total company and product counts, per-region counts
    (based on the ``geo`` field lying within the region's geometry), and five
    timelines covering the current calendar month and the eleven before it,
    oldest first. Each timeline entry is ``{'month': <1-12>, 'value': <count>}``.
    """
    company_count = Company.objects.count()
    product_count = Product.objects.count()

    companies_per_region = {}
    products_per_region = {}
    for region in Region.objects.all():
        companies_per_region[region.name] = Company.objects.filter(
            geo__within=region.geo).count()
        products_per_region[region.name] = Product.objects.filter(
            company__geo__within=region.geo).count()

    today = datetime.date.today()
    current_month_start = datetime.date(today.year, today.month, 1)

    # Every timeline counts the rows created within one calendar month.
    companies_timeline = []
    products_timeline = []
    users_timeline = []
    # contact / shopping timelines count StatsLog rows flagged accordingly
    contact_timeline = []
    shopping_timeline = []

    for i in reversed(range(12)):
        before = current_month_start - relativedelta(months=i)
        after = current_month_start - relativedelta(months=i - 1)
        # Half-open window [before, after): the first day of the following
        # month belongs to that month only. An inclusive range would count
        # boundary rows in two consecutive months.
        window = {'created__gte': before, 'created__lt': after}

        # companies
        companies_timeline.append({
            'month': before.month,
            'value': Company.objects.filter(**window).count()
        })
        # products
        products_timeline.append({
            'month': before.month,
            'value': Product.objects.filter(**window).count()
        })
        # users
        users_timeline.append({
            'month': before.month,
            'value': User.objects.filter(**window).count()
        })
        # contact
        contact_timeline.append({
            'month': before.month,
            'value': StatsLog.objects.filter(contact=True, **window).count()
        })
        # shopping
        shopping_timeline.append({
            'month': before.month,
            'value': StatsLog.objects.filter(shop=True, **window).count()
        })

    data = {
        'company_count': company_count,
        'product_count': product_count,
        'companies_per_region': companies_per_region,
        'products_per_region': products_per_region,
        'companies_timeline': companies_timeline,
        'products_timeline': products_timeline,
        'users_timeline': users_timeline,
        'contact_timeline': contact_timeline,
        'shopping_timeline': shopping_timeline,
    }
    return Response(data=data)