import csv
import logging
import json
import io
import datetime

from django.shortcuts import render
from django.http import HttpResponse
from django.contrib.auth import get_user_model
from django.utils.http import urlsafe_base64_decode
from django.utils.encoding import force_text
from django.db import IntegrityError
from django.contrib.gis.geos import Point

from rest_framework import status
from rest_framework import viewsets
from rest_framework.response import Response
from rest_framework.permissions import IsAdminUser, IsAuthenticated, AllowAny
from rest_framework.generics import UpdateAPIView
from rest_framework.decorators import api_view, permission_classes

from companies.models import Company
from geo.models import City

from . import models
from . import serializers as core_serializers
from . import utils

from back_latienda.permissions import CustomUserPermissions, YourOwnUserPermissions

User = get_user_model()

# Create your views here.

# NOTE: configured at import time; the log file is truncated (filemode='w')
# every time this module is loaded.
logging.basicConfig(
    filename='logs/csv-load.log',
    filemode='w',
    format='%(levelname)s:%(message)s',
    level=logging.INFO,
)

# Spellings that mean "false" in the 'es-tienda' CSV column (compared
# case-insensitively after stripping). Any other value counts as true.
_FALSE_FLAG_VALUES = frozenset({'', '0', 'false', 'f', 'no', 'n', 'off'})


def _parse_flag(raw_value):
    """Convert a textual CSV flag to a bool.

    Explicit negatives ('0', 'false', 'no', ...) and empty text yield False;
    any other text yields True, so unknown markers keep their old meaning.
    """
    return raw_value.strip().lower() not in _FALSE_FLAG_VALUES


class CustomUserViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for ``CustomUser``.

    The serializer and the permission set are both selected per request,
    based on the current action and on who is calling.
    """

    model = models.CustomUser
    model_name = 'custom_user'
    queryset = models.CustomUser.objects.all()
    permission_classes = [CustomUserPermissions,]
    read_serializer_class = core_serializers.CustomUserReadSerializer
    write_serializer_class = core_serializers.CustomUserWriteSerializer

    def get_serializer_class(self):
        """Return the serializer class for the current action and caller.

        Order matters: 'create' always uses the write serializer, a
        non-staff 'update' uses the restricted update serializer, any other
        staff request uses the admin serializer, and everything else falls
        back to the generic one.
        """
        if self.action == 'create':
            return core_serializers.CustomUserWriteSerializer
        elif self.action == 'update' and self.request.user.is_staff is False:
            return core_serializers.UpdateUserSerializer
        elif self.request.user.is_staff is True:
            return core_serializers.CustomUserAdminSerializer
        return core_serializers.CustomUserSerializer

    def get_permissions(self):
        """Return the permission instances for the current action.

        Authenticated callers acting on a single user record are checked
        with ``YourOwnUserPermissions``; every other case uses the
        view-level ``permission_classes``.
        """
        if self.action in ['retrieve', 'update', 'partial_update', 'destroy'] and self.request.user.is_anonymous is False:
            return [YourOwnUserPermissions(), ]
        return super(CustomUserViewSet, self).get_permissions()

    def create(self, request):
        """
        Create Instance

        Validates the payload, builds the user from the validated data and
        stores the password through ``set_password`` rather than as a plain
        field value.

        Responses: 201 with the read-serializer representation, 406 with
        the serializer errors when validation fails, 500 with the exception
        text on any unexpected error.
        """
        try:
            serializer_class = self.get_serializer_class()
            serializer = serializer_class(
                data=request.data,
            )
            if serializer.is_valid():
                # save model instance data
                password = serializer.validated_data.pop('password')
                instance = self.model(**serializer.validated_data)
                instance.set_password(password)
                instance.save()

                return Response(
                    self.read_serializer_class(
                        instance, many=False, context={'request': request}).data,
                    status=status.HTTP_201_CREATED)
            else:
                return Response(
                    serializer.errors,
                    status=status.HTTP_406_NOT_ACCEPTABLE)
        except Exception as e:
            return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)


class ChangeUserPasswordView(UpdateAPIView):
    """Update endpoint wired to ``ChangePasswordSerializer``."""

    model = models.CustomUser
    queryset = model.objects.all()
    permission_classes = (YourOwnUserPermissions,)
    serializer_class = core_serializers.ChangePasswordSerializer


class UpdateUserView(UpdateAPIView):
    """Update endpoint wired to ``UpdateUserSerializer``."""

    model = models.CustomUser
    queryset = model.objects.all()
    permission_classes = (YourOwnUserPermissions,)
    serializer_class = core_serializers.UpdateUserSerializer


@api_view(['POST',])
@permission_classes([CustomUserPermissions,])
def create_company_user(request):
    """
    Create non-validated company and manager user associated

    Expects a body with a ``user`` object (``email``, ``password``) and a
    ``company`` object (``cif``, ``company_name``, ``short_name``,
    ``web_link``, ``shop``, ``city``, ``geo``, ``address``), where ``geo``
    carries ``lng`` and ``lat``.

    Responses: 201 with the serialized user, 400 when a required field is
    missing or the payload is malformed, 409 when creating the user raises
    ``IntegrityError``, 500 when the company cannot be created (the
    just-created user is deleted first).
    """
    try:
        user_payload = request.data['user']
        company_payload = request.data['company']
        user_data = {
            'email': user_payload['email'],
            'password': user_payload['password']
        }
        company_data = {
            'cif': company_payload['cif'],
            'company_name': company_payload['company_name'],
            'short_name': company_payload['short_name'],
            'web_link': company_payload['web_link'],
            'shop': company_payload['shop'],
            'city': company_payload['city'],
            'geo': company_payload['geo'],
            'address': company_payload['address']
        }
    except (KeyError, TypeError) as e:
        # An incomplete body used to escape as an unhandled exception (500).
        return Response(
            {"errors": {"details": f"Missing or malformed field: {e}"}},
            status=status.HTTP_400_BAD_REQUEST)

    try:
        user = models.CustomUser.objects.create(email=user_data['email'])
    except IntegrityError as e:
        return Response({"errors": {"details": str(e)}}, status=status.HTTP_409_CONFLICT)

    try:
        city = company_data.pop('city')
        city = City.objects.get(name=city)
        geo = company_data.pop('geo')
        geo = Point(geo['lng'], geo['lat'])
        company = Company.objects.create(**company_data, city=city, geo=geo)
    except Exception as e:
        # Roll back the user so a failed company does not leave it orphaned.
        user.delete()
        return Response({"errors": {"details": str(e)}}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    user.set_password(user_data['password'])
    user.company = company
    user.role = 'COOP_MANAGER'
    user.save()

    company.creator = user
    company.save()

    serializer = core_serializers.CustomUserSerializer(user)

    return Response(data=serializer.data, status=status.HTTP_201_CREATED)


@api_view(['GET',])
@permission_classes([IsAuthenticated,])
def my_user(request):
    """Return the read-serializer representation of the calling user.

    On any error responds with status 500 and the exception type name.
    """
    try:
        instance = User.objects.get(email=request.user.email)
        user_serializer = core_serializers.CustomUserReadSerializer(instance)
        return Response(data=user_serializer.data)
    except Exception as e:
        # Must be a plain string: the previous ``{str(type(e))}`` was a set
        # literal, which cannot be rendered as JSON.
        return Response({'error': str(type(e))}, status=500)


@api_view(['POST',])
@permission_classes([IsAdminUser,])
def load_coop_managers(request):
    """Read CSV file being received
    Parse it to create users and related companies

    The upload is read from ``request.FILES['csv_file']`` and must have a
    ``.csv`` name. Columns used: ``cif``, ``nombre-coop``, ``email``
    (required), ``nombre-corto``, ``es-tienda`` and ``url``.

    Rows with an empty required value are logged and skipped. A row that
    fails while being processed is logged and does not stop the import.
    Each created user is inactive and is sent a verification email.

    Returns an empty response when the file was processed, or an
    ``errors`` body when the file is rejected or cannot be read.
    """
    try:
        csv_file = request.FILES['csv_file']
        if not csv_file.name.endswith('.csv'):
            logging.error(f"File {csv_file.name} is not a CSV file")
            return Response({"errors": {"details": "File is not CSV type"}})

        logging.info(f"Reading contents of {csv_file.name}")
        decoded_file = csv_file.read().decode('utf-8').splitlines()
        csv_reader = csv.DictReader(decoded_file, delimiter=',')
        coop_counter = 0
        user_counter = 0
        for row in csv_reader:
            if '' in (row['cif'], row['nombre-coop'], row['email']):
                logging.error(f"Required data missing: {row}")
                continue
            try:
                coop_data = {
                    'cif': row['cif'].strip(),
                    'company_name': row['nombre-coop'].strip(),
                    'short_name': row['nombre-corto'].strip(),
                    # bool() on the raw text made "0" / "false" / "no" True.
                    'shop': _parse_flag(row['es-tienda']),
                    'shop_link': row['url'].strip(),
                }
                coop = Company.objects.create(**coop_data)
                logging.info(f"Created Coop: {coop_data}")
                coop_counter += 1

                coop_user = User.objects.create_user(
                    email=row['email'], company=coop,
                    role='COOP_MANAGER', is_active=False)
                # send confirmation email
                utils.send_verification_email(request, coop_user)
                logging.info(f"Created User: {coop_user}")
                user_counter += 1
            except Exception as e:
                # Keep the cause in the log; the row alone does not say why.
                logging.error(f"Could not parse {row}: {e!r}")
        logging.info(
            f"Import finished: {coop_counter} companies, {user_counter} users created")
        return Response()
    except Exception as e:
        # NOTE(review): this and the non-CSV branch answer with the default
        # 200 status; kept for compatibility with existing clients.
        return Response({"errors": {"details": str(type(e))}})


@api_view(['GET',])
@permission_classes([AllowAny,])
def activate_user(request, uidb64, token):
    """Activate the user identified by ``uidb64`` if ``token`` is valid.

    ``uidb64`` is the url-safe base64 encoded primary key of the user. The
    response is a plain-text message in Spanish either way.
    """
    try:
        uid = force_text(urlsafe_base64_decode(uidb64))
        user = User.objects.get(pk=uid)
    except (TypeError, ValueError, OverflowError, User.DoesNotExist):
        user = None

    # NOTE(review): `account_activation_token` is not imported or defined in
    # the visible part of this module — confirm where it lives (presumably
    # next to `utils.send_verification_email`) and import it.
    if user is not None and account_activation_token.check_token(user, token):
        # activate user
        user.is_active = True
        user.save()
        # Use the activated account's email: the caller is normally
        # anonymous here, and AnonymousUser has no `email` attribute.
        return HttpResponse(f"Tu cuenta de usuario {user.email} ha sido activada")
    else:
        return HttpResponse("Tu token de verificacion no coincide con ningún usuario registrado")