import csv import logging import json import io import datetime from django.shortcuts import render from django.http import HttpResponse from django.contrib.auth import get_user_model from django.utils.http import urlsafe_base64_decode from django.utils.encoding import force_text from django.db import IntegrityError from django.contrib.gis.geos import Point from rest_framework import status from rest_framework import viewsets from rest_framework.response import Response from rest_framework.permissions import IsAdminUser, IsAuthenticated, AllowAny from rest_framework.generics import UpdateAPIView from rest_framework.decorators import api_view, permission_classes from companies.models import Company from companies.serializers import CompanySerializer from geo.models import City from . import models from . import serializers as core_serializers from . import utils from back_latienda.permissions import CustomUserPermissions, YourOwnUserPermissions User = get_user_model() # Create your views here. logging.basicConfig( filename='logs/csv-load.log', filemode='w', format='%(levelname)s:%(message)s', level=logging.INFO, ) class CustomUserViewSet(viewsets.ModelViewSet): model = models.CustomUser model_name = 'custom_user' queryset = models.CustomUser.objects.all() permission_classes = [CustomUserPermissions,] read_serializer_class = core_serializers.CustomUserReadSerializer write_serializer_class = core_serializers.CustomUserWriteSerializer def get_serializer_class(self): if self.action=='create': return core_serializers.CustomUserWriteSerializer elif self.action == 'update' and self.request.user.is_staff is False: return core_serializers.UpdateUserSerializer elif self.request.user.is_staff is True: return core_serializers.CustomUserAdminSerializer return core_serializers.CustomUserSerializer def get_permissions(self): if self.action in ['retrieve', 'update', 'partial_update', 'destroy'] and self.request.user.is_anonymous is False: return [YourOwnUserPermissions(), ] return super(CustomUserViewSet, self).get_permissions() def create(self, request): """ Create Instance """ try: serializer_class = self.get_serializer_class() serializer = serializer_class( data=request.data, ) if serializer.is_valid(): # save model instance data password = serializer.validated_data.pop('password') instance = self.model(**serializer.validated_data) instance.set_password(password) instance.save() # send verification email utils.send_verification_email(request, instance) return Response(self.read_serializer_class( instance, many=False, context={'request': request}).data, status=status.HTTP_201_CREATED) else: return Response( serializer.errors, status=status.HTTP_406_NOT_ACCEPTABLE) except Exception as e: return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR) class ChangeUserPasswordView(UpdateAPIView): model = models.CustomUser queryset = model.objects.all() permission_classes = (YourOwnUserPermissions,) serializer_class = core_serializers.ChangePasswordSerializer class UpdateUserView(UpdateAPIView): model = models.CustomUser queryset = model.objects.all() permission_classes = (YourOwnUserPermissions,) serializer_class = core_serializers.UpdateUserSerializer @api_view(['POST',]) @permission_classes([CustomUserPermissions,]) def create_company_user(request): """ Create non-validated company and associated managing user """ if 'user' not in request.data: return Response({"error": "Missing parameter: user"}, status=406) if 'company' not in request.data: return Response({"error": "Missing parameter: company"}, status=406) # create company company_data = request.data['company'] # substitute coordinates for Point geo = company_data.pop('geo') if geo: company_data['geo'] = Point(geo['latitude'],geo['longitude']) company_serializer = CompanySerializer( data=company_data, ) if company_serializer.is_valid(): # save model instance data new_company = Company.objects.create(**company_serializer.validated_data) else: return Response({"error": "Company data is not valid"}, status=406) # create user user_data = request.data['user'] user_data['role'] = 'COOP_MANAGER' user_data['company'] = new_company.id user_serializer = core_serializers.CustomUserWriteSerializer( data=user_data, ) if user_serializer.is_valid(): # save model instance data password = user_serializer.validated_data.pop('password') new_user = User(**user_serializer.validated_data) new_user.set_password(password) new_user.save() # send verification email utils.send_verification_email(request, new_user) else: return Response({"error": "User data is not valid"}, status=406) return Response(status=status.HTTP_201_CREATED) @api_view(['GET',]) @permission_classes([IsAuthenticated,]) def my_user(request): try: instance = User.objects.get(email=request.user.email) user_serializer = core_serializers.CustomUserReadSerializer(instance) return Response(data=user_serializer.data) except Exception as e: return Response({'error': {str(type(e))}}, status=500) @api_view(['POST',]) @permission_classes([IsAdminUser,]) def load_coop_managers(request): """Read CSV file being received Parse it to create users and related companies """ try: csv_file = request.FILES['csv_file'] if csv_file.name.endswith('.csv') is not True: logging.error(f"File {csv_file.name} is not a CSV file") return Response({"errors":{"details": "File is not CSV type"}}) logging.info(f"Reading contents of {csv_file.name}") decoded_file = csv_file.read().decode('utf-8').splitlines() csv_reader = csv.DictReader(decoded_file, delimiter=',') coop_count, user_count = utils.coop_loader(csv_reader, request) return Response({'details': f"Created {coop_count} Companies, {user_count} Managing Users"}) except Exception as e: return Response({"errors": {"details": f'{type(e)}: {e}'}}) @api_view(['GET',]) @permission_classes([AllowAny,]) def activate_user(request, uidb64, token): try: uid = force_text(urlsafe_base64_decode(uidb64)) user = User.objects.get(pk=uid) except (TypeError, ValueError, OverflowError, User.DoesNotExist): user = None if user is not None and utils.account_activation_token.check_token(user, token): # activate user user.is_active = True user.save() return Response(f"Tu cuenta de usuario {user.email} ha sido activada") else: return Response({"error": f"Tu token de verificacion no coincide con ningĂșn usuario registrado"}, status=status.HTTP_406_NOT_ACCEPTABLE)