import random import string import json import hashlib import base64 import csv from django.test import TestCase from rest_framework.test import APITestCase from rest_framework import status import requests from core.utils import get_tokens_for_user from companies.models import Company from . import models from . import factories # Create your tests here. class CustomUserViewSetTest(APITestCase): """CustomUser viewset tests """ def setUp(self): """Tests setup """ self.endpoint = '/api/v1/users/' self.factory = factories.CustomUserFactory self.model = models.CustomUser # create admin user self.admin_email = f"admin_user@mail.com" self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) self.user = self.factory(email=self.admin_email, password=self.password, is_active=True) # create regular user self.reg_email = f"regular_user@mail.com" self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) self.user = self.factory(email=self.reg_email, password=self.password, is_active=True) # anon user def test_anon_user_can_create_inactive_instance(self): """Not logged-in user can create new instance of User but it's inactive TODO: should create inactive user """ data = { 'email': 'test@email.com', 'full_name': 'TEST NAME', 'password': 'VENTILADORES1234499.89', } # Query endpoint response = self.client.post(self.endpoint, data=data) # Assert creation is successful self.assertEqual(response.status_code, status.HTTP_201_CREATED) # assert instance is inactive info = json.loads(response.content) self.assertFalse(info['is_active']) def test_anon_user_cannot_modify_existing_instance(self): """Not logged-in user cannot modify existing instance """ # Create instance instance = self.factory() # Query endpoint url = self.endpoint + f'{instance.pk}/' response = self.client.put(url, {}, format='json') # Assert forbidden code self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) def test_anon_user_cannot_delete_existing_instance(self): """Not logged-in user cannot delete existing instance """ # Create instances instance = self.factory() # Query endpoint url = self.endpoint + f'{instance.pk}/' response = self.client.delete(url) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) # Assert instance still exists on db self.assertTrue(self.model.objects.get(id=instance.pk)) def test_anon_user_cannot_list_instances(self): """Not logged-in user can't read instance """ # Request list response = self.client.get(self.endpoint) # Assert access is forbidden self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) # auth regular user def test_regular_user_cannot_create_instance(self): """Regular user cannot create new instance """ # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint response = self.client.post(self.endpoint, data={}) # Assert access is forbidden self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) def test_regular_user_cannot_modify_existing_instance(self): """Regular user cannot modify existing instance """ # Create instance instance = self.factory() # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = self.endpoint + f'{instance.pk}/' response = self.client.put(url, {}, format='json') # Assert forbidden code self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) def test_regular_user_cannot_delete_existing_instance(self): """Regular user cannot delete existing instance """ # Create instances instance = self.factory() # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = self.endpoint + f'{instance.pk}/' response = self.client.delete(url) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) # Assert instance still exists on db self.assertTrue(self.model.objects.get(id=instance.pk)) def test_regular_user_cannot_list_instances(self): """Regular user can't list instances """ # Request list response = self.client.get(self.endpoint) # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Assert access is forbidden self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) # admin user def test_admin_user_can_create_instance(self): """Admin user can create new instance """ # set user as admin self.user.is_staff = True self.user.save() # Define request data data = { 'email': 'test@email.com', 'full_name': 'TEST NAME', 'password': 'VENTILADORES1234499.89', } # Authenticate user token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint response = self.client.post(self.endpoint, data=data, format='json') # Assert endpoint returns created status self.assertEqual(response.status_code, status.HTTP_201_CREATED) # Assert instance exists on db self.assertTrue(self.model.objects.get(email=response.data['email'])) def test_admin_user_can_modify_existing_instance(self): """Admin user can modify existing instance """ # set user as admin self.user.is_staff = True self.user.save() # Create instances instance = self.factory() # Define request data data = { 'email': 'test_alt@email.com', 'full_name': 'TEST NAME ALT' } # Authenticate user token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = self.endpoint + f'{instance.pk}/' response = self.client.put(url, data, format='json') # Assert endpoint returns OK code self.assertEqual(response.status_code, status.HTTP_200_OK) # Assert instance has been modified for key in data: self.assertEqual(data[key], response.data[key]) def test_admin_user_can_delete_existing_instance(self): """Admin user can delete existing instance """ # set user as admin self.user.is_staff = True self.user.save() # Create instances instance = self.factory() # Authenticate user token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = self.endpoint + f'{instance.pk}/' response = self.client.delete(url) # assert 204 no content self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) # Assert instance doesn't exists anymore on db self.assertFalse(self.model.objects.filter(id=instance.pk).exists()) class ChangeUserPasswordViewTest(APITestCase): def setUp(self): """Tests setup """ self.endpoint = '/api/v1/user/update/' self.factory = factories.CustomUserFactory self.model = models.CustomUser # create regular user self.reg_email = f"user@mail.com" self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) self.user = self.factory(email=self.reg_email, is_active=True) self.user.set_password(self.password) self.user.save() def test_fail_different_passwords(self): # Create instance data = { "old_password": self.password, "password": "!!!", "password2": "SUPERSECRETNEWPASSWORD", } # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = f'/api/v1/user/change_password/{self.user.pk}/' response = self.client.put(url, data=data, format='json') self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) def test_fail_wrong_old_password(self): # Create instance data = { "old_password": 'WRONG!!!!', "password": 'SUPERSECRETNEWPASSWORD', "password2": "SUPERSECRETNEWPASSWORD", } # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = f'/api/v1/user/change_password/{self.user.pk}/' response = self.client.put(url, data=data, format='json') self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) def test_auth_user_can_modify_own_password(self): """authenticated user can modify own password """ data = { "old_password": self.password, "password": "SUPERSECRETNEWPASSWORD", "password2": "SUPERSECRETNEWPASSWORD", } # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = f'/api/v1/user/change_password/{self.user.pk}/' response = self.client.put(url, data=data, format='json') # Assert forbidden code self.assertEqual(response.status_code, status.HTTP_200_OK) # assert new password hash properly updated # assert fields exist, and data matches updated_user = self.model.objects.get(pk=self.user.id) stored_value = updated_user.__dict__['password'] hash_type, iteration, salt, stored_password_hash = stored_value.split('$') new_password_hash = hashlib.pbkdf2_hmac( hash_name='sha256', password=data['password'].encode(), salt=salt.encode(), iterations=int(iteration), ) self.assertEqual(stored_password_hash, base64.b64encode(new_password_hash).decode()) class UpdateUserViewTest(APITestCase): def setUp(self): """Tests setup """ self.endpoint = '/api/v1/user/change_password/' self.factory = factories.CustomUserFactory self.model = models.CustomUser # create regular user self.reg_email = f"user@mail.com" self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) self.user = self.factory(email=self.reg_email, is_active=True) self.user.set_password(self.password) self.user.save() def test_auth_user_can_modify_own_instance(self): """Regular user can modify own instance """ # Create instance data = { "email": "new_email@mail.com", "full_name": "New Full Name", 'provider': 'PROVIDER', } # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = f'/api/v1/user/update/{self.user.pk}/' response = self.client.put(url, data=data, format='json') # Assert forbidden code self.assertEqual(response.status_code, status.HTTP_200_OK) def test_auth_user_cannot_modify_random_instance(self): """Regular user can modify own instance """ # Create instance instance = self.factory() data = { "email": "new_email@mail.com", "full_name": "New Full Name", 'provider': 'PROVIDER', } # Authenticate token = get_tokens_for_user(self.user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}") # Query endpoint url = f'/api/v1/user/update/{instance.pk}/' response = self.client.put(url, data=data, format='json') # Assert forbidden code self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) class LoadCoopManagerTestCase(TestCase): def setUp(self): """Tests setup """ self.endpoint = 'http://127.0.0.1:8000/api/v1/load_coops/' self.user_factory = factories.CustomUserFactory self.user_model = models.CustomUser self.company_model = Company # create admin user self.admin_email = f"admin_user@mail.com" self.password = ''.join(random.choices(string.ascii_uppercase, k = 10)) self.admin_user = self.user_factory(email=self.admin_email, password=self.password, is_staff=True, is_active=True) # create regular user self.reg_email = f"user@mail.com" self.user = self.user_factory(email=self.reg_email, is_active=True) self.user.set_password(self.password) self.user.save() def test_admin_can_load_csv(self): company_count = self.company_model.objects.count() user_count = self.user_model.objects.count() # read csv file csv_file = '../coop.csv' files = {'csv_file': open(csv_file,'rt')} # Authenticate token = get_tokens_for_user(self.admin_user) headers = {'Authorization': f"Bearer {token['access']}"} # send in request response = requests.post(self.endpoint, files=files, headers=headers) # check response # check for object creation self.assertNotEqual(company_count, self.company_model.objects.count()) self.assertNotEqual(user_count, self.user_model.objects.count()) def test_auth_user_cannot_load_csv(self): company_count = self.company_model.objects.count() user_count = self.user_model.objects.count() # read csv file csv_file = '../coop.csv' files = {'csv_file': open(csv_file,'r')} # Authenticate token = get_tokens_for_user(self.user) headers = {'Authorization': f"Bearer {token['access']}"} # send in request response = requests.post(self.endpoint, files=files, headers=headers) # check response # check for object creation self.assertEqual(company_count, self.company_model.objects.count()) self.assertEqual(user_count, self.user_model.objects.count()) def test_anon_user_cannot_load_csv(self): company_count = self.company_model.objects.count() user_count = self.user_model.objects.count() # read csv file csv_file = '../coop.csv' files = {'csv_file': open(csv_file,'r')} # Query endpoint response = self.client.post(self.endpoint, files=files) # check response # check for object creation self.assertEqual(company_count, self.company_model.objects.count()) self.assertEqual(user_count, self.user_model.objects.count())