import random import string import json import hashlib import base64 import csv from django.test import TestCase from django.core import mail from django.utils.http import urlsafe_base64_encode from django.utils.encoding import force_bytes from rest_framework.test import APITestCase from rest_framework import status from core.utils import get_tokens_for_user, account_activation_token from companies.models import Company from . import models from . import factories # Create your tests here. class CustomUserViewSetTest(APITestCase): """CustomUser viewset tests """ def setUp(self): """Tests setup """ self.endpoint = '/api/v1/users/' self.factory = factories.CustomUserFactory self.model = models.CustomUser # create admin user self.admin_email = f"admin_user@mail.com" self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) self.user = self.factory(email=self.admin_email, password=self.password, is_active=True) # create regular user self.reg_email = f"regular_user@mail.com" self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) self.user = self.factory(email=self.reg_email, password=self.password, is_active=True) # anon user def test_anon_user_can_create_active_instance(self): """Not logged-in user can create new instance of User but it's inactive """ data = { 'email': 'test@email.com', 'full_name': 'TEST NAME', 'password': 'VENTILADORES1234499.89', } # Query endpoint response = self.client.post(self.endpoint, data=data) # Assert creation is successful self.assertEqual(response.status_code, status.HTTP_201_CREATED) # assert instance is inactive info = json.loads(response.content) self.assertTrue(info['is_active']) # Assert instance exists on db self.assertTrue(self.model.objects.get(email=info['email'])) # assert verification email self.assertTrue(len(mail.outbox) == 1) def test_anon_user_cannot_modify_existing_instance(self): """Not logged-in user cannot modify existing instance """ # Create instance instance = self.factory() # Query endpoint url = self.endpoint + f'{instance.pk}/' response = self.client.put(url, {}, format='json') # Assert forbidden code self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) def test_anon_user_cannot_delete_existing_instance(self): """Not logged-in user cannot delete existing instance """ # Create instances instance = self.factory() # Query endpoint url = self.endpoint + f'{instance.pk}/' response = self.client.delete(url) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) # Assert instance still exists on db self.assertTrue(self.model.objects.get(id=instance.pk)) def test_anon_user_cannot_list_instances(self): """Not logged-in user can't read instance """ # Request list response = self.client.get(self.endpoint) # Assert access is forbidden self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) # auth regular user def test_regular_user_cannot_create_instance(self): """Regular user cannot create new instance """ # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint response = self.client.post(self.endpoint, data={}) # Assert access is forbidden self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) def test_regular_user_cannot_modify_existing_instance(self): """Regular user cannot modify existing instance """ # Create instance instance = self.factory() # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = self.endpoint + f'{instance.pk}/' response = self.client.put(url, {}, format='json') # Assert forbidden code self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) def test_regular_user_cannot_delete_existing_instance(self): """Regular user cannot delete existing instance """ # Create instances instance = self.factory() # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = self.endpoint + f'{instance.pk}/' response = self.client.delete(url) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) # Assert instance still exists on db self.assertTrue(self.model.objects.get(id=instance.pk)) def test_regular_user_cannot_list_instances(self): """Regular user can't list instances """ # Request list response = self.client.get(self.endpoint) # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Assert access is forbidden self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) # admin user def test_admin_user_can_create_instance(self): """Admin user can create new instance """ # set user as admin self.user.is_staff = True self.user.save() # Define request data data = { 'email': 'test@email.com', 'full_name': 'TEST NAME', 'password': 'VENTILADORES1234499.89', } # Authenticate user token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint response = self.client.post(self.endpoint, data=data, format='json') # Assert endpoint returns created status self.assertEqual(response.status_code, status.HTTP_201_CREATED) # Assert instance exists on db self.assertTrue(self.model.objects.get(email=response.data['email'])) # assert verification email self.assertTrue(len(mail.outbox) == 1) def test_admin_user_can_modify_existing_instance(self): """Admin user can modify existing instance """ # set user as admin self.user.is_staff = True self.user.save() # Create instances instance = self.factory() # Define request data data = { 'email': 'test_alt@email.com', 'full_name': 'TEST NAME ALT', 'notify': True, } # Authenticate user token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = self.endpoint + f'{instance.pk}/' response = self.client.put(url, data, format='json') # Assert endpoint returns OK code self.assertEqual(response.status_code, status.HTTP_200_OK) # Assert instance has been modified for key in data: self.assertEqual(data[key], response.data[key]) def test_admin_user_can_delete_existing_instance(self): """Admin user can delete existing instance """ # set user as admin self.user.is_staff = True self.user.save() # Create instances instance = self.factory() # Authenticate user token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = self.endpoint + f'{instance.pk}/' response = self.client.delete(url) # assert 204 no content self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) # Assert instance doesn't exists anymore on db self.assertFalse(self.model.objects.filter(id=instance.pk).exists()) class ChangeUserPasswordViewTest(APITestCase): def setUp(self): """Tests setup """ self.endpoint = '/api/v1/user/update/' self.factory = factories.CustomUserFactory self.model = models.CustomUser # create regular user self.reg_email = f"user@mail.com" self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) self.user = self.factory(email=self.reg_email, is_active=True) self.user.set_password(self.password) self.user.save() def test_fail_different_passwords(self): # Create instance data = { "old_password": self.password, "password": "!!!", "password2": "SUPERSECRETNEWPASSWORD", } # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = f'/api/v1/user/change_password/{self.user.pk}/' response = self.client.put(url, data=data, format='json') self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) def test_fail_wrong_old_password(self): # Create instance data = { "old_password": 'WRONG!!!!', "password": 'SUPERSECRETNEWPASSWORD', "password2": "SUPERSECRETNEWPASSWORD", } # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = f'/api/v1/user/change_password/{self.user.pk}/' response = self.client.put(url, data=data, format='json') self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) def test_auth_user_can_modify_own_password(self): """authenticated user can modify own password """ data = { "old_password": self.password, "password": "SUPERSECRETNEWPASSWORD", "password2": "SUPERSECRETNEWPASSWORD", } # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = f'/api/v1/user/change_password/{self.user.pk}/' response = self.client.put(url, data=data, format='json') # Assert forbidden code self.assertEqual(response.status_code, status.HTTP_200_OK) # assert new password hash properly updated # assert fields exist, and data matches updated_user = self.model.objects.get(pk=self.user.id) stored_value = updated_user.__dict__['password'] hash_type, iteration, salt, stored_password_hash = stored_value.split('$') new_password_hash = hashlib.pbkdf2_hmac( hash_name='sha256', password=data['password'].encode(), salt=salt.encode(), iterations=int(iteration), ) self.assertEqual(stored_password_hash, base64.b64encode(new_password_hash).decode()) class UpdateUserViewTest(APITestCase): def setUp(self): """Tests setup """ self.endpoint = '/api/v1/users/' self.factory = factories.CustomUserFactory self.model = models.CustomUser # create regular user self.reg_email = f"user@mail.com" self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) self.user = self.factory(email=self.reg_email, is_active=True) self.user.set_password(self.password) self.user.save() # create admin user self.admin_email = f"admin_user@mail.com" self.admin_user = self.factory(email=self.admin_email, is_staff=True, is_active=True) self.admin_user.set_password(self.password) self.admin_user.save() def test_auth_user_can_modify_own_instance(self): """Regular user can modify own instance """ # Create instance data = { "email": "new_email@mail.com", "full_name": "New Full Name", 'provider': 'PROVIDER', 'notify': True, } # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = f'{self.endpoint}{self.user.pk}/' response = self.client.put(url, data=data, format='json') # Assert forbidden code self.assertEqual(response.status_code, status.HTTP_200_OK) def test_auth_user_cannot_modify_random_instance(self): """Regular user cannot modify randnom instance """ # Create instance instance = self.factory() # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = f'{self.endpoint}{instance.pk}/' response = self.client.put(url, data={}, format='json') # Assert forbidden code self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) def test_anon_user_cannot_modify_random_instance(self): """anon user cannot modify instance """ # Create instance instance = self.factory() # Query endpoint url = f'{self.endpoint}{instance.pk}/' response = self.client.put(url, data={}, format='json') # Assert forbidden code self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) def test_admin_user_can_modify_random_instance(self): """Regular user cannot modify randnom instance """ # Create instance instance = self.factory() # Authenticate token = get_tokens_for_user(self.admin_user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") data = { "email": "new_email@mail.com", "full_name": "New Full Name", 'provider': 'PROVIDER', 'notify': True, } # Query endpoint url = f'{self.endpoint}{instance.pk}/' response = self.client.put(url, data=data, format='json') # Assert forbidden code self.assertEqual(response.status_code, status.HTTP_200_OK) class LoadCoopManagerTestCase(APITestCase): def setUp(self): """Tests setup """ self.endpoint = '/api/v1/load_coops/' self.user_factory = factories.CustomUserFactory self.user_model = models.CustomUser self.company_model = Company # create admin user self.admin_email = f"admin_user@mail.com" self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) self.admin_user = self.user_factory(email=self.admin_email, password=self.password, is_staff=True, is_active=True) # create regular user self.reg_email = f"user@mail.com" self.user = self.user_factory(email=self.reg_email, is_active=True) self.user.set_password(self.password) self.user.save() # test CSV file path self.csv_path = 'datasets/test_coop.csv' def test_admin_can_load_csv(self): # count existing instances company_count = self.company_model.objects.count() user_count = self.user_model.objects.count() # read csv file files = {'csv_file': open(self.csv_path,'rt')} # Authenticate token = get_tokens_for_user(self.admin_user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # send in request response = self.client.post(self.endpoint, files) # check response self.assertEqual(response.status_code, 200) # check for object creation self.assertEquals(company_count + 5, self.company_model.objects.count()) self.assertEquals(user_count + 5, self.user_model.objects.count()) self.assertEquals(5, len(mail.outbox)) def test_auth_user_cannot_load_csv(self): company_count = self.company_model.objects.count() user_count = self.user_model.objects.count() # read csv file files = {'csv_file': open(self.csv_path,'r')} # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # send in request response = self.client.post(self.endpoint, files) # check response self.assertEqual(response.status_code, 403) # check for object creation self.assertEqual(company_count, self.company_model.objects.count()) self.assertEqual(user_count, self.user_model.objects.count()) def test_anon_user_cannot_load_csv(self): company_count = self.company_model.objects.count() user_count = self.user_model.objects.count() # read csv file files = {'csv_file': open(self.csv_path,'r')} # send in request response = self.client.post(self.endpoint, files) # check response self.assertEqual(response.status_code, 401) # check for object creation self.assertEqual(company_count, self.company_model.objects.count()) self.assertEqual(user_count, self.user_model.objects.count()) class MyUserViewTest(APITestCase): """my_user tests """ def setUp(self): """Tests setup """ self.endpoint = '/api/v1/my_user/' self.factory = factories.CustomUserFactory self.model = models.CustomUser # create user self.email = f"user@mail.com" self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) self.user = self.factory(email=self.email, is_active=True) self.user.set_password(self.password) self.user.save() def test_auth_user_gets_data(self): # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint response = self.client.get(self.endpoint) payload = response.json() # Assert forbidden code self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEquals(self.email, payload['email']) def test_anon_user_cannot_access(self): # send in request response = self.client.get(self.endpoint) # check response self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) class ActivateUserTest(APITestCase): def setUp(self): self.endpoint = 'activate///' self.factory = factories.CustomUserFactory self.model = models.CustomUser # create user self.email = f"user@mail.com" self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) self.user = self.factory(email=self.email, is_active=False) self.user.set_password(self.password) self.user.save() def test_correct_activation(self): # create values uid = urlsafe_base64_encode(force_bytes(self.user.pk)) token = account_activation_token.make_token(self.user) url = f'/activate/{uid}/{token}/' response = self.client.get(url) # assertions self.assertEquals(response.status_code, 200) self.assertTrue(self.user.email in str(response.content)) def test_bad_activation(self): # create values uid = urlsafe_base64_encode(force_bytes(self.user.pk))[:-1] token = account_activation_token.make_token(self.user)[:-1] url = f'/activate/{uid}/{token}/' response = self.client.get(url) # assertions self.assertEquals(response.status_code, 406) self.assertTrue('error' in response.json()) class CreateCompanyUserTest(APITestCase): def setUp(self): self.endpoint = '/api/v1/create_company_user/' self.factory = factories.CustomUserFactory self.model = models.CustomUser # create user self.email = f"user@mail.com" self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) self.user = self.factory(email=self.email, is_active=False) self.user.set_password(self.password) self.user.save() def test_succesful_creation(self): data = { 'user': { 'email': 'test@email.com', 'full_name': 'TEST NAME', 'password': 'VENTILADORES1234499.89', }, 'company': { 'cif': 'qwerewq', 'company_name': 'qwerewq', 'short_name': 'qwerewq', 'web_link': 'http://qwerewq.com', 'shop': True, 'shop_link': 'http://qwerewq.com', 'platform': 'PRESTASHOP', 'email': 'test@email.com', 'logo': None, 'city': None, 'address': 'qwer qewr 5', 'geo': {'longitude': 1.0, 'latitude': 1.0}, 'phone': '1234', 'mobile': '4321', 'other_phone': '41423', 'description': 'dfgfdgdfg', 'shop_rss_feed': 'http://qwerewq.com', 'sale_terms': 'tewrnmfew f ewfrfew ewewew f', 'shipping_cost': '12.25', 'sync': False } } response = self.client.post(self.endpoint, data=data, format='json') import ipdb; ipdb.set_trace() self.assertEquals(response.status_code, 201) self.assertEquals(len(mail.outbox), 1) def test_creation_error(self): response = self.client.post(self.endpoint, data={}, format='json') self.assertEquals(response.status_code, 406) self.assertEquals(len(mail.outbox), 0)