Files
consumocuidado-server/core/tests.py
2021-03-12 15:03:50 +01:00

594 lines
20 KiB
Python

import random
import string
import json
import hashlib
import base64
import csv
from django.test import TestCase
from django.core import mail
from django.utils.http import urlsafe_base64_encode
from django.utils.encoding import force_bytes
from django.conf import settings
from rest_framework.test import APITestCase
from rest_framework import status
from core.utils import get_tokens_for_user, account_activation_token
from companies.models import Company
from . import models
from . import factories
# Create your tests here.
class CustomUserViewSetTest(APITestCase):
    """CustomUser viewset tests.

    Exercises ``/api/v1/users/`` as an anonymous client, as an authenticated
    non-staff user and as a staff user.
    """

    def setUp(self):
        """Create the users shared by every test."""
        self.endpoint = '/api/v1/users/'
        self.factory = factories.CustomUserFactory
        self.model = models.CustomUser
        # create admin user
        # NOTE(review): no ``is_staff`` flag is passed here; the admin tests
        # below promote ``self.user`` instead -- confirm this user is needed.
        self.admin_email = "admin_user@mail.com"
        admin_password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.admin_user = self.factory(
            email=self.admin_email, password=admin_password, is_active=True
        )
        # create regular user (the one the tests authenticate as)
        self.reg_email = "regular_user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.user = self.factory(
            email=self.reg_email, password=self.password, is_active=True
        )

    def _authenticate(self, user):
        """Attach a Bearer access token for ``user`` to the test client."""
        token = get_tokens_for_user(user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")

    def _make_staff(self):
        """Promote ``self.user`` to staff and persist the change."""
        self.user.is_staff = True
        self.user.save()

    # anon user
    def test_anon_user_can_create_inactive_instance(self):
        """Not logged-in user can create a new User, but it is inactive."""
        data = {
            'email': 'test@email.com',
            'full_name': 'TEST NAME',
            'password': 'VENTILADORES1234499.89',
        }
        # Query endpoint
        response = self.client.post(self.endpoint, data=data)
        # Assert creation is successful
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        # Assert instance is inactive
        info = response.json()
        self.assertFalse(info['is_active'])
        # Assert instance exists on db
        self.assertTrue(self.model.objects.filter(email=info['email']).exists())
        # Assert verification email was sent
        self.assertEqual(len(mail.outbox), 1)

    def test_anon_user_cannot_modify_existing_instance(self):
        """Not logged-in user cannot modify an existing instance."""
        instance = self.factory()
        # Query endpoint
        url = self.endpoint + f'{instance.pk}/'
        response = self.client.put(url, {}, format='json')
        # Assert unauthorized code
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_anon_user_cannot_delete_existing_instance(self):
        """Not logged-in user cannot delete an existing instance."""
        instance = self.factory()
        # Query endpoint
        url = self.endpoint + f'{instance.pk}/'
        response = self.client.delete(url)
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
        # Assert instance still exists on db
        self.assertTrue(self.model.objects.filter(id=instance.pk).exists())

    def test_anon_user_cannot_list_instances(self):
        """Not logged-in user cannot list instances."""
        response = self.client.get(self.endpoint)
        # Assert access is unauthorized
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    # auth regular user
    def test_regular_user_cannot_create_instance(self):
        """Regular user cannot create a new instance."""
        self._authenticate(self.user)
        # Query endpoint
        response = self.client.post(self.endpoint, data={})
        # Assert access is forbidden
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_regular_user_cannot_modify_existing_instance(self):
        """Regular user cannot modify another user's instance."""
        instance = self.factory()
        self._authenticate(self.user)
        # Query endpoint
        url = self.endpoint + f'{instance.pk}/'
        response = self.client.put(url, {}, format='json')
        # Assert forbidden code
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_regular_user_cannot_delete_existing_instance(self):
        """Regular user cannot delete another user's instance."""
        instance = self.factory()
        self._authenticate(self.user)
        # Query endpoint
        url = self.endpoint + f'{instance.pk}/'
        response = self.client.delete(url)
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
        # Assert instance still exists on db
        self.assertTrue(self.model.objects.filter(id=instance.pk).exists())

    def test_regular_user_cannot_list_instances(self):
        """Regular user cannot list instances."""
        # Authenticate BEFORE issuing the request; otherwise the request is
        # anonymous and this test only repeats the anon-user case.
        self._authenticate(self.user)
        response = self.client.get(self.endpoint)
        # An authenticated non-staff user is expected to be forbidden, in
        # line with the other regular-user tests of this class.
        # NOTE(review): 403 is inferred from those sibling tests -- confirm
        # against the viewset's permission class.
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_auth_user_can_modify_own_instance(self):
        """Regular user can modify their own instance."""
        data = {
            "email": "new_email@mail.com",
            "full_name": "New Full Name",
            'notify': True,
        }
        self._authenticate(self.user)
        # Query endpoint
        url = f'{self.endpoint}{self.user.pk}/'
        response = self.client.put(url, data=data, format='json')
        # Assert OK code
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        # Assert instance has been modified
        for key, expected in data.items():
            self.assertEqual(expected, response.data[key])

    # admin user
    def test_admin_user_can_create_instance(self):
        """Admin user can create a new instance."""
        self._make_staff()
        # Define request data
        data = {
            'email': 'test@email.com',
            'full_name': 'TEST NAME',
            'password': 'VENTILADORES1234499.89',
        }
        self._authenticate(self.user)
        # Query endpoint
        response = self.client.post(self.endpoint, data=data, format='json')
        # Assert endpoint returns created status
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        # Assert instance exists on db
        self.assertTrue(
            self.model.objects.filter(email=response.data['email']).exists()
        )
        # Assert verification email was sent
        self.assertEqual(len(mail.outbox), 1)

    def test_admin_user_can_modify_existing_instance(self):
        """Admin user can modify an existing instance."""
        self._make_staff()
        instance = self.factory()
        # Define request data
        data = {
            'email': 'test_alt@email.com',
            'full_name': 'TEST NAME ALT',
            'notify': True,
        }
        self._authenticate(self.user)
        # Query endpoint
        url = self.endpoint + f'{instance.pk}/'
        response = self.client.put(url, data, format='json')
        # Assert endpoint returns OK code
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        # Assert instance has been modified
        for key, expected in data.items():
            self.assertEqual(expected, response.data[key])

    def test_admin_user_can_delete_existing_instance(self):
        """Admin user can delete an existing instance."""
        self._make_staff()
        instance = self.factory()
        self._authenticate(self.user)
        # Query endpoint
        url = self.endpoint + f'{instance.pk}/'
        response = self.client.delete(url)
        # Assert 204 no content
        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
        # Assert instance no longer exists on db
        self.assertFalse(self.model.objects.filter(id=instance.pk).exists())
class ChangeUserPasswordViewTest(APITestCase):
    """Tests for the change-password endpoint of an authenticated user."""

    def setUp(self):
        """Create an active user with a known random password."""
        # NOTE(review): ``self.endpoint`` is not used by the tests in this
        # class, which all target the change_password URL -- confirm whether
        # it is still needed.
        self.endpoint = '/api/v1/user/update/'
        self.factory = factories.CustomUserFactory
        self.model = models.CustomUser
        # create regular user
        self.reg_email = "user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.user = self.factory(email=self.reg_email, is_active=True)
        self.user.set_password(self.password)
        self.user.save()
        # URL shared by every test below
        self.change_password_url = f'/api/v1/user/change_password/{self.user.pk}/'

    def _authenticate(self):
        """Attach a Bearer access token for ``self.user`` to the test client."""
        token = get_tokens_for_user(self.user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")

    def test_fail_different_passwords(self):
        """Request is rejected when the two new passwords differ."""
        data = {
            "old_password": self.password,
            "password": "!!!",
            "password2": "SUPERSECRETNEWPASSWORD",
        }
        self._authenticate()
        # Query endpoint
        response = self.client.put(self.change_password_url, data=data, format='json')
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    def test_fail_wrong_old_password(self):
        """Request is rejected when the old password is wrong."""
        data = {
            "old_password": 'WRONG!!!!',
            "password": 'SUPERSECRETNEWPASSWORD',
            "password2": "SUPERSECRETNEWPASSWORD",
        }
        self._authenticate()
        # Query endpoint
        response = self.client.put(self.change_password_url, data=data, format='json')
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    def test_auth_user_can_modify_own_password(self):
        """Authenticated user can modify their own password."""
        data = {
            "old_password": self.password,
            "password": "SUPERSECRETNEWPASSWORD",
            "password2": "SUPERSECRETNEWPASSWORD",
        }
        self._authenticate()
        # Query endpoint
        response = self.client.put(self.change_password_url, data=data, format='json')
        # Assert OK code
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        # Verify the stored hash through the user model's ``check_password``
        # instead of re-deriving a PBKDF2 digest by hand, so the test does
        # not depend on which password hasher the project configures.
        updated_user = self.model.objects.get(pk=self.user.pk)
        self.assertTrue(updated_user.check_password(data['password']))
        self.assertFalse(updated_user.check_password(self.password))
class LoadCoopManagerTestCase(APITestCase):
    """Tests for the CSV upload endpoint ``/api/v1/load_coops/``."""

    def setUp(self):
        """Create a staff user, a regular user and the CSV fixture path."""
        self.endpoint = '/api/v1/load_coops/'
        self.user_factory = factories.CustomUserFactory
        self.user_model = models.CustomUser
        self.company_model = Company
        # create admin user
        self.admin_email = "admin_user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.admin_user = self.user_factory(
            email=self.admin_email,
            password=self.password,
            is_staff=True,
            is_active=True,
        )
        # create regular user
        self.reg_email = "user@mail.com"
        self.user = self.user_factory(email=self.reg_email, is_active=True)
        self.user.set_password(self.password)
        self.user.save()
        # test CSV file path
        self.csv_path = 'datasets/test_coop.csv'

    def _authenticate(self, user):
        """Attach a Bearer access token for ``user`` to the test client."""
        token = get_tokens_for_user(user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")

    def test_admin_can_load_csv(self):
        """Staff user can upload the CSV; companies, users and mails are created."""
        # count existing instances
        company_count = self.company_model.objects.count()
        user_count = self.user_model.objects.count()
        self._authenticate(self.admin_user)
        # send the file; the context manager closes the handle afterwards
        with open(self.csv_path, 'rt') as csv_file:
            response = self.client.post(self.endpoint, {'csv_file': csv_file})
        # check response
        self.assertEqual(response.status_code, 200)
        # check for object creation (this test expects 5 new companies,
        # 5 new users and 5 mails from the fixture)
        self.assertEqual(company_count + 5, self.company_model.objects.count())
        self.assertEqual(user_count + 5, self.user_model.objects.count())
        self.assertEqual(5, len(mail.outbox))

    def test_auth_user_cannot_load_csv(self):
        """Authenticated non-staff user is forbidden and nothing is created."""
        company_count = self.company_model.objects.count()
        user_count = self.user_model.objects.count()
        self._authenticate(self.user)
        # send the file; the context manager closes the handle afterwards
        with open(self.csv_path, 'r') as csv_file:
            response = self.client.post(self.endpoint, {'csv_file': csv_file})
        # check response
        self.assertEqual(response.status_code, 403)
        # check that nothing was created
        self.assertEqual(company_count, self.company_model.objects.count())
        self.assertEqual(user_count, self.user_model.objects.count())

    def test_anon_user_cannot_load_csv(self):
        """Anonymous client is unauthorized and nothing is created."""
        company_count = self.company_model.objects.count()
        user_count = self.user_model.objects.count()
        # send the file; the context manager closes the handle afterwards
        with open(self.csv_path, 'r') as csv_file:
            response = self.client.post(self.endpoint, {'csv_file': csv_file})
        # check response
        self.assertEqual(response.status_code, 401)
        # check that nothing was created
        self.assertEqual(company_count, self.company_model.objects.count())
        self.assertEqual(user_count, self.user_model.objects.count())
class MyUserViewTest(APITestCase):
    """my_user tests."""

    def setUp(self):
        """Create an active user with a known random password."""
        self.endpoint = '/api/v1/my_user/'
        self.factory = factories.CustomUserFactory
        self.model = models.CustomUser
        # create user
        self.email = "user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.user = self.factory(email=self.email, is_active=True)
        self.user.set_password(self.password)
        self.user.save()

    def test_auth_user_gets_data(self):
        """Authenticated user receives their own data."""
        # Authenticate
        token = get_tokens_for_user(self.user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")
        # Query endpoint
        response = self.client.get(self.endpoint)
        # Check the status first so a failing request is reported as such
        # rather than as a payload parsing/lookup error.
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        payload = response.json()
        self.assertEqual(self.email, payload['email'])

    def test_anon_user_cannot_access(self):
        """Anonymous client is unauthorized."""
        # send in request
        response = self.client.get(self.endpoint)
        # check response
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
class ActivateUserTest(APITestCase):
    """Tests for the account activation link ``/activate/<uid>/<token>/``."""

    def setUp(self):
        """Create an inactive user with a known random password."""
        self.endpoint = 'activate/<uidb64>/<token>/'
        self.factory = factories.CustomUserFactory
        self.model = models.CustomUser
        # create user
        self.email = "user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.user = self.factory(email=self.email, is_active=False)
        self.user.set_password(self.password)
        self.user.save()

    def _activation_url(self, uid, token):
        """Build the activation URL for the given uid and token."""
        return f'/activate/{uid}/{token}/'

    def test_correct_activation(self):
        """A valid link redirects to ``settings.ACTIVATION_REDIRECT``."""
        # create values
        uid = urlsafe_base64_encode(force_bytes(self.user.pk))
        token = account_activation_token.make_token(self.user)
        response = self.client.get(self._activation_url(uid, token))
        # assertions
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, settings.ACTIVATION_REDIRECT)

    def test_correct_activation_no_redirect(self):
        """With an empty ACTIVATION_REDIRECT a valid link answers 200."""
        # create values
        uid = urlsafe_base64_encode(force_bytes(self.user.pk))
        token = account_activation_token.make_token(self.user)
        # Override the setting only for this request: assigning to
        # ``settings`` directly would leak into the tests that run later.
        with self.settings(ACTIVATION_REDIRECT=''):
            response = self.client.get(self._activation_url(uid, token))
        # assertions
        self.assertEqual(response.status_code, 200)
        self.assertIn(self.user.email, str(response.content))

    def test_bad_activation(self):
        """A truncated uid/token pair is rejected with 406 and an error key."""
        # create values (drop the last character to invalidate them)
        uid = urlsafe_base64_encode(force_bytes(self.user.pk))[:-1]
        token = account_activation_token.make_token(self.user)[:-1]
        response = self.client.get(self._activation_url(uid, token))
        # assertions
        self.assertEqual(response.status_code, 406)
        self.assertIn('error', response.json())
class CreateCompanyUserTest(APITestCase):
    """Tests for ``/api/v1/create_company_user/``."""

    def setUp(self):
        """Create an inactive user with a known random password."""
        self.endpoint = '/api/v1/create_company_user/'
        self.factory = factories.CustomUserFactory
        self.model = models.CustomUser
        # create user
        self.email = "user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.user = self.factory(email=self.email, is_active=False)
        self.user.set_password(self.password)
        self.user.save()

    def test_succesful_creation(self):
        """Valid payload creates an inactive user, a company and one mail."""
        data = {
            'user': {
                'email': 'test@email.com',
                'full_name': 'TEST NAME',
                'password': 'VENTILADORES1234499.89',
            },
            'company': {
                'cif': 'qwerewq',
                'company_name': 'qwerewq',
                'short_name': 'qwerewq',
                'web_link': 'http://qwerewq.com',
                'shop': True,
                'shop_link': 'http://qwerewq.com',
                'platform': 'PRESTASHOP',
                'email': 'test@email.com',
                'logo': None,
                'city': None,
                'address': 'qwer qewr 5',
                'geo': {'longitude': 1.0, 'latitude': 1.0},
                'phone': '1234',
                'mobile': '4321',
                'other_phone': '41423',
                'description': 'dfgfdgdfg',
                'shop_rss_feed': 'http://qwerewq.com',
                'sale_terms': 'tewrnmfew f ewfrfew ewewew f',
                'shipping_cost': '12.25',
                'sync': False,
            },
        }
        response = self.client.post(self.endpoint, data=data, format='json')
        self.assertEqual(response.status_code, 201)
        # user exists and is inactive
        created_user = self.model.objects.filter(email='test@email.com').first()
        self.assertIsNotNone(created_user)
        self.assertFalse(created_user.is_active)
        # company exists
        self.assertTrue(Company.objects.filter(cif='qwerewq').exists())
        # assert verification email
        self.assertEqual(len(mail.outbox), 1)

    def test_creation_error(self):
        """Empty payload is rejected with 406 and no mail is sent."""
        response = self.client.post(self.endpoint, data={}, format='json')
        self.assertEqual(response.status_code, 406)
        self.assertEqual(len(mail.outbox), 0)