"""Views for user management: CRUD, password/profile updates, bulk CSV
import of cooperative managers, and e-mail based account activation."""
import csv
import logging
import json
import io
import datetime

from django.shortcuts import render
from django.http import HttpResponse
from django.contrib.auth import get_user_model
from django.core import serializers
from django.db import transaction
from django.utils.http import urlsafe_base64_decode
from django.utils.encoding import force_text

from rest_framework import status
from rest_framework import viewsets
from rest_framework.response import Response
from rest_framework.permissions import IsAdminUser, IsAuthenticated, AllowAny
from rest_framework.generics import UpdateAPIView
from rest_framework.decorators import api_view, permission_classes

from companies.models import Company

from . import models
from . import serializers as core_serializers
from . import utils

from back_latienda.permissions import CustomUserPermissions, YourOwnUserPermissions

User = get_user_model()

# Create your views here.

# Module-wide logging configuration: everything logged through the root
# logger goes to logs/csv-load.log, which is truncated on each start
# (filemode='w').
logging.basicConfig(
    filename='logs/csv-load.log',
    filemode='w',
    format='%(levelname)s:%(message)s',
    level=logging.INFO,
)


class CustomUserViewSet(viewsets.ModelViewSet):
    """Model viewset exposing CRUD operations on ``CustomUser``.

    The serializer and the permission set are chosen per request, based on
    the current action and on whether the requester is staff / anonymous.
    """

    model = models.CustomUser
    model_name = 'custom_user'
    queryset = models.CustomUser.objects.all()
    permission_classes = [CustomUserPermissions,]
    read_serializer_class = core_serializers.CustomUserReadSerializer
    write_serializer_class = core_serializers.CustomUserWriteSerializer

    def get_serializer_class(self):
        """Return the serializer class matching the action and requester.

        Precedence: ``create`` always uses the write serializer; a non-staff
        ``update`` uses the restricted update serializer; any other request
        from a staff user uses the admin serializer; everything else falls
        back to the default serializer.
        """
        if self.action == 'create':
            return core_serializers.CustomUserWriteSerializer
        elif self.action == 'update' and self.request.user.is_staff is False:
            return core_serializers.UpdateUserSerializer
        elif self.request.user.is_staff is True:
            return core_serializers.CustomUserAdminSerializer
        return core_serializers.CustomUserSerializer

    def get_permissions(self):
        """Return the permission instances to enforce for this request.

        Logged-in users acting on a single user object are checked with
        ``YourOwnUserPermissions``; every other case uses the class-level
        ``permission_classes``.
        """
        object_actions = ['retrieve', 'update', 'partial_update', 'destroy']
        if self.action in object_actions and self.request.user.is_anonymous is False:
            return [YourOwnUserPermissions(), ]
        return super().get_permissions()

    def create(self, request, *args, **kwargs):
        """Create a user from ``request.data``.

        The password is removed from the validated data and applied through
        ``set_password`` so that it goes through the model's password
        handling instead of being assigned as a plain attribute.

        Returns:
            201 with the read-serializer representation on success,
            406 with the serializer errors when validation fails,
            500 with the exception message on any unexpected error.
        """
        try:
            serializer_class = self.get_serializer_class()
            serializer = serializer_class(
                data=request.data,
            )
            if not serializer.is_valid():
                return Response(
                    serializer.errors,
                    status=status.HTTP_406_NOT_ACCEPTABLE)

            # save model instance data
            password = serializer.validated_data.pop('password')
            instance = self.model(**serializer.validated_data)
            instance.set_password(password)
            instance.save()

            read_serializer = self.read_serializer_class(
                instance, many=False, context={'request': request})
            return Response(read_serializer.data, status=status.HTTP_201_CREATED)
        except Exception as e:
            return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)


class ChangeUserPasswordView(UpdateAPIView):
    """Update endpoint using ``ChangePasswordSerializer`` on a user object."""

    model = models.CustomUser
    queryset = model.objects.all()
    permission_classes = (YourOwnUserPermissions,)
    serializer_class = core_serializers.ChangePasswordSerializer


class UpdateUserView(UpdateAPIView):
    """Update endpoint using ``UpdateUserSerializer`` on a user object."""

    model = models.CustomUser
    queryset = model.objects.all()
    permission_classes = (YourOwnUserPermissions,)
    serializer_class = core_serializers.UpdateUserSerializer


@api_view(['GET',])
@permission_classes([IsAuthenticated,])
def my_user(request):
    """Return the authenticated user's record as a Django-serialized JSON string.

    The lookup is done by the e-mail of ``request.user``.
    """
    # NOTE(review): django.core.serializers emits model fields wholesale, so
    # this payload presumably includes the password hash -- confirm and
    # consider a DRF read serializer instead (would change the response shape).
    qs = User.objects.filter(email=request.user.email)
    data = serializers.serialize('json', qs)
    return Response(data=data)


@api_view(['POST',])
@permission_classes([IsAdminUser,])
def load_coop_managers(request):
    """Create companies and their manager users from an uploaded CSV file.

    The file is read from ``request.FILES['csv_file']`` and must have a
    ``.csv`` name. Columns read from each row: ``cif``, ``nombre-coop``,
    ``nombre-corto``, ``es-tienda``, ``url`` and ``email``. For every valid
    row a ``Company`` and an inactive ``COOP_MANAGER`` user are created and a
    verification e-mail is sent. Rows with missing required data, or rows
    that fail to be processed, are logged and skipped.

    Returns:
        An empty response once the file has been processed, or a response
        carrying ``{"errors": {"details": ...}}`` when the file is not a CSV
        or an unexpected error aborts processing.
    """
    # NOTE(review): error payloads are returned with the default 200 status;
    # kept as-is so existing clients are not broken.
    required_fields = ('cif', 'nombre-coop', 'email')
    try:
        csv_file = request.FILES['csv_file']
        if not csv_file.name.endswith('.csv'):
            logging.error(f"File {csv_file.name} is not a CSV file")
            return Response({"errors":{"details": "File is not CSV type"}})

        logging.info(f"Reading contents of {csv_file.name}")
        # 'utf-8-sig' decodes plain UTF-8 identically but also drops a leading
        # BOM, which would otherwise corrupt the first header name.
        decoded_file = csv_file.read().decode('utf-8-sig').splitlines()
        csv_reader = csv.DictReader(decoded_file, delimiter=',')
        coop_counter = 0
        user_counter = 0
        for row in csv_reader:
            # DictReader fills short rows with None, so treat None and
            # whitespace-only values as missing, not just the empty string.
            if any(not (row[field] or '').strip() for field in required_fields):
                logging.error(f"Required data missing: {row}")
                continue
            try:
                coop_data = {
                    'cif': row['cif'].strip(),
                    'company_name': row['nombre-coop'].strip(),
                    'short_name': row['nombre-corto'].strip(),
                    # NOTE(review): any non-empty text (even "False" or "0")
                    # yields True here -- confirm the expected CSV values.
                    'shop': bool(row['es-tienda'].strip()),
                    'shop_link': row['url'].strip(),
                }
                # Create company and manager atomically so a failed user
                # creation does not leave an orphaned company behind.
                with transaction.atomic():
                    coop = Company.objects.create(**coop_data)
                    coop_user = User.objects.create_user(
                        email=row['email'].strip(),
                        company=coop,
                        role='COOP_MANAGER',
                        is_active=False,
                    )
                logging.info(f"Created Coop: {coop_data}")
                coop_counter += 1
                # send confirmation email
                utils.send_verification_email(request, coop_user)
                logging.info(f"Created User: {coop_user}")
                user_counter += 1
            except Exception:
                # Best-effort import: record the failure (with traceback) and
                # carry on with the remaining rows.
                logging.exception(f"Could not parse {row}")
        logging.info(
            f"CSV load finished: {coop_counter} coops created, "
            f"{user_counter} users created and notified")
        return Response()
    except Exception as e:
        return Response({"errors": {"details": str(type(e))}})


@api_view(['GET',])
@permission_classes([AllowAny,])
def activate_user(request, uidb64, token):
    """Activate the user identified by ``uidb64`` when ``token`` is valid.

    Args:
        request: the incoming request (no authentication required).
        uidb64: URL-safe base64 encoding of the user's primary key.
        token: activation token to check against the user.

    Returns:
        A plain ``HttpResponse`` with a confirmation message, or with an
        error message when the user cannot be found or the token is invalid.
    """
    try:
        uid = force_text(urlsafe_base64_decode(uidb64))
        user = User.objects.get(pk=uid)
    except (TypeError, ValueError, OverflowError, User.DoesNotExist):
        user = None

    # NOTE(review): `account_activation_token` is neither defined nor imported
    # in this module as shown; confirm where it lives and import it, otherwise
    # this check raises NameError for every existing user.
    if user is not None and account_activation_token.check_token(user, token):
        # activate user
        user.is_active = True
        user.save()
        # Use the activated user's e-mail: this endpoint is AllowAny, so
        # request.user is normally anonymous and has no `email` attribute.
        return HttpResponse(f"Tu cuenta de usuario {user.email} ha sido activada")
    else:
        return HttpResponse("Tu token de verificacion no coincide con ningún usuario registrado")