233 lines
8.0 KiB
Python
233 lines
8.0 KiB
Python
import csv
|
|
import logging
|
|
import json
|
|
import io
|
|
import datetime
|
|
|
|
from django.shortcuts import render
|
|
from django.http import HttpResponse
|
|
from django.contrib.auth import get_user_model
|
|
from django.utils.http import urlsafe_base64_decode
|
|
from django.utils.encoding import force_text
|
|
from django.db import IntegrityError
|
|
from django.contrib.gis.geos import Point
|
|
|
|
from rest_framework import status
|
|
from rest_framework import viewsets
|
|
from rest_framework.response import Response
|
|
from rest_framework.permissions import IsAdminUser, IsAuthenticated, AllowAny
|
|
from rest_framework.generics import UpdateAPIView
|
|
from rest_framework.decorators import api_view, permission_classes
|
|
|
|
from companies.models import Company
|
|
from geo.models import City
|
|
|
|
from . import models
|
|
from . import serializers as core_serializers
|
|
from . import utils
|
|
|
|
from back_latienda.permissions import CustomUserPermissions, YourOwnUserPermissions
|
|
|
|
|
|
User = get_user_model()
|
|
|
|
# Create your views here.
|
|
|
|
logging.basicConfig(
|
|
filename='logs/csv-load.log',
|
|
filemode='w',
|
|
format='%(levelname)s:%(message)s',
|
|
level=logging.INFO,
|
|
)
|
|
|
|
|
|
class CustomUserViewSet(viewsets.ModelViewSet):
|
|
|
|
model = models.CustomUser
|
|
model_name = 'custom_user'
|
|
queryset = models.CustomUser.objects.all()
|
|
permission_classes = [CustomUserPermissions,]
|
|
read_serializer_class = core_serializers.CustomUserReadSerializer
|
|
write_serializer_class = core_serializers.CustomUserWriteSerializer
|
|
|
|
def get_serializer_class(self):
|
|
if self.action=='create':
|
|
return core_serializers.CustomUserWriteSerializer
|
|
elif self.action == 'update' and self.request.user.is_staff is False:
|
|
return core_serializers.UpdateUserSerializer
|
|
elif self.request.user.is_staff is True:
|
|
return core_serializers.CustomUserAdminSerializer
|
|
return core_serializers.CustomUserSerializer
|
|
|
|
def get_permissions(self):
|
|
if self.action in ['retrieve', 'update', 'partial_update', 'destroy'] and self.request.user.is_anonymous is False:
|
|
return [YourOwnUserPermissions(), ]
|
|
return super(CustomUserViewSet, self).get_permissions()
|
|
|
|
def create(self, request):
|
|
"""
|
|
Create Instance
|
|
"""
|
|
try:
|
|
serializer_class = self.get_serializer_class()
|
|
serializer = serializer_class(
|
|
data=request.data,
|
|
)
|
|
if serializer.is_valid():
|
|
# save model instance data
|
|
password = serializer.validated_data.pop('password')
|
|
instance = self.model(**serializer.validated_data)
|
|
instance.set_password(password)
|
|
instance.save()
|
|
|
|
return Response(self.read_serializer_class(
|
|
instance, many=False, context={'request': request}).data,
|
|
status=status.HTTP_201_CREATED)
|
|
else:
|
|
return Response(
|
|
serializer.errors, status=status.HTTP_406_NOT_ACCEPTABLE)
|
|
except Exception as e:
|
|
return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
|
class ChangeUserPasswordView(UpdateAPIView):
|
|
|
|
model = models.CustomUser
|
|
queryset = model.objects.all()
|
|
permission_classes = (YourOwnUserPermissions,)
|
|
serializer_class = core_serializers.ChangePasswordSerializer
|
|
|
|
|
|
class UpdateUserView(UpdateAPIView):
|
|
|
|
model = models.CustomUser
|
|
queryset = model.objects.all()
|
|
permission_classes = (YourOwnUserPermissions,)
|
|
serializer_class = core_serializers.UpdateUserSerializer
|
|
|
|
|
|
@api_view(['POST',])
|
|
@permission_classes([CustomUserPermissions,])
|
|
def create_company_user(request):
|
|
"""
|
|
Create non-validated company and manager user associated
|
|
"""
|
|
user_data = {
|
|
'email': request.data['user']['email'],
|
|
'password': request.data['user']['password']
|
|
}
|
|
company_data = {
|
|
'cif': request.data['company']['cif'],
|
|
'company_name': request.data['company']['company_name'],
|
|
'short_name': request.data['company']['short_name'],
|
|
'web_link': request.data['company']['web_link'],
|
|
'shop': request.data['company']['shop'],
|
|
'city': request.data['company']['city'],
|
|
'geo': request.data['company']['geo'],
|
|
'address': request.data['company']['address']
|
|
}
|
|
try:
|
|
user = models.CustomUser.objects.create(email=user_data['email'])
|
|
except IntegrityError as e:
|
|
return Response({"errors": {"details": str(e)}}, status=status.HTTP_409_CONFLICT)
|
|
|
|
try:
|
|
city = company_data.pop('city')
|
|
#city = City.objects.get(name=city)
|
|
|
|
geo = company_data.pop('geo')
|
|
geo = Point(geo['lng'], geo['lat'])
|
|
|
|
company = Company.objects.create(**company_data, city=city, geo=geo)
|
|
except Exception as e:
|
|
user.delete()
|
|
return Response({"errors": {"details": str(e)}}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
user.set_password(user_data['password'])
|
|
user.company = company
|
|
user.role = 'COOP_MANAGER'
|
|
user.save()
|
|
|
|
company.creator = user
|
|
company.save()
|
|
|
|
serializer = core_serializers.CustomUserSerializer(user)
|
|
|
|
return Response(data=serializer.data,status=status.HTTP_201_CREATED)
|
|
|
|
|
|
@api_view(['GET',])
|
|
@permission_classes([IsAuthenticated,])
|
|
def my_user(request):
|
|
try:
|
|
instance = User.objects.get(email=request.user.email)
|
|
user_serializer = core_serializers.CustomUserReadSerializer(instance)
|
|
return Response(data=user_serializer.data)
|
|
except Exception as e:
|
|
return Response({'error': {str(type(e))}}, status=500)
|
|
|
|
|
|
|
|
@api_view(['POST',])
|
|
@permission_classes([IsAdminUser,])
|
|
def load_coop_managers(request):
|
|
"""Read CSV file being received
|
|
Parse it to create users and related companies
|
|
"""
|
|
try:
|
|
csv_file = request.FILES['csv_file']
|
|
if csv_file.name.endswith('.csv') is not True:
|
|
logging.error(f"File {csv_file.name} is not a CSV file")
|
|
return Response({"errors":{"details": "File is not CSV type"}})
|
|
|
|
logging.info(f"Reading contents of {csv_file.name}")
|
|
decoded_file = csv_file.read().decode('utf-8').splitlines()
|
|
csv_reader = csv.DictReader(decoded_file, delimiter=',')
|
|
coop_counter = 0
|
|
user_counter = 0
|
|
for row in csv_reader:
|
|
if '' in (row['cif'], row['nombre-coop'], row['email']):
|
|
logging.error(f"Required data missing: {row}")
|
|
continue
|
|
try:
|
|
coop_data = {
|
|
'cif': row['cif'].strip(),
|
|
'company_name': row['nombre-coop'].strip(),
|
|
'short_name': row['nombre-corto'].strip(),
|
|
'shop': bool(row['es-tienda'].strip()),
|
|
'shop_link': row['url'].strip(),
|
|
}
|
|
coop = Company.objects.create(**coop_data)
|
|
logging.info(f"Created Coop: {coop_data}")
|
|
coop_counter += 1
|
|
|
|
coop_user = User.objects.create_user(email=row['email'], company=coop, role='COOP_MANAGER', is_active=False)
|
|
# send confirmation email
|
|
utils.send_verification_email(request, coop_user)
|
|
logging.info(f"Created User: {coop_user}")
|
|
user_counter += 1
|
|
except Exception as e:
|
|
logging.error(f"Could not parse {row}")
|
|
|
|
return Response()
|
|
except Exception as e:
|
|
return Response({"errors": {"details": str(type(e))}})
|
|
|
|
|
|
@api_view(['GET',])
|
|
@permission_classes([AllowAny,])
|
|
def activate_user(request, uidb64, token):
|
|
try:
|
|
uid = force_text(urlsafe_base64_decode(uidb64))
|
|
user = User.objects.get(pk=uid)
|
|
except (TypeError, ValueError, OverflowError, User.DoesNotExist):
|
|
user = None
|
|
|
|
if user is not None and account_activation_token.check_token(user, token):
|
|
# activate user
|
|
user.is_active = True
|
|
user.save()
|
|
return HttpResponse(f"Tu cuenta de usuario {request.user.email} ha sido activada")
|
|
else:
|
|
return HttpResponse(f"Tu token de verificacion no coincide con ningún usuario registrado")
|