import logging
import os
import shutil

from django.conf import settings as conf
from django.contrib.auth.models import Group, Permission, User
from django.db import transaction
from django.db.models import signals, F, Value
from django.db.models.functions import Concat, Substr
from django.dispatch import receiver
from django.utils import timezone as tz

from aircox import utils
from aircox.conf import settings
from .article import Article
from .diffusion import Diffusion
from .episode import Episode
from .page import Page
from .program import Program
from .schedule import Schedule
from .sound import Sound


logger = logging.getLogger("aircox")


# Add default groups to a user when it is created. It also assigns a list
# of permissions to each group that had to be created.
#
# Groups and permissions are read from ``settings.DEFAULT_USER_GROUPS``,
# a mapping of ``group name -> iterable of permission codenames``.
#
@receiver(signals.post_save, sender=User)
def user_default_groups(sender, instance, created, *args, **kwargs):
    """Add a newly created, non-superuser user to the default groups.

    Groups missing from the database are created; a freshly created group
    receives the permissions whose codenames are listed for it in
    ``settings.DEFAULT_USER_GROUPS``. Unknown codenames are ignored.
    """
    if not created or instance.is_superuser:
        return

    for group_name, permissions in settings.DEFAULT_USER_GROUPS.items():
        if instance.groups.filter(name=group_name).exists():
            continue

        # Distinct name: do not shadow the signal's ``created`` argument.
        group, group_created = Group.objects.get_or_create(name=group_name)
        if group_created and permissions:
            for codename in permissions:
                permission = Permission.objects.filter(codename=codename).first()
                if permission:
                    group.permissions.add(permission)
            group.save()
        instance.groups.add(group)


# ---- page
@receiver(signals.post_save, sender=Page)
def page_post_save__child_page_defaults(sender, instance, created, *args, **kwargs):
    """Propagate a page's cover to child episodes and articles without one.

    Only runs when no ``__initial_cover`` value is recorded on the instance
    and the page has a cover.
    """
    initial_cover = getattr(instance, "__initial_cover", None)
    if initial_cover is None and instance.cover is not None:
        Episode.objects.filter(parent=instance, cover__isnull=True).update(cover=instance.cover)
        Article.objects.filter(parent=instance, cover__isnull=True).update(cover=instance.cover)


# ---- program
@receiver(signals.post_save, sender=Program)
def program_post_save__clean_later_episodes(sender, instance, created, *args, **kwargs):
    """Remove upcoming diffusions and orphan episodes of an inactive program."""
    if not instance.active:
        Diffusion.objects.program(instance).after(tz.now()).delete()
        Episode.objects.parent(instance).filter(diffusion__isnull=True).delete()


@receiver(signals.post_save, sender=Program)
def program_post_save__mv_sounds(sender, instance, created, *args, **kwargs):
    """Move the program directory and re-point its sounds when the path changes.

    The previous path is read from the instance's ``__initial_path``
    attribute. Nothing is done when it is missing or unchanged, when the old
    directory does not exist, or when the new directory already exists.
    """
    path_ = getattr(instance, "__initial_path", None)
    if path_ in (None, instance.path):
        return

    abspath = path_ and os.path.join(conf.MEDIA_ROOT, path_)
    if os.path.exists(abspath) and not os.path.exists(instance.abspath):
        # Lazy %-style arguments: an f-string combined with extra positional
        # arguments makes logging fail while formatting the record.
        logger.info(
            "program #%s's dir changed to %s - update it.",
            instance.id,
            instance.title,
        )

        shutil.move(abspath, instance.abspath)
        # Swap the old directory prefix for the new one. ``Substr`` is
        # 1-indexed, so ``len(path_) + 1`` is the first character after the
        # old prefix. A bare "file" string inside ``Concat`` would be read as
        # a field reference, hence the explicit ``Value``.
        # NOTE(review): confirm ``path`` is a valid lookup on Sound; the
        # updated column is ``file``.
        Sound.objects.filter(path__startswith=path_).update(
            file=Concat(Value(instance.path), Substr(F("file"), len(path_) + 1))
        )


# ---- schedule
@receiver(signals.pre_save, sender=Schedule)
def schedule_pre_save(sender, instance, *args, **kwargs):
    """Keep the stored version of the schedule on ``instance._initial``.

    Skipped for unsaved instances and for raw saves (e.g. fixture loading).
    """
    # Django always sends ``raw`` with ``pre_save``: test its value, not its
    # presence, otherwise the initial state would never be recorded.
    if instance.pk is not None and not kwargs.get("raw"):
        # ``first()`` yields None when no row exists yet for this pk.
        instance._initial = Schedule.objects.filter(pk=instance.pk).first()


@receiver(signals.post_save, sender=Schedule)
def schedule_post_save(sender, instance, created, *args, **kwargs):
    """Handles Schedule's time, duration and timezone changes and update
    corresponding diffusions accordingly."""
    initial = getattr(instance, "_initial", None)
    if not initial or (
        (instance.time, instance.duration, instance.timezone)
        == (initial.time, initial.duration, initial.timezone)
    ):
        return

    # Shift between the old and new schedule, evaluated on the same day.
    today = tz.datetime.today()
    delta = instance.normalize(today) - initial.normalize(today)
    duration = utils.to_timedelta(instance.duration)
    with transaction.atomic():
        qs = Diffusion.objects.filter(schedule=instance).after(tz.now())
        for diffusion in qs:
            diffusion.start = diffusion.start + delta
            diffusion.end = diffusion.start + duration
            diffusion.save()


@receiver(signals.pre_delete, sender=Schedule)
def schedule_pre_delete(sender, instance, *args, **kwargs):
    """Delete later diffusions of the schedule being deleted, then remove
    episodes left without diffusion, content and sound."""
    Diffusion.objects.filter(schedule=instance).after(tz.now()).delete()
    Episode.objects.filter(diffusion__isnull=True, content__isnull=True, episodesound__isnull=True).delete()


# ---- diffusion
@receiver(signals.post_delete, sender=Diffusion)
def diffusion_post_delete(sender, instance, *args, **kwargs):
    """Remove episodes left without diffusion, content and sound."""
    Episode.objects.filter(diffusion__isnull=True, content__isnull=True, episodesound__isnull=True).delete()


# ---- files
@receiver(signals.post_delete, sender=Sound)
def delete_file(sender, instance, *args, **kwargs):
    """Deletes file on `post_delete`, unless another row still uses it."""
    if not instance.file:
        return

    path = instance.file.path
    # Compare against the stored name: the ``file`` column holds the file's
    # name, not the absolute filesystem path returned by ``.path``.
    still_referenced = sender.objects.filter(file=instance.file.name).exists()
    if not still_referenced and os.path.exists(path):
        os.remove(path)