first stab at admin_stats endpoint
This commit is contained in:
@@ -34,6 +34,7 @@ urlpatterns = [
|
||||
path('api/v1/token/refresh/', TokenRefreshView.as_view(), name='token_refresh'),
|
||||
path('api/v1/token/verify/', TokenVerifyView.as_view(), name='token_verify'),
|
||||
path('api/v1/user/change_password/<int:pk>/', core_views.ChangeUserPasswordView.as_view(), name="change-password"),
|
||||
path('api/v1/admin_stats/', core_views.admin_stats, name='admin-stats'),
|
||||
path('api/v1/load_coops/', core_views.load_coop_managers, name='coop-loader'),
|
||||
path('api/v1/load_products/', product_views.load_coop_products, name='product-loader'),
|
||||
path('api/v1/search_products/', product_views.product_search, name='product-search'),
|
||||
|
||||
@@ -635,3 +635,86 @@ class CreateCompanyUserTest(APITestCase):
|
||||
self.assertEquals(response.status_code, 406)
|
||||
self.assertEquals(len(mail.outbox), 0)
|
||||
|
||||
|
||||
class AdminStatsTest(APITestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.endpoint = '/api/v1/admin_stats/'
|
||||
self.factory = factories.CustomUserFactory
|
||||
self.model = models.CustomUser
|
||||
# create user
|
||||
self.email = "user@mail.com"
|
||||
self.password = ''.join(random.choices(string.ascii_uppercase, k = 10))
|
||||
self.user = self.factory(email=self.email, is_active=True)
|
||||
self.user.set_password(self.password)
|
||||
self.user.save()
|
||||
|
||||
# anonymous user
|
||||
def test_anon_user_cannot_crud(self):
|
||||
"""Not logged-in user cannot access endpoint at all
|
||||
"""
|
||||
|
||||
# Query endpoint
|
||||
response = self.client.get(self.endpoint)
|
||||
# Assert access is forbidden
|
||||
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
|
||||
|
||||
# Query endpoint
|
||||
response = self.client.post(self.endpoint, data={})
|
||||
# Assert access is forbidden
|
||||
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
|
||||
|
||||
# Query endpoint
|
||||
response = self.client.put(self.endpoint, data={})
|
||||
# Assert access is forbidden
|
||||
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
|
||||
|
||||
# Query endpoint
|
||||
response = self.client.delete(self.endpoint)
|
||||
# Assert access is forbidden
|
||||
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
|
||||
|
||||
# authenticated user
|
||||
def test_auth_user_cannot_crud(self):
|
||||
"""Authenticated user cannot access endpoint at all
|
||||
"""
|
||||
# Authenticate user
|
||||
token = get_tokens_for_user(self.user)
|
||||
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")
|
||||
|
||||
# Query endpoint
|
||||
response = self.client.get(self.endpoint)
|
||||
# Assert access is forbidden
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
# Query endpoint
|
||||
response = self.client.post(self.endpoint, data={})
|
||||
# Assert access is forbidden
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
# Query endpoint
|
||||
response = self.client.put(self.endpoint, data={})
|
||||
# Assert access is forbidden
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
# Query endpoint
|
||||
response = self.client.delete(self.endpoint)
|
||||
# Assert access is forbidden
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
def test_admin_can_get_data(self):
|
||||
# make user admin
|
||||
self.user.is_staff = True
|
||||
self.user.save()
|
||||
|
||||
# Authenticate
|
||||
token = get_tokens_for_user(self.user)
|
||||
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {token['access']}")
|
||||
|
||||
response = self.client.get(self.endpoint)
|
||||
self.assertEquals(response.status_code, 200)
|
||||
payload = response.json()
|
||||
expected_entries = ['company_count', 'product_count', 'companies_per_region', 'products_per_region']
|
||||
for name in expected_entries:
|
||||
self.assertTrue(name in payload)
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ from django.contrib.auth import get_user_model
|
||||
from django.utils.http import urlsafe_base64_decode
|
||||
from django.utils.encoding import force_text
|
||||
from django.db import IntegrityError
|
||||
from django.contrib.gis.geos import Point
|
||||
from django.contrib.gis.geos import Point, GEOSGeometry
|
||||
from django.shortcuts import redirect
|
||||
from django.conf import settings
|
||||
|
||||
@@ -23,7 +23,8 @@ from rest_framework.decorators import api_view, permission_classes
|
||||
|
||||
from companies.models import Company
|
||||
from companies.serializers import CompanySerializer
|
||||
from geo.models import City
|
||||
from products.models import Product
|
||||
from geo.models import City, Region
|
||||
|
||||
from . import models
|
||||
from . import serializers as core_serializers
|
||||
@@ -197,3 +198,26 @@ def activate_user(request, uidb64, token):
|
||||
return Response(f"Tu cuenta de usuario {user.email} ha sido activada")
|
||||
else:
|
||||
return Response({"error": f"Tu token de verificacion no coincide con ningún usuario registrado"}, status=status.HTTP_406_NOT_ACCEPTABLE)
|
||||
|
||||
|
||||
@api_view(['GET',])
|
||||
@permission_classes([IsAdminUser,])
|
||||
def admin_stats(request):
|
||||
company_count = Company.objects.count()
|
||||
product_count = Product.objects.count()
|
||||
companies_per_region = {}
|
||||
products_per_region = {}
|
||||
|
||||
for region in Region.objects.all():
|
||||
count = Company.objects.filter(geo__dwithin=region.geo).count()
|
||||
companies_per_region[region.name] = count
|
||||
count = Product.objects.filter(company__geo__dwithin=region.geo).count()
|
||||
products_per_region[region.name] = count
|
||||
|
||||
data = {
|
||||
'company_count': company_count,
|
||||
'product_count': product_count,
|
||||
'companies_per_region': companies_per_region,
|
||||
'products_per_region': products_per_region,
|
||||
}
|
||||
return Response(data=data)
|
||||
|
||||
Reference in New Issue
Block a user