"""API tests for the user-related endpoints (CRUD, password change, CSV load,
activation, company sign-up, admin stats and social login)."""
import base64
import csv
import hashlib
import json
import random
import string

from django.conf import settings
from django.core import mail
from django.test import TestCase
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode

from rest_framework import status
from rest_framework.test import APITestCase

from core.utils import get_tokens_for_user, account_activation_token

from companies.models import Company

from . import models
from . import factories

# Test cases for the user-related API endpoints.


class CustomUserViewSetTest(APITestCase):
    """Tests for the CustomUser viewset exposed at ``/api/v1/users/``.

    The tests are grouped by caller: anonymous client, authenticated regular
    user, and staff ("admin") user.
    """

    def setUp(self):
        """Create the users shared by every test.

        ``self.user`` / ``self.password`` belong to the regular user. Tests
        that need admin rights set ``is_staff`` on ``self.user`` themselves.
        """
        self.endpoint = '/api/v1/users/'
        self.factory = factories.CustomUserFactory
        self.model = models.CustomUser

        # First user. It is created exactly as before (no ``is_staff`` flag);
        # it is now kept on its own attribute instead of being overwritten.
        self.admin_email = "admin_user@mail.com"
        admin_password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.admin_user = self.factory(
            email=self.admin_email, password=admin_password, is_active=True,
        )

        # Regular user used for authentication throughout the tests.
        self.reg_email = "regular_user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.user = self.factory(
            email=self.reg_email, password=self.password, is_active=True,
        )

    # helpers
    def _authenticate(self, user):
        """Attach a Bearer access token for ``user`` to the test client."""
        token = get_tokens_for_user(user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")

    def _authenticate_as_staff(self):
        """Flag ``self.user`` as staff, persist it and authenticate as it."""
        self.user.is_staff = True
        self.user.save()
        self._authenticate(self.user)

    def _detail_url(self, pk):
        """Return the detail URL of the user with primary key ``pk``."""
        return f'{self.endpoint}{pk}/'

    # anon user
    def test_anon_user_can_create_inactive_instance(self):
        """Not logged-in user can create a new user, but it is inactive."""
        data = {
            'email': 'test@email.com',
            'full_name': 'TEST NAME',
            'password': 'VENTILADORES1234499.89',
        }

        response = self.client.post(self.endpoint, data=data)

        # Creation is successful
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        # The new instance is inactive
        info = response.json()
        self.assertFalse(info['is_active'])
        # The instance exists in the database
        self.assertTrue(self.model.objects.filter(email=info['email']).exists())
        # Exactly one (verification) email was sent
        self.assertEqual(len(mail.outbox), 1)

    def test_anon_user_cannot_modify_existing_instance(self):
        """Not logged-in user cannot modify an existing instance."""
        instance = self.factory()

        response = self.client.put(self._detail_url(instance.pk), {}, format='json')

        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_anon_user_cannot_delete_existing_instance(self):
        """Not logged-in user cannot delete an existing instance."""
        instance = self.factory()

        response = self.client.delete(self._detail_url(instance.pk))

        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
        # The instance still exists in the database
        self.assertTrue(self.model.objects.filter(id=instance.pk).exists())

    def test_anon_user_cannot_list_instances(self):
        """Not logged-in user cannot list instances."""
        response = self.client.get(self.endpoint)

        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    # auth regular user
    def test_regular_user_cannot_create_instance(self):
        """Regular user cannot create a new instance."""
        self._authenticate(self.user)

        response = self.client.post(self.endpoint, data={})

        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_regular_user_cannot_modify_existing_instance(self):
        """Regular user cannot modify somebody else's instance."""
        instance = self.factory()
        self._authenticate(self.user)

        response = self.client.put(self._detail_url(instance.pk), {}, format='json')

        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_regular_user_cannot_delete_existing_instance(self):
        """Regular user cannot delete an existing instance."""
        instance = self.factory()
        self._authenticate(self.user)

        response = self.client.delete(self._detail_url(instance.pk))

        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
        # The instance still exists in the database
        self.assertTrue(self.model.objects.filter(id=instance.pk).exists())

    def test_regular_user_cannot_list_instances(self):
        """Regular user cannot list instances."""
        # Credentials must be set BEFORE the request; previously the request
        # was sent anonymously, so this test only repeated the 401 check.
        self._authenticate(self.user)

        response = self.client.get(self.endpoint)

        # NOTE(review): 403 mirrors the other regular-user tests in this
        # class - confirm against the viewset's permission classes.
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

    def test_auth_user_can_modify_own_instance(self):
        """Regular user can modify their own instance."""
        data = {
            "email": "new_email@mail.com",
            "full_name": "New Full Name",
            'notify': True,
        }
        self._authenticate(self.user)

        response = self.client.put(
            self._detail_url(self.user.pk), data=data, format='json',
        )

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        # Every submitted field is echoed back with the new value
        for key, value in data.items():
            self.assertEqual(value, response.data[key])

    # admin user
    def test_admin_user_can_create_instance(self):
        """Admin user can create a new instance."""
        data = {
            'email': 'test@email.com',
            'full_name': 'TEST NAME',
            'password': 'VENTILADORES1234499.89',
        }
        self._authenticate_as_staff()

        response = self.client.post(self.endpoint, data=data, format='json')

        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        # The instance exists in the database
        self.assertTrue(
            self.model.objects.filter(email=response.data['email']).exists()
        )
        # Exactly one (verification) email was sent
        self.assertEqual(len(mail.outbox), 1)

    def test_admin_user_can_modify_existing_instance(self):
        """Admin user can modify an existing instance."""
        instance = self.factory()
        data = {
            'email': 'test_alt@email.com',
            'full_name': 'TEST NAME ALT',
            'notify': True,
        }
        self._authenticate_as_staff()

        response = self.client.put(self._detail_url(instance.pk), data, format='json')

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        # Every submitted field is echoed back with the new value
        for key, value in data.items():
            self.assertEqual(value, response.data[key])

    def test_admin_user_can_delete_existing_instance(self):
        """Admin user can delete an existing instance."""
        instance = self.factory()
        self._authenticate_as_staff()

        response = self.client.delete(self._detail_url(instance.pk))

        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
        # The instance no longer exists in the database
        self.assertFalse(self.model.objects.filter(id=instance.pk).exists())


class ChangeUserPasswordViewTest(APITestCase):
    """Tests for the change-password endpoint
    (``/api/v1/user/change_password/<pk>/``)."""

    def setUp(self):
        """Create one active user with a known random password."""
        # NOTE(review): ``self.endpoint`` is not used by any test below, which
        # all target ``/api/v1/user/change_password/<pk>/`` - confirm whether
        # it is still needed.
        self.endpoint = '/api/v1/user/update/'
        self.factory = factories.CustomUserFactory
        self.model = models.CustomUser
        # create regular user
        self.reg_email = "user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.user = self.factory(email=self.reg_email, is_active=True)
        self.user.set_password(self.password)
        self.user.save()

    def _put_as_user(self, data):
        """Authenticate as ``self.user`` and PUT ``data`` to the
        change-password URL; return the response."""
        token = get_tokens_for_user(self.user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")
        url = f'/api/v1/user/change_password/{self.user.pk}/'
        return self.client.put(url, data=data, format='json')

    def test_fail_different_passwords(self):
        """Request is rejected when ``password`` and ``password2`` differ."""
        data = {
            "old_password": self.password,
            "password": "!!!",
            "password2": "SUPERSECRETNEWPASSWORD",
        }

        response = self._put_as_user(data)

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    def test_fail_wrong_old_password(self):
        """Request is rejected when ``old_password`` is wrong."""
        data = {
            "old_password": 'WRONG!!!!',
            "password": 'SUPERSECRETNEWPASSWORD',
            "password2": "SUPERSECRETNEWPASSWORD",
        }

        response = self._put_as_user(data)

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    def test_auth_user_can_modify_own_password(self):
        """Authenticated user can change their own password."""
        data = {
            "old_password": self.password,
            "password": "SUPERSECRETNEWPASSWORD",
            "password2": "SUPERSECRETNEWPASSWORD",
        }

        response = self._put_as_user(data)

        self.assertEqual(response.status_code, status.HTTP_200_OK)

        # Verify the stored hash through the model's own password API rather
        # than re-deriving PBKDF2 by hand: a manual check only works with one
        # specific hasher and storage layout.
        updated_user = self.model.objects.get(pk=self.user.pk)
        self.assertTrue(updated_user.check_password(data['password']))
        self.assertFalse(updated_user.check_password(self.password))


class LoadCoopManagerTestCase(APITestCase):
    """Tests for the CSV upload endpoint ``/api/v1/load_coops/``."""

    def setUp(self):
        """Create a staff user, a regular user and remember the CSV path."""
        self.endpoint = '/api/v1/load_coops/'
        self.user_factory = factories.CustomUserFactory
        self.user_model = models.CustomUser
        self.company_model = Company
        # create admin user
        self.admin_email = "admin_user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.admin_user = self.user_factory(
            email=self.admin_email,
            password=self.password,
            is_staff=True,
            is_active=True,
        )
        # create regular user
        self.reg_email = "user@mail.com"
        self.user = self.user_factory(email=self.reg_email, is_active=True)
        self.user.set_password(self.password)
        self.user.save()
        # test CSV file path
        self.csv_path = 'datasets/test_coop.csv'

    def _authenticate(self, user):
        """Attach a Bearer access token for ``user`` to the test client."""
        token = get_tokens_for_user(user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")

    def _post_csv(self):
        """POST the test CSV as ``csv_file`` and return the response.

        The file is opened in a ``with`` block so the handle is closed even
        when the request raises.
        """
        with open(self.csv_path, 'rt') as csv_file:
            return self.client.post(self.endpoint, {'csv_file': csv_file})

    def test_admin_can_load_csv(self):
        """Staff user can upload the CSV; the test expects 5 new companies,
        5 new users and 5 emails."""
        # count existing instances
        company_count = self.company_model.objects.count()
        user_count = self.user_model.objects.count()

        self._authenticate(self.admin_user)
        response = self._post_csv()

        # check response
        self.assertEqual(response.status_code, 200)
        # check for object creation
        self.assertEqual(company_count + 5, self.company_model.objects.count())
        self.assertEqual(user_count + 5, self.user_model.objects.count())
        self.assertEqual(5, len(mail.outbox))

    def test_auth_user_cannot_load_csv(self):
        """Regular authenticated user is forbidden; nothing is created."""
        company_count = self.company_model.objects.count()
        user_count = self.user_model.objects.count()

        self._authenticate(self.user)
        response = self._post_csv()

        # check response
        self.assertEqual(response.status_code, 403)
        # nothing was created
        self.assertEqual(company_count, self.company_model.objects.count())
        self.assertEqual(user_count, self.user_model.objects.count())

    def test_anon_user_cannot_load_csv(self):
        """Anonymous client is unauthorized; nothing is created."""
        company_count = self.company_model.objects.count()
        user_count = self.user_model.objects.count()

        response = self._post_csv()

        # check response
        self.assertEqual(response.status_code, 401)
        # nothing was created
        self.assertEqual(company_count, self.company_model.objects.count())
        self.assertEqual(user_count, self.user_model.objects.count())


class MyUserViewTest(APITestCase):
    """Tests for the ``/api/v1/my_user/`` endpoint."""

    def setUp(self):
        """Create one active user with a known random password."""
        self.endpoint = '/api/v1/my_user/'
        self.factory = factories.CustomUserFactory
        self.model = models.CustomUser
        # create user
        self.email = "user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.user = self.factory(email=self.email, is_active=True)
        self.user.set_password(self.password)
        self.user.save()

    def test_auth_user_gets_data(self):
        """Authenticated user receives a payload carrying their own email."""
        # Authenticate
        token = get_tokens_for_user(self.user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")

        # Query endpoint
        response = self.client.get(self.endpoint)

        # Check the status first so a failure reports the status code rather
        # than a JSON decoding problem.
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        payload = response.json()
        self.assertEqual(self.email, payload['email'])

    def test_anon_user_cannot_access(self):
        """Anonymous client gets 401."""
        response = self.client.get(self.endpoint)

        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)


class ActivateUserTest(APITestCase):
    """Tests for the account activation URL ``/activate/<uid>/<token>/``."""

    def setUp(self):
        """Create one inactive user with a known random password."""
        # NOTE(review): ``self.endpoint`` is a URL pattern, not a usable URL,
        # and no test reads it; tests build the concrete URL themselves.
        self.endpoint = 'activate/<uidb64>/<token>/'
        self.factory = factories.CustomUserFactory
        self.model = models.CustomUser
        # create user
        self.email = "user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.user = self.factory(email=self.email, is_active=False)
        self.user.set_password(self.password)
        self.user.save()

    def _uid_and_token(self):
        """Return the ``(uid, token)`` pair generated for ``self.user``."""
        uid = urlsafe_base64_encode(force_bytes(self.user.pk))
        token = account_activation_token.make_token(self.user)
        return uid, token

    def test_correct_activation(self):
        """Valid uid/token redirects to ``settings.ACTIVATION_REDIRECT``."""
        uid, token = self._uid_and_token()

        response = self.client.get(f'/activate/{uid}/{token}/')

        self.assertEqual(response.status_code, 302)
        self.assertEqual(response.url, settings.ACTIVATION_REDIRECT)

    def test_correct_activation_no_redirect(self):
        """With an empty ``ACTIVATION_REDIRECT`` the view answers 200 and the
        response body contains the user's email."""
        uid, token = self._uid_and_token()

        # Override the setting only for this request. Assigning to
        # ``settings`` directly would leak the change into every later test.
        with self.settings(ACTIVATION_REDIRECT=''):
            response = self.client.get(f'/activate/{uid}/{token}/')

        self.assertEqual(response.status_code, 200)
        self.assertIn(self.user.email, str(response.content))

    def test_bad_activation(self):
        """Truncated uid and token yield 406 with an ``error`` key."""
        uid, token = self._uid_and_token()
        # Drop the last character of each value to make them invalid
        uid, token = uid[:-1], token[:-1]

        response = self.client.get(f'/activate/{uid}/{token}/')

        self.assertEqual(response.status_code, 406)
        self.assertIn('error', response.json())


class CreateCompanyUserTest(APITestCase):
    """Tests for ``/api/v1/create_company_user/`` (user + company sign-up)."""

    def setUp(self):
        """Create one active user with a known random password."""
        self.endpoint = '/api/v1/create_company_user/'
        self.factory = factories.CustomUserFactory
        self.model = models.CustomUser
        # create user
        self.email = "user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.user = self.factory(email=self.email, is_active=True)
        self.user.set_password(self.password)
        self.user.save()

    def _build_payload(self):
        """Return a fresh request body describing one user and one company."""
        return {
            'user': {
                'email': 'test@email.com',
                'full_name': 'TEST NAME',
                'password': 'VENTILADORES1234499.89',
            },
            'company': {
                'cif': 'qwerewq',
                'company_name': 'qwerewq',
                'short_name': 'qwerewq',
                'web_link': 'http://qwerewq.com',
                'shop': True,
                'shop_link': 'http://qwerewq.com',
                'platform': 'PRESTASHOP',
                'email': 'test@email.com',
                'logo': None,
                'city': None,
                'address': 'qwer qewr 5',
                'geo': {'longitude': 1.0, 'latitude': 1.0},
                'phone': '1234',
                'mobile': '4321',
                'other_phone': '41423',
                'description': 'dfgfdgdfg',
                'shop_rss_feed': 'http://qwerewq.com',
                'sale_terms': 'tewrnmfew f ewfrfew ewewew f',
                'shipping_cost': '12.25',
                'sync': False,
            },
        }

    def _assert_user_and_company_created(self, response):
        """Check the 201 response, the single email and the new DB rows."""
        self.assertEqual(response.status_code, 201)
        # exactly one (verification) email was sent
        self.assertEqual(len(mail.outbox), 1)
        # the user exists and is inactive
        created = self.model.objects.filter(email='test@email.com').first()
        self.assertIsNotNone(created)
        self.assertFalse(created.is_active)
        # the company exists
        self.assertTrue(Company.objects.filter(cif='qwerewq').exists())

    def test_auth_user_can_create(self):
        """Authenticated user can create a user + company pair."""
        token = get_tokens_for_user(self.user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")

        response = self.client.post(
            self.endpoint, data=self._build_payload(), format='json',
        )

        self._assert_user_and_company_created(response)

    def test_anon_user_can_create(self):
        """Anonymous client can create a user + company pair."""
        response = self.client.post(
            self.endpoint, data=self._build_payload(), format='json',
        )

        self._assert_user_and_company_created(response)

    def test_creation_error(self):
        """An empty body is rejected with 406 and no email is sent."""
        response = self.client.post(self.endpoint, data={}, format='json')

        self.assertEqual(response.status_code, 406)
        self.assertEqual(len(mail.outbox), 0)


class AdminStatsTest(APITestCase):
    """Tests for the ``/api/v1/admin_stats/`` endpoint."""

    def setUp(self):
        """Create one active, non-staff user with a known random password."""
        self.endpoint = '/api/v1/admin_stats/'
        self.factory = factories.CustomUserFactory
        self.model = models.CustomUser
        # create user
        self.email = "user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.user = self.factory(email=self.email, is_active=True)
        self.user.set_password(self.password)
        self.user.save()

    def _authenticate(self):
        """Attach a Bearer access token for ``self.user`` to the client."""
        token = get_tokens_for_user(self.user)
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")

    def _assert_every_method_returns(self, expected_status):
        """Send GET, POST, PUT and DELETE to the endpoint and check that each
        one answers with ``expected_status``."""
        requests = (
            ('GET', lambda: self.client.get(self.endpoint)),
            ('POST', lambda: self.client.post(self.endpoint, data={})),
            ('PUT', lambda: self.client.put(self.endpoint, data={})),
            ('DELETE', lambda: self.client.delete(self.endpoint)),
        )
        for verb, send in requests:
            # subTest reports which HTTP method failed and keeps checking the
            # remaining ones.
            with self.subTest(method=verb):
                self.assertEqual(send().status_code, expected_status)

    # anonymous user
    def test_anon_user_cannot_crud(self):
        """Not logged-in user cannot access the endpoint at all."""
        self._assert_every_method_returns(status.HTTP_401_UNAUTHORIZED)

    # authenticated user
    def test_auth_user_cannot_crud(self):
        """Authenticated non-staff user cannot access the endpoint at all."""
        self._authenticate()

        self._assert_every_method_returns(status.HTTP_403_FORBIDDEN)

    # admin user
    def test_admin_can_get_data(self):
        """Staff user gets 200 and a payload with every expected entry."""
        # make user admin
        self.user.is_staff = True
        self.user.save()
        self._authenticate()

        response = self.client.get(self.endpoint)

        self.assertEqual(response.status_code, 200)
        payload = response.json()
        expected_entries = [
            'company_count',
            'product_count',
            'companies_per_region',
            'products_per_region',
            'companies_timeline',
            'products_timeline',
            'users_timeline',
            'contact_timeline',
            'shopping_timeline',
        ]
        for name in expected_entries:
            self.assertIn(name, payload)


class SocialLoginTest(APITestCase):
    """Draft tests for the Facebook / Google OAuth login flow.

    NOTE(review): these tests pass absolute third-party URLs to the Django
    test client and read ``response.url`` after asserting a 200 status.
    Presumably the flow is still work in progress - confirm how the provider
    side is meant to be exercised (or mocked) before relying on these tests.
    """

    def setUp(self):
        """Create one active user and store the OAuth callback URI."""
        # NOTE(review): same value as in AdminStatsTest and unused by the
        # tests below - looks like a copy/paste leftover; confirm.
        self.endpoint = '/api/v1/admin_stats/'
        self.factory = factories.CustomUserFactory
        self.model = models.CustomUser
        # create user
        self.email = "user@mail.com"
        self.password = ''.join(random.choices(string.ascii_uppercase, k=10))
        self.user = self.factory(email=self.email, is_active=True)
        self.user.set_password(self.password)
        self.user.save()
        # data
        self.callback_uri = 'http://127.0.0.1:8000/'

    def test_user_can_login_facebook(self):
        """Walk through the Facebook authorization-code flow."""
        # get auth page
        # The URL is built from adjacent literals: backslash continuations
        # inside a single literal would embed whitespace in the query string.
        url = (
            'https://facebook.com/auth?'
            'response_type=code&'
            f'client_id={settings.FACEBOOK_ID}&'
            f'redirect_uri={self.callback_uri}&'
            'scope=profile&'
            'scope=email'
        )
        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)

        # authenticate to auth page if not logged in
        url2 = response.url
        creds = {
            'email': 'sam@mail.com',
            'password': 'supersecret'
        }
        response = self.client.post(url2, data=creds)
        self.assertEqual(response.status_code, 200)
        # the authorization code is the last ``=``-separated part of the
        # redirection URL
        redirect = response.url
        auth_code = redirect.split('=')[-1]

        # exchange the authorization code for a token
        # NOTE(review): ``CLIENT_SECRET`` is sent as a literal placeholder,
        # as in the original test.
        url = (
            'https://facebook.com/token/?'
            'grant_type=authorization_code&'
            f'code={auth_code}&'
            f'redirect_uri={self.callback_uri}&'
            f'client_id={settings.FACEBOOK_ID}&'
            'client_secret=CLIENT_SECRET'
        )
        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)

    def test_user_can_login_google(self):
        """Request the Google authorization page."""
        # ``redirect_uri`` now carries the configured callback, as in the
        # Facebook test, instead of the literal text ``CALLBACK_URI``.
        url = (
            'https://google.com/auth?'
            'response_type=code&'
            f'client_id={settings.GOOGLE_CLIENT_ID}&'
            f'redirect_uri={self.callback_uri}&'
            'scope=profile&'
            'scope=email'
        )

        response = self.client.get(url)

        self.assertEqual(response.status_code, 200)

    def test_bad_login(self):
        """Placeholder for the failed-login scenario."""
        # Report the missing test as skipped instead of letting an empty body
        # count as a pass.
        self.skipTest('bad login scenario not implemented yet')