diff --git a/back_latienda/permissions.py b/back_latienda/permissions.py index 60f46e8..992f43e 100644 --- a/back_latienda/permissions.py +++ b/back_latienda/permissions.py @@ -29,7 +29,11 @@ class IsStaff(permissions.BasePermission): if obj is not None: if request.user.is_staff is True: return True - return False + return + + def has_permission(self, request, view): + return request.user.is_staff + class ReadOnly(permissions.BasePermission): diff --git a/back_latienda/urls.py b/back_latienda/urls.py index d0251d5..3dd674f 100644 --- a/back_latienda/urls.py +++ b/back_latienda/urls.py @@ -23,6 +23,7 @@ from rest_framework_simplejwt.views import TokenObtainPairView, TokenRefreshView from core import views as core_views from products import views as product_views from companies import views as company_views +from stats import views as stat_views from .routers import router @@ -40,5 +41,6 @@ urlpatterns = [ path('api/v1/my_user/', core_views.my_user, name='my-user'), path('api/v1/my_company/', company_views.my_company , name='my-company'), path('api/v1/my_products/', product_views.my_products, name='my-products'), + path('api/v1/stats/me/', stat_views.track_user, name='user-tracker'), path('api/v1/', include(router.urls)), ] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) diff --git a/companies/views.py b/companies/views.py index 477b4b2..2128b07 100644 --- a/companies/views.py +++ b/companies/views.py @@ -24,135 +24,135 @@ from utils import woocommerce class CompanyViewSet(viewsets.ModelViewSet): - queryset = Company.objects.all() - serializer_class = CompanySerializer - permission_classes = [IsAuthenticatedOrReadOnly, IsCreator] - filterset_class = CompanyTagFilter + queryset = Company.objects.all() + serializer_class = CompanySerializer + permission_classes = [IsAuthenticatedOrReadOnly, IsCreator] + filterset_class = CompanyTagFilter - def perform_create(self, serializer): - serializer.save(creator=self.request.user) + def perform_create(self, serializer): + serializer.save(creator=self.request.user) - @action(detail=True, methods=['POST', ]) - def email_manager(self, request, **kwargs): - """ - Send email to company.creator - """ - try: - instance = self.queryset.filter(pk=kwargs['pk']).first() - if instance: - # IP stuff - client_ip, is_routable = get_client_ip(request) - g = GeoIP2() - - # deserialize payload - data = json.loads(request.body) - if request.user.is_authenticated: - # send email to manager - company_message = render_to_string('company_contact.html', { - 'company': instance, - 'email': request.user.email, - 'full_name': request.user.full_name, - 'quantity': data['quantity'], - 'phone_number': data.get('phone_number'), - 'comments': data['comments'], - 'product_info': data['product_info'], - }) - user_message = render_to_string('confirm_company_contact.html', { - 'company': instance, - 'username': request.user.full_name, - 'data': data, - }) - # send email to company - subject = "Contacto de usuario" - email = EmailMessage(subject, company_message, to=[instance.creator.email]) - email.send() - logging.info(f"Email sent to {instance.creator.email} as manager of {instance.name}") - # send confirmation email to user - subject = 'Confirmación de contacto' - email = EmailMessage(subject, message, to=[request.user.email]) - email.send() - logging.info(f"Contact confirmation email sent to {request.user.email}") - stats_data = { - 'action_object': instance, - 'user': None, - 'anonymous': True, - 'ip_address': client_ip, - 'geo': g.geos(client_ip), - 'contact': True, - 'shop': instance.shop, - } - else: - # for unauthenticated users - company_message = render_to_string('company_contact.html', { - 'company': instance, - 'email': data['email'], - 'full_name': data['full_name'], - 'quantity': data['quantity'], - 'phone_number': data.get('phone_number'), - 'comments': data['comments'], - 'product_info': data['product_info'], - }) - user_message = render_to_string('confirm_company_contact.html', { - 'company': instance, - 'username': data['full_name'], - }) - # send email to company - email = EmailMessage(subject, company_message, to=[instance.creator.email]) - email.send() - logging.info(f"Email sent to {instance.creator.email} as manager of {instance.name}") - # send confirmation email to user - email = EmailMessage(subject, user_message, to=[data['email']]) - email.send() - logging.info(f"Contact confirmation email sent to anonymous user {data['email']}") - # statslog data to register interaction - stats_data = { - 'action_object': instance, - 'user': request.user, - 'anonymous': False, - 'ip_address': client_ip, - 'geo': g.geos(client_ip), - 'contact': True, - 'shop': instance.shop, - } - # create statslog instance to register interaction - StatsLog.objects.create(**stats_data) - - return Response(data=data) - else: - return Response({"errors":{"details": f"No instance of company with id {kwargs['pk']}",}}) - except Exception as e: - return Response({"errors":{"details": str(e),}}, status=500) - - @action(detail=True, methods=['GET', ]) - def import_products(self, request, **kwargs): + @action(detail=True, methods=['POST', ]) + def email_manager(self, request, **kwargs): + """ + Send email to company.creator + """ + try: instance = self.queryset.filter(pk=kwargs['pk']).first() - # check if it's a shop - if instance.shop is not True: - return Response({'error': 'This company is not a shop'}) - # check required credentials - credentials = instance.credentials - if credentials is None or credentials == {}: - return Response({'error': 'This company has no registered credentials'}) - # check what platform - platform = instance.platform - if platform is None: - message = {'error': 'This company is not registered with any platforms'} - elif platform == 'WOO_COMMERCE': - # recheck credentials - if 'key' in credentials.keys() and 'secret' in credentials.keys(): - # execute import - products = woocommerce.migrate_shop_products( - instance.web_link, - credentials['key'], - credentials['secret'], - request.user, - ) - message = {'details': f'{len(products)} products added for {instance.company_name}'} - else: - message = {"error": 'Credentials have wrong format'} + if instance: + # IP stuff + client_ip, is_routable = get_client_ip(request) + g = GeoIP2() + + # deserialize payload + data = json.loads(request.body) + if request.user.is_authenticated: + # send email to manager + company_message = render_to_string('company_contact.html', { + 'company': instance, + 'email': request.user.email, + 'full_name': request.user.full_name, + 'quantity': data['quantity'], + 'phone_number': data.get('phone_number'), + 'comments': data['comments'], + 'product_info': data['product_info'], + }) + user_message = render_to_string('confirm_company_contact.html', { + 'company': instance, + 'username': request.user.full_name, + 'data': data, + }) + # send email to company + subject = "Contacto de usuario" + email = EmailMessage(subject, company_message, to=[instance.creator.email]) + email.send() + logging.info(f"Email sent to {instance.creator.email} as manager of {instance.name}") + # send confirmation email to user + subject = 'Confirmación de contacto' + email = EmailMessage(subject, message, to=[request.user.email]) + email.send() + logging.info(f"Contact confirmation email sent to {request.user.email}") + stats_data = { + 'action_object': instance, + 'user': None, + 'anonymous': True, + 'ip_address': client_ip, + 'geo': g.geos(client_ip), + 'contact': True, + 'shop': instance.shop, + } + else: + # for unauthenticated users + company_message = render_to_string('company_contact.html', { + 'company': instance, + 'email': data['email'], + 'full_name': data['full_name'], + 'quantity': data['quantity'], + 'phone_number': data.get('phone_number'), + 'comments': data['comments'], + 'product_info': data['product_info'], + }) + user_message = render_to_string('confirm_company_contact.html', { + 'company': instance, + 'username': data['full_name'], + }) + # send email to company + email = EmailMessage(subject, company_message, to=[instance.creator.email]) + email.send() + logging.info(f"Email sent to {instance.creator.email} as manager of {instance.name}") + # send confirmation email to user + email = EmailMessage(subject, user_message, to=[data['email']]) + email.send() + logging.info(f"Contact confirmation email sent to anonymous user {data['email']}") + # statslog data to register interaction + stats_data = { + 'action_object': instance, + 'user': request.user, + 'anonymous': False, + 'ip_address': client_ip, + 'geo': g.geos(client_ip), + 'contact': True, + 'shop': instance.shop, + } + # create statslog instance to register interaction + StatsLog.objects.create(**stats_data) + + return Response(data=data) else: - message = {'error': f'Platform {plaform} not registered'} - return Response(message) + return Response({"errors":{"details": f"No instance of company with id {kwargs['pk']}",}}) + except Exception as e: + return Response({"errors":{"details": str(e),}}, status=500) + + @action(detail=True, methods=['GET', ]) + def import_products(self, request, **kwargs): + instance = self.queryset.filter(pk=kwargs['pk']).first() + # check if it's a shop + if instance.shop is not True: + return Response({'error': 'This company is not a shop'}) + # check required credentials + credentials = instance.credentials + if credentials is None or credentials == {}: + return Response({'error': 'This company has no registered credentials'}) + # check what platform + platform = instance.platform + if platform is None: + message = {'error': 'This company is not registered with any platforms'} + elif platform == 'WOO_COMMERCE': + # recheck credentials + if 'key' in credentials.keys() and 'secret' in credentials.keys(): + # execute import + products = woocommerce.migrate_shop_products( + instance.web_link, + credentials['key'], + credentials['secret'], + request.user, + ) + message = {'details': f'{len(products)} products added for {instance.company_name}'} + else: + message = {"error": 'Credentials have wrong format'} + else: + message = {'error': f'Platform {plaform} not registered'} + return Response(message) @api_view(['GET',]) diff --git a/stats/factories.py b/stats/factories.py index cd4faa3..c6db6a0 100644 --- a/stats/factories.py +++ b/stats/factories.py @@ -7,7 +7,7 @@ from factory.django import DjangoModelFactory class StatsLogFactory(DjangoModelFactory): - model = None + action_object = None user = SubFactory('core.factories.CustomUserFactory') anonymous = FuzzyChoice(choices=(True, False)) ip_address = '127.0.0.1' @@ -17,4 +17,4 @@ class StatsLogFactory(DjangoModelFactory): class Meta: - model = 'stats.StatsLog' \ No newline at end of file + model = 'stats.StatsLog' diff --git a/stats/serializers.py b/stats/serializers.py index 6551812..4e8b5ad 100644 --- a/stats/serializers.py +++ b/stats/serializers.py @@ -5,4 +5,4 @@ from stats.models import StatsLog class StatsLogSerializer(serializers.ModelSerializer): class Meta: model = StatsLog - exclude = ['created', 'updated', 'creator'] + exclude = ['created', 'updated',] diff --git a/stats/tests.py b/stats/tests.py index 7ce503c..2178ddb 100644 --- a/stats/tests.py +++ b/stats/tests.py @@ -1,3 +1,329 @@ -from django.test import TestCase +import random +import string + +from rest_framework.test import APITestCase +from rest_framework import status + +from core.factories import CustomUserFactory +from core.utils import get_tokens_for_user + +from .models import StatsLog +from .factories import StatsLogFactory + # Create your tests here. +class TrackUserViewTest(APITestCase): + """ProductViewSet tests + """ + + def setUp(self): + """Tests setup + """ + self.endpoint = '/api/v1/stats/me/' + self.factory = StatsLogFactory + self.model = StatsLog + # create user + self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) + self.user = CustomUserFactory(email="test@mail.com", password=self.password, is_active=True) + + # anon user + def test_anon_user_can_only_post(self): + """Not logged-in user cannot create new instance + """ + # Query endpoint + response = self.client.get(self.endpoint, data={}) + # Assert access is forbidden + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + # Query endpoint + response = self.client.put(self.endpoint, data={}) + # Assert access is forbidden + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + # Query endpoint + response = self.client.delete(self.endpoint, data={}) + # Assert access is forbidden + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + # Query endpoint + response = self.client.post(self.endpoint, data={}) + # Assert access is forbidden + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_anon_user_can_register_product_action(self): + """Not logged-in user cannot modify existing instance + """ + # Create instance + instance = self.factory() + + # Query endpoint + url = self.endpoint + f'{instance.pk}/' + response = self.client.put(url, {}, format='json') + + # Assert forbidden code + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_anon_user_can_register_company_action(self): + """Not logged-in user cannot modify existing instance + """ + # Create instance + instance = self.factory() + + # Query endpoint + url = self.endpoint + f'{instance.pk}/' + response = self.client.put(url, {}, format='json') + + # Assert forbidden code + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + # authenticated user + def test_auth_user_can_only_post(self): + """Regular logged-in user can list instance + """ + # Create instances + instances = [self.factory() for n in range(random.randint(1,5))] + + # Authenticate user + token = get_tokens_for_user(self.user) + self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") + + # Request list + response = self.client.get(self.endpoint) + + # Assert access is allowed + self.assertEqual(response.status_code, status.HTTP_200_OK) + + # Assert all instances are returned + self.assertEqual(len(instances), len(response.data)) + + def test_auth_user_can_register_product_action(self): + """Regular logged-in user can list instance + """ + # Create instances + instance = self.factory() + + # Authenticate user + token = get_tokens_for_user(self.user) + self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") + + # Request list + url = f"{self.endpoint}{instance.id}/" + response = self.client.get(url) + + # Assert access is allowed + self.assertEqual(response.status_code, status.HTTP_200_OK) + data = json.loads(response.content) + self.assertEquals(instance.id, data['id']) + + def test_auth_user_can_register_company_action(self): + """Regular logged-in user can create new instance + """ + # Define request data + company = CompanyFactory() + data = { + 'company': company.id, + 'sku': 'qwerewq', + 'name': 'qwerewq', + 'description': 'qwerewq', + 'url': 'http://qwerewq.com', + 'price': '12.21', + 'shipping_cost': '21.12', + 'shipping_terms': 'qwerewq', + 'source': 'SYNCHRONIZED', + 'sourcing_date': datetime.datetime.now().isoformat()+'Z', + 'update_date': datetime.datetime.now().isoformat()+'Z', + 'discount': '0.05', + 'stock': 22, + 'tags': ['tag1, tag2'], + 'category': 'Mr', + 'attributes': ['color/red', 'size/xxl'], + 'identifiers': '34rf34f43c43', + } + + # Authenticate user + token = get_tokens_for_user(self.user) + self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") + + # Query endpoint + response = self.client.post(self.endpoint, data=data, format='json') + # Assert endpoint returns created status + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + + # Assert instance exists on db + self.assertTrue(self.model.objects.get(id=response.data['id'])) + + +class StatsLogViewSetTest(APITestCase): + """ProductViewSet tests + """ + + def setUp(self): + """Tests setup + """ + self.endpoint = '/api/v1/stats/' + self.factory = StatsLogFactory + self.model = StatsLog + # create user + self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) + self.user = CustomUserFactory(email="test@mail.com", password=self.password, is_active=True) + self.instance_data = { + 'action_object': None, + 'user': self.user.id, + 'anonymous': False, + 'ip_address': None, + 'geo': None, + 'contact': None, + 'shop': True, + + } + self.alt_data = { + 'action_object': None, + 'user': self.user.id, + 'anonymous': True, + 'ip_address': None, + 'geo': None, + 'contact': None, + 'shop': False, + } + + # anonymous user + def test_anon_user_cannot_crud(self): + """Not logged-in user cannot access endpoint at all + """ + + # Query endpoint + response = self.client.get(self.endpoint) + # Assert access is forbidden + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + # Query endpoint + response = self.client.post(self.endpoint, data={}) + # Assert access is forbidden + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + # Query endpoint + response = self.client.put(self.endpoint, data={}) + # Assert access is forbidden + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + # Query endpoint + response = self.client.delete(self.endpoint) + # Assert access is forbidden + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + # authenticated user + def test_auth_user_cannot_crud(self): + """Authenticated user cannot access endpoint at all + """ + # Authenticate user + token = get_tokens_for_user(self.user) + self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") + + # Query endpoint + response = self.client.get(self.endpoint) + # Assert access is forbidden + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + # Query endpoint + response = self.client.post(self.endpoint, data={}) + # Assert access is forbidden + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + # Query endpoint + response = self.client.put(self.endpoint, data={}) + # Assert access is forbidden + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + # Query endpoint + response = self.client.delete(self.endpoint) + # Assert access is forbidden + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + # admin user + def test_admin_user_can_list_instance(self): + """Admin user can list instance + """ + # make user be admin + self.user.is_staff = True + self.user.save() + + # Create instances + instances = [self.factory() for n in range(random.randint(1,5))] + + # Authenticate user + token = get_tokens_for_user(self.user) + self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") + + # Request list + response = self.client.get(self.endpoint) + + # Assert access is allowed + self.assertEqual(response.status_code, status.HTTP_200_OK) + + # Assert all instances are returned + self.assertEqual(len(instances), len(response.data)) + + def test_admin_user_can_create_instance(self): + """Admin user can create new instance + """ + # make user be admin + self.user.is_staff = True + self.user.save() + + # Authenticate user + token = get_tokens_for_user(self.user) + self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") + + # Query endpoint + response = self.client.post(self.endpoint, data=self.instance_data, format='json') + + # Assert endpoint returns created status + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + + # Assert instance exists on db + self.assertTrue(self.model.objects.get(id=response.data['id'])) + + def test_admin_user_can_modify_instance(self): + """Admin user can modify existing instance + """ + # make user be admin + self.user.is_staff = True + self.user.save() + + # Create instances + instance = self.factory() + + # Authenticate user + token = get_tokens_for_user(self.user) + self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") + + # Query endpoint + url = self.endpoint + f'{instance.pk}/' + response = self.client.put(url, self.alt_data, format='json') + + # Assert endpoint returns OK code + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_admin_user_can_delete_instance(self): + """Admin user can delete existing instance + """ + # make user be admin + self.user.is_staff = True + self.user.save() + + # Create instances + instance = self.factory() + + # Authenticate user + token = get_tokens_for_user(self.user) + self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") + + # Query endpoint + url = self.endpoint + f'{instance.pk}/' + response = self.client.delete(url) + + # assert 204 no content + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + # Assert instance doesn't exists anymore on db + self.assertFalse(self.model.objects.filter(id=instance.pk).exists()) + diff --git a/stats/views.py b/stats/views.py index a8f7532..cd0cf78 100644 --- a/stats/views.py +++ b/stats/views.py @@ -1,19 +1,70 @@ -from django.shortcuts import render +import json +import logging # Create your views here. from rest_framework import viewsets -from stats.models import StatsLog -from stats.serializers import StatsLogSerializer +from rest_framework.decorators import api_view +from rest_framework.response import Response +from rest_framework import status + +from ipware import get_client_ip +from django.contrib.gis.geoip2 import GeoIP2 from back_latienda.permissions import IsStaff +from products.models import Product +from companies.models import Company + +from stats.models import StatsLog +from stats.serializers import StatsLogSerializer + class StatsLogViewSet(viewsets.ModelViewSet): - queryset = StatsLog.objects.all() - serializer_class = StatsLogSerializer - permission_classes = [IsStaff,] - - def perform_create(self, serializer): - serializer.save(creator=self.request.user) + queryset = StatsLog.objects.all() + serializer_class = StatsLogSerializer + permission_classes = [IsStaff,] +@api_view(['POST']) +def track_user(request): + """Track user actions on the site + + Params: + { + action: view, + object: { + model: name, + id: 1, + }, + } + """ + try: + data = json.loads(request.body) + + # geoip stuff + client_ip, is_routable = get_client_ip(request) + g = GeoIP2() + + # gather instance data + instance_data = { + 'action': data.get('action'), + 'user': request.user, + 'anonymous': request.user.is_anonymous, + 'ip_address': client_ip, + 'geo': g.geos(client_ip), + # 'contact' ??? + } + + if data['object'].get('name') == 'product': + instance_data['action_object'] = Product.objects.get(id=data['object'].get('id')) + elif data['object'].get('name') == 'company': + instance_data['action_object'] = Company.objects.get(id=data['object'].get('id')) + if instance_data['action_object'].shop is True: + instance_data['shop'] = True + + # crate new instance + new_stat = StatsLog.objects.create(**instance_data) + return Response(status=status.HTTP_201_CREATED) + except Exception as e: + logging.error(f"Stats could not be created: {str(e)}") + return Response(f"Process could not be registered: {str(type(e))}")