added StatsLogViewSetTest

This commit is contained in:
Sam
2021-03-03 11:52:32 +00:00
parent ff8453e003
commit 884fe1e2b5
7 changed files with 522 additions and 139 deletions

View File

@@ -29,7 +29,11 @@ class IsStaff(permissions.BasePermission):
if obj is not None: if obj is not None:
if request.user.is_staff is True: if request.user.is_staff is True:
return True return True
return False return
def has_permission(self, request, view):
return request.user.is_staff
class ReadOnly(permissions.BasePermission): class ReadOnly(permissions.BasePermission):

View File

@@ -23,6 +23,7 @@ from rest_framework_simplejwt.views import TokenObtainPairView, TokenRefreshView
from core import views as core_views from core import views as core_views
from products import views as product_views from products import views as product_views
from companies import views as company_views from companies import views as company_views
from stats import views as stat_views
from .routers import router from .routers import router
@@ -40,5 +41,6 @@ urlpatterns = [
path('api/v1/my_user/', core_views.my_user, name='my-user'), path('api/v1/my_user/', core_views.my_user, name='my-user'),
path('api/v1/my_company/', company_views.my_company , name='my-company'), path('api/v1/my_company/', company_views.my_company , name='my-company'),
path('api/v1/my_products/', product_views.my_products, name='my-products'), path('api/v1/my_products/', product_views.my_products, name='my-products'),
path('api/v1/stats/me/', stat_views.track_user, name='user-tracker'),
path('api/v1/', include(router.urls)), path('api/v1/', include(router.urls)),
] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) ] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)

View File

@@ -24,135 +24,135 @@ from utils import woocommerce
class CompanyViewSet(viewsets.ModelViewSet): class CompanyViewSet(viewsets.ModelViewSet):
queryset = Company.objects.all() queryset = Company.objects.all()
serializer_class = CompanySerializer serializer_class = CompanySerializer
permission_classes = [IsAuthenticatedOrReadOnly, IsCreator] permission_classes = [IsAuthenticatedOrReadOnly, IsCreator]
filterset_class = CompanyTagFilter filterset_class = CompanyTagFilter
def perform_create(self, serializer): def perform_create(self, serializer):
serializer.save(creator=self.request.user) serializer.save(creator=self.request.user)
@action(detail=True, methods=['POST', ]) @action(detail=True, methods=['POST', ])
def email_manager(self, request, **kwargs): def email_manager(self, request, **kwargs):
""" """
Send email to company.creator Send email to company.creator
""" """
try: try:
instance = self.queryset.filter(pk=kwargs['pk']).first()
if instance:
# IP stuff
client_ip, is_routable = get_client_ip(request)
g = GeoIP2()
# deserialize payload
data = json.loads(request.body)
if request.user.is_authenticated:
# send email to manager
company_message = render_to_string('company_contact.html', {
'company': instance,
'email': request.user.email,
'full_name': request.user.full_name,
'quantity': data['quantity'],
'phone_number': data.get('phone_number'),
'comments': data['comments'],
'product_info': data['product_info'],
})
user_message = render_to_string('confirm_company_contact.html', {
'company': instance,
'username': request.user.full_name,
'data': data,
})
# send email to company
subject = "Contacto de usuario"
email = EmailMessage(subject, company_message, to=[instance.creator.email])
email.send()
logging.info(f"Email sent to {instance.creator.email} as manager of {instance.name}")
# send confirmation email to user
subject = 'Confirmación de contacto'
email = EmailMessage(subject, message, to=[request.user.email])
email.send()
logging.info(f"Contact confirmation email sent to {request.user.email}")
stats_data = {
'action_object': instance,
'user': None,
'anonymous': True,
'ip_address': client_ip,
'geo': g.geos(client_ip),
'contact': True,
'shop': instance.shop,
}
else:
# for unauthenticated users
company_message = render_to_string('company_contact.html', {
'company': instance,
'email': data['email'],
'full_name': data['full_name'],
'quantity': data['quantity'],
'phone_number': data.get('phone_number'),
'comments': data['comments'],
'product_info': data['product_info'],
})
user_message = render_to_string('confirm_company_contact.html', {
'company': instance,
'username': data['full_name'],
})
# send email to company
email = EmailMessage(subject, company_message, to=[instance.creator.email])
email.send()
logging.info(f"Email sent to {instance.creator.email} as manager of {instance.name}")
# send confirmation email to user
email = EmailMessage(subject, user_message, to=[data['email']])
email.send()
logging.info(f"Contact confirmation email sent to anonymous user {data['email']}")
# statslog data to register interaction
stats_data = {
'action_object': instance,
'user': request.user,
'anonymous': False,
'ip_address': client_ip,
'geo': g.geos(client_ip),
'contact': True,
'shop': instance.shop,
}
# create statslog instance to register interaction
StatsLog.objects.create(**stats_data)
return Response(data=data)
else:
return Response({"errors":{"details": f"No instance of company with id {kwargs['pk']}",}})
except Exception as e:
return Response({"errors":{"details": str(e),}}, status=500)
@action(detail=True, methods=['GET', ])
def import_products(self, request, **kwargs):
instance = self.queryset.filter(pk=kwargs['pk']).first() instance = self.queryset.filter(pk=kwargs['pk']).first()
# check if it's a shop if instance:
if instance.shop is not True: # IP stuff
return Response({'error': 'This company is not a shop'}) client_ip, is_routable = get_client_ip(request)
# check required credentials g = GeoIP2()
credentials = instance.credentials
if credentials is None or credentials == {}: # deserialize payload
return Response({'error': 'This company has no registered credentials'}) data = json.loads(request.body)
# check what platform if request.user.is_authenticated:
platform = instance.platform # send email to manager
if platform is None: company_message = render_to_string('company_contact.html', {
message = {'error': 'This company is not registered with any platforms'} 'company': instance,
elif platform == 'WOO_COMMERCE': 'email': request.user.email,
# recheck credentials 'full_name': request.user.full_name,
if 'key' in credentials.keys() and 'secret' in credentials.keys(): 'quantity': data['quantity'],
# execute import 'phone_number': data.get('phone_number'),
products = woocommerce.migrate_shop_products( 'comments': data['comments'],
instance.web_link, 'product_info': data['product_info'],
credentials['key'], })
credentials['secret'], user_message = render_to_string('confirm_company_contact.html', {
request.user, 'company': instance,
) 'username': request.user.full_name,
message = {'details': f'{len(products)} products added for {instance.company_name}'} 'data': data,
else: })
message = {"error": 'Credentials have wrong format'} # send email to company
subject = "Contacto de usuario"
email = EmailMessage(subject, company_message, to=[instance.creator.email])
email.send()
logging.info(f"Email sent to {instance.creator.email} as manager of {instance.name}")
# send confirmation email to user
subject = 'Confirmación de contacto'
email = EmailMessage(subject, message, to=[request.user.email])
email.send()
logging.info(f"Contact confirmation email sent to {request.user.email}")
stats_data = {
'action_object': instance,
'user': None,
'anonymous': True,
'ip_address': client_ip,
'geo': g.geos(client_ip),
'contact': True,
'shop': instance.shop,
}
else:
# for unauthenticated users
company_message = render_to_string('company_contact.html', {
'company': instance,
'email': data['email'],
'full_name': data['full_name'],
'quantity': data['quantity'],
'phone_number': data.get('phone_number'),
'comments': data['comments'],
'product_info': data['product_info'],
})
user_message = render_to_string('confirm_company_contact.html', {
'company': instance,
'username': data['full_name'],
})
# send email to company
email = EmailMessage(subject, company_message, to=[instance.creator.email])
email.send()
logging.info(f"Email sent to {instance.creator.email} as manager of {instance.name}")
# send confirmation email to user
email = EmailMessage(subject, user_message, to=[data['email']])
email.send()
logging.info(f"Contact confirmation email sent to anonymous user {data['email']}")
# statslog data to register interaction
stats_data = {
'action_object': instance,
'user': request.user,
'anonymous': False,
'ip_address': client_ip,
'geo': g.geos(client_ip),
'contact': True,
'shop': instance.shop,
}
# create statslog instance to register interaction
StatsLog.objects.create(**stats_data)
return Response(data=data)
else: else:
message = {'error': f'Platform {plaform} not registered'} return Response({"errors":{"details": f"No instance of company with id {kwargs['pk']}",}})
return Response(message) except Exception as e:
return Response({"errors":{"details": str(e),}}, status=500)
@action(detail=True, methods=['GET', ])
def import_products(self, request, **kwargs):
instance = self.queryset.filter(pk=kwargs['pk']).first()
# check if it's a shop
if instance.shop is not True:
return Response({'error': 'This company is not a shop'})
# check required credentials
credentials = instance.credentials
if credentials is None or credentials == {}:
return Response({'error': 'This company has no registered credentials'})
# check what platform
platform = instance.platform
if platform is None:
message = {'error': 'This company is not registered with any platforms'}
elif platform == 'WOO_COMMERCE':
# recheck credentials
if 'key' in credentials.keys() and 'secret' in credentials.keys():
# execute import
products = woocommerce.migrate_shop_products(
instance.web_link,
credentials['key'],
credentials['secret'],
request.user,
)
message = {'details': f'{len(products)} products added for {instance.company_name}'}
else:
message = {"error": 'Credentials have wrong format'}
else:
message = {'error': f'Platform {plaform} not registered'}
return Response(message)
@api_view(['GET',]) @api_view(['GET',])

View File

@@ -7,7 +7,7 @@ from factory.django import DjangoModelFactory
class StatsLogFactory(DjangoModelFactory): class StatsLogFactory(DjangoModelFactory):
model = None action_object = None
user = SubFactory('core.factories.CustomUserFactory') user = SubFactory('core.factories.CustomUserFactory')
anonymous = FuzzyChoice(choices=(True, False)) anonymous = FuzzyChoice(choices=(True, False))
ip_address = '127.0.0.1' ip_address = '127.0.0.1'
@@ -17,4 +17,4 @@ class StatsLogFactory(DjangoModelFactory):
class Meta: class Meta:
model = 'stats.StatsLog' model = 'stats.StatsLog'

View File

@@ -5,4 +5,4 @@ from stats.models import StatsLog
class StatsLogSerializer(serializers.ModelSerializer): class StatsLogSerializer(serializers.ModelSerializer):
class Meta: class Meta:
model = StatsLog model = StatsLog
exclude = ['created', 'updated', 'creator'] exclude = ['created', 'updated',]

View File

@@ -1,3 +1,329 @@
from django.test import TestCase import random
import string
from rest_framework.test import APITestCase
from rest_framework import status
from core.factories import CustomUserFactory
from core.utils import get_tokens_for_user
from .models import StatsLog
from .factories import StatsLogFactory
# Create your tests here. # Create your tests here.
class TrackUserViewTest(APITestCase):
"""ProductViewSet tests
"""
def setUp(self):
"""Tests setup
"""
self.endpoint = '/api/v1/stats/me/'
self.factory = StatsLogFactory
self.model = StatsLog
# create user
self.password = ''.join(random.choices(string.ascii_uppercase, k = 10))
self.user = CustomUserFactory(email="test@mail.com", password=self.password, is_active=True)
# anon user
def test_anon_user_can_only_post(self):
"""Not logged-in user cannot create new instance
"""
# Query endpoint
response = self.client.get(self.endpoint, data={})
# Assert access is forbidden
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
# Query endpoint
response = self.client.put(self.endpoint, data={})
# Assert access is forbidden
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
# Query endpoint
response = self.client.delete(self.endpoint, data={})
# Assert access is forbidden
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
# Query endpoint
response = self.client.post(self.endpoint, data={})
# Assert access is forbidden
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_anon_user_can_register_product_action(self):
"""Not logged-in user cannot modify existing instance
"""
# Create instance
instance = self.factory()
# Query endpoint
url = self.endpoint + f'{instance.pk}/'
response = self.client.put(url, {}, format='json')
# Assert forbidden code
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_anon_user_can_register_company_action(self):
"""Not logged-in user cannot modify existing instance
"""
# Create instance
instance = self.factory()
# Query endpoint
url = self.endpoint + f'{instance.pk}/'
response = self.client.put(url, {}, format='json')
# Assert forbidden code
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
# authenticated user
def test_auth_user_can_only_post(self):
"""Regular logged-in user can list instance
"""
# Create instances
instances = [self.factory() for n in range(random.randint(1,5))]
# Authenticate user
token = get_tokens_for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")
# Request list
response = self.client.get(self.endpoint)
# Assert access is allowed
self.assertEqual(response.status_code, status.HTTP_200_OK)
# Assert all instances are returned
self.assertEqual(len(instances), len(response.data))
def test_auth_user_can_register_product_action(self):
"""Regular logged-in user can list instance
"""
# Create instances
instance = self.factory()
# Authenticate user
token = get_tokens_for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")
# Request list
url = f"{self.endpoint}{instance.id}/"
response = self.client.get(url)
# Assert access is allowed
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = json.loads(response.content)
self.assertEquals(instance.id, data['id'])
def test_auth_user_can_register_company_action(self):
"""Regular logged-in user can create new instance
"""
# Define request data
company = CompanyFactory()
data = {
'company': company.id,
'sku': 'qwerewq',
'name': 'qwerewq',
'description': 'qwerewq',
'url': 'http://qwerewq.com',
'price': '12.21',
'shipping_cost': '21.12',
'shipping_terms': 'qwerewq',
'source': 'SYNCHRONIZED',
'sourcing_date': datetime.datetime.now().isoformat()+'Z',
'update_date': datetime.datetime.now().isoformat()+'Z',
'discount': '0.05',
'stock': 22,
'tags': ['tag1, tag2'],
'category': 'Mr',
'attributes': ['color/red', 'size/xxl'],
'identifiers': '34rf34f43c43',
}
# Authenticate user
token = get_tokens_for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")
# Query endpoint
response = self.client.post(self.endpoint, data=data, format='json')
# Assert endpoint returns created status
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
# Assert instance exists on db
self.assertTrue(self.model.objects.get(id=response.data['id']))
class StatsLogViewSetTest(APITestCase):
"""ProductViewSet tests
"""
def setUp(self):
"""Tests setup
"""
self.endpoint = '/api/v1/stats/'
self.factory = StatsLogFactory
self.model = StatsLog
# create user
self.password = ''.join(random.choices(string.ascii_uppercase, k = 10))
self.user = CustomUserFactory(email="test@mail.com", password=self.password, is_active=True)
self.instance_data = {
'action_object': None,
'user': self.user.id,
'anonymous': False,
'ip_address': None,
'geo': None,
'contact': None,
'shop': True,
}
self.alt_data = {
'action_object': None,
'user': self.user.id,
'anonymous': True,
'ip_address': None,
'geo': None,
'contact': None,
'shop': False,
}
# anonymous user
def test_anon_user_cannot_crud(self):
"""Not logged-in user cannot access endpoint at all
"""
# Query endpoint
response = self.client.get(self.endpoint)
# Assert access is forbidden
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
# Query endpoint
response = self.client.post(self.endpoint, data={})
# Assert access is forbidden
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
# Query endpoint
response = self.client.put(self.endpoint, data={})
# Assert access is forbidden
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
# Query endpoint
response = self.client.delete(self.endpoint)
# Assert access is forbidden
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
# authenticated user
def test_auth_user_cannot_crud(self):
"""Authenticated user cannot access endpoint at all
"""
# Authenticate user
token = get_tokens_for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")
# Query endpoint
response = self.client.get(self.endpoint)
# Assert access is forbidden
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
# Query endpoint
response = self.client.post(self.endpoint, data={})
# Assert access is forbidden
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
# Query endpoint
response = self.client.put(self.endpoint, data={})
# Assert access is forbidden
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
# Query endpoint
response = self.client.delete(self.endpoint)
# Assert access is forbidden
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
# admin user
def test_admin_user_can_list_instance(self):
"""Admin user can list instance
"""
# make user be admin
self.user.is_staff = True
self.user.save()
# Create instances
instances = [self.factory() for n in range(random.randint(1,5))]
# Authenticate user
token = get_tokens_for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")
# Request list
response = self.client.get(self.endpoint)
# Assert access is allowed
self.assertEqual(response.status_code, status.HTTP_200_OK)
# Assert all instances are returned
self.assertEqual(len(instances), len(response.data))
def test_admin_user_can_create_instance(self):
"""Admin user can create new instance
"""
# make user be admin
self.user.is_staff = True
self.user.save()
# Authenticate user
token = get_tokens_for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")
# Query endpoint
response = self.client.post(self.endpoint, data=self.instance_data, format='json')
# Assert endpoint returns created status
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
# Assert instance exists on db
self.assertTrue(self.model.objects.get(id=response.data['id']))
def test_admin_user_can_modify_instance(self):
"""Admin user can modify existing instance
"""
# make user be admin
self.user.is_staff = True
self.user.save()
# Create instances
instance = self.factory()
# Authenticate user
token = get_tokens_for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")
# Query endpoint
url = self.endpoint + f'{instance.pk}/'
response = self.client.put(url, self.alt_data, format='json')
# Assert endpoint returns OK code
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_admin_user_can_delete_instance(self):
"""Admin user can delete existing instance
"""
# make user be admin
self.user.is_staff = True
self.user.save()
# Create instances
instance = self.factory()
# Authenticate user
token = get_tokens_for_user(self.user)
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")
# Query endpoint
url = self.endpoint + f'{instance.pk}/'
response = self.client.delete(url)
# assert 204 no content
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
# Assert instance doesn't exists anymore on db
self.assertFalse(self.model.objects.filter(id=instance.pk).exists())

View File

@@ -1,19 +1,70 @@
from django.shortcuts import render import json
import logging
# Create your views here. # Create your views here.
from rest_framework import viewsets from rest_framework import viewsets
from stats.models import StatsLog from rest_framework.decorators import api_view
from stats.serializers import StatsLogSerializer from rest_framework.response import Response
from rest_framework import status
from ipware import get_client_ip
from django.contrib.gis.geoip2 import GeoIP2
from back_latienda.permissions import IsStaff from back_latienda.permissions import IsStaff
from products.models import Product
from companies.models import Company
from stats.models import StatsLog
from stats.serializers import StatsLogSerializer
class StatsLogViewSet(viewsets.ModelViewSet): class StatsLogViewSet(viewsets.ModelViewSet):
queryset = StatsLog.objects.all() queryset = StatsLog.objects.all()
serializer_class = StatsLogSerializer serializer_class = StatsLogSerializer
permission_classes = [IsStaff,] permission_classes = [IsStaff,]
def perform_create(self, serializer):
serializer.save(creator=self.request.user)
@api_view(['POST'])
def track_user(request):
"""Track user actions on the site
Params:
{
action: view,
object: {
model: name,
id: 1,
},
}
"""
try:
data = json.loads(request.body)
# geoip stuff
client_ip, is_routable = get_client_ip(request)
g = GeoIP2()
# gather instance data
instance_data = {
'action': data.get('action'),
'user': request.user,
'anonymous': request.user.is_anonymous,
'ip_address': client_ip,
'geo': g.geos(client_ip),
# 'contact' ???
}
if data['object'].get('name') == 'product':
instance_data['action_object'] = Product.objects.get(id=data['object'].get('id'))
elif data['object'].get('name') == 'company':
instance_data['action_object'] = Company.objects.get(id=data['object'].get('id'))
if instance_data['action_object'].shop is True:
instance_data['shop'] = True
# crate new instance
new_stat = StatsLog.objects.create(**instance_data)
return Response(status=status.HTTP_201_CREATED)
except Exception as e:
logging.error(f"Stats could not be created: {str(e)}")
return Response(f"Process could not be registered: {str(type(e))}")