import logging from io import BytesIO from django.contrib.auth import get_user_model from django.contrib.sites.shortcuts import get_current_site from django.utils.encoding import force_bytes from django.utils.http import urlsafe_base64_encode from django.template.loader import render_to_string from django.core.mail import EmailMessage from django.contrib.auth.tokens import PasswordResetTokenGenerator from django.conf import settings from django.core.validators import validate_email, EmailValidator, URLValidator, ValidationError from rest_framework_simplejwt.tokens import RefreshToken import requests from PIL import Image from django.core.files import File from tagulous.models import TagModel from companies.models import Company User = get_user_model() class AccountActivationTokenGenerator(PasswordResetTokenGenerator): def _make_hash_value(self, user, timestamp): return f"{user.pk}{timestamp}{user.full_name}" account_activation_token = AccountActivationTokenGenerator() def get_tokens_for_user(user): refresh = RefreshToken.for_user(user) return { 'refresh': str(refresh), 'access': str(refresh.access_token), } def get_auth_token(client, email, password): credentials = { 'email': email, 'password': password, } response = client.post('/api/v1/token/', credentials, format='json') if response.status_code == 200: return response.data['access'] else: # logging.error(f"User {email} was refused a token: {response.content}") return None def create_active_user(email, password): user = User.objects.create_user(email=email, password=password) user.is_active = True user.save() return user def create_admin_user(email, password): user = User.objects.create_user(email=email, password=password) user.is_staff = True user.is_active = True user.save() return user def send_verification_email(request, user): try: current_site = get_current_site(request) subject = 'Activa Tu Cuenta' message = render_to_string('email_verification.html', { 'user': user, 'domain': current_site.domain, 'uid': urlsafe_base64_encode(force_bytes(user.pk)), 'token': account_activation_token.make_token(user), }) email = EmailMessage( subject, message, to=[user.email] ) email.send() logging.info(f"Verification email sent to {user.email}") except Exception as e: logging.error(f"Could not sent verification email to: {user.email}") def reformat_google_taxonomy(file_name): """ Read from flat text file Create Herarchical Tag for each line """ base = settings.BASE_DIR + '/../datasets/' counter = 0 source_file_path = base + file_name destination_file_path = base + 'ALT' + file_name source_file = open(source_file_path, 'rt') destination_file = open(destination_file_path, 'wt') for line in source_file.readlines(): line = line.replace(' > ', '/') destination_file.write(line) def coop_loader(csv_reader, request=None): """ Parse csv data and extract: - coop data - manager user data Return counts """ coop_counter = 0 user_counter = 0 for row in csv_reader: # trim strings for key in row: if row[key]: row[key] = row[key].strip().lower() # import ipdb; ipdb.set_trace() if '' in (row['cif'], row['nombre-coop'], row['email']): logging.error(f"Required data missing: {row}") continue # validate email try: validate_email(row['email']) except ValidationError: logging.warning(f"Invalid email value '{row['email']}', skipped") continue # validate URLs if row['url'].startswith('http') is not True: row['url'] = 'http://' + row['url'] if row['logo-url'].startswith('http') is not True: row['logo-url'] = 'http://' + row['logo-url'] validator = URLValidator() try: validator(row['url']) except ValidationError: logging.warning(f"Invalid url value '{row['url']}'") row['url'] = None try: validator(row['logo-url']) except ValidationError: logging.warning(f"Invalid logo URL value '{row['logo-url']}'") row['logo-url'] = None # validate boolean try: shop = bool(row['es-tienda']) except: logging.warning(f"Invalid valur for es-tiends: {row['es-tienda']}") shop = None # create instances try: coop_data = { 'cif': row['cif'], 'company_name': row['nombre-coop'], 'short_name': row['nombre-corto'], 'shop': shop, 'shop_link': row['url'], 'phone': row['telefono'], 'address': row['direccion'], } coop = Company.objects.create(**coop_data) # image logo data if row['logo-url'] is not None: try: # get image headers={"User-Agent" : "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36"} response = requests.get(row['logo-url'], stream=True, headers=headers) assert(response.status_code==200) response.raw.decode_content = True image = Image.open(response.raw) # save using File object img_io = BytesIO() image.save(img_io, format=image.format) coop.logo.save(f"{coop.company_name}.{image.format.lower()}", File(img_io), save=False) coop.save() except AssertionError as e: logging.error(f"Source image [{row['logo-url']}] not reachable: {response.status_code}") except Exception as e: logging.error(f"Could not add image to COOP {coop.company_name} from [{row['logo-url']}]: {str(e)}") # logging.info(f"Created Coop: {coop_data}") coop_counter += 1 coop_user = User.objects.create_user(email=row['email'], company=coop, role='COOP_MANAGER', is_active=False) # send confirmation email if request is not None: send_verification_email(request, coop_user) logging.info(f"Created User: {coop_user}") user_counter += 1 except Exception as e: import ipdb; ipdb.set_trace() logging.error(f"Could not parse {row}") return coop_counter, user_counter