#132 | #121: backoffice / dev-1.0-121 (#131)

cfr #121

Co-authored-by: Christophe Siraut <d@tobald.eu.org>
Co-authored-by: bkfox <thomas bkfox net>
Co-authored-by: Thomas Kairos <thomas@bkfox.net>
Reviewed-on: rc/aircox#131
Co-authored-by: Chris Tactic <ctactic@noreply.git.radiocampus.be>
Co-committed-by: Chris Tactic <ctactic@noreply.git.radiocampus.be>
This commit is contained in:
2024-04-28 22:02:09 +02:00
committed by Thomas Kairos
parent 1e17a1334a
commit 55123c386d
348 changed files with 124397 additions and 17879 deletions

View File

@@ -1,304 +1,195 @@
import logging
from datetime import date
import os
import re
from django.conf import settings as conf
from django.db import models
from django.db.models import Q
from django.utils import timezone as tz
from django.utils.translation import gettext_lazy as _
from taggit.managers import TaggableManager
from aircox import utils
from aircox.conf import settings
from .episode import Episode
from .program import Program
logger = logging.getLogger("aircox")
from .file import File, FileQuerySet
__all__ = ("Sound", "SoundQuerySet", "Track")
__all__ = ("Sound", "SoundQuerySet")
class SoundQuerySet(models.QuerySet):
def station(self, station=None, id=None):
id = station.pk if id is None else id
return self.filter(program__station__id=id)
def episode(self, episode=None, id=None):
id = episode.pk if id is None else id
return self.filter(episode__id=id)
def diffusion(self, diffusion=None, id=None):
id = diffusion.pk if id is None else id
return self.filter(episode__diffusion__id=id)
def available(self):
return self.exclude(type=Sound.TYPE_REMOVED)
def public(self):
"""Return sounds available as podcasts."""
return self.filter(is_public=True)
class SoundQuerySet(FileQuerySet):
def downloadable(self):
"""Return sounds available as podcasts."""
return self.filter(is_downloadable=True)
def archive(self):
def broadcast(self):
"""Return sounds that are archives."""
return self.filter(type=Sound.TYPE_ARCHIVE)
return self.filter(broadcast=True, is_removed=False)
def path(self, paths):
if isinstance(paths, str):
return self.filter(file=paths.replace(conf.MEDIA_ROOT + "/", ""))
return self.filter(file__in=(p.replace(conf.MEDIA_ROOT + "/", "") for p in paths))
def playlist(self, archive=True, order_by=True):
def playlist(self, order_by="file"):
"""Return files absolute paths as a flat list (exclude sound without
path).
If `order_by` is True, order by path.
"""
if archive:
self = self.archive()
path)."""
if order_by:
self = self.order_by("file")
self = self.order_by(order_by)
return [
os.path.join(conf.MEDIA_ROOT, file)
for file in self.filter(file__isnull=False).values_list("file", flat=True)
]
def search(self, query):
return self.filter(
Q(name__icontains=query)
| Q(file__icontains=query)
| Q(program__title__icontains=query)
| Q(episode__title__icontains=query)
)
# TODO:
# - provide a default name based on program and episode
class Sound(models.Model):
"""A Sound is the representation of a sound file that can be either an
excerpt or a complete archive of the related diffusion."""
TYPE_OTHER = 0x00
TYPE_ARCHIVE = 0x01
TYPE_EXCERPT = 0x02
TYPE_REMOVED = 0x03
TYPE_CHOICES = (
(TYPE_OTHER, _("other")),
(TYPE_ARCHIVE, _("archive")),
(TYPE_EXCERPT, _("excerpt")),
(TYPE_REMOVED, _("removed")),
)
name = models.CharField(_("name"), max_length=64)
program = models.ForeignKey(
Program,
models.CASCADE,
blank=True, # NOT NULL
verbose_name=_("program"),
help_text=_("program related to it"),
db_index=True,
)
episode = models.ForeignKey(
Episode,
models.SET_NULL,
blank=True,
null=True,
verbose_name=_("episode"),
db_index=True,
)
type = models.SmallIntegerField(_("type"), choices=TYPE_CHOICES)
position = models.PositiveSmallIntegerField(
_("order"),
default=0,
help_text=_("position in the playlist"),
)
def _upload_to(self, filename):
subdir = settings.SOUND_ARCHIVES_SUBDIR if self.type == self.TYPE_ARCHIVE else settings.SOUND_EXCERPTS_SUBDIR
return os.path.join(self.program.path, subdir, filename)
file = models.FileField(
_("file"),
upload_to=_upload_to,
max_length=256,
db_index=True,
unique=True,
)
class Sound(File):
duration = models.TimeField(
_("duration"),
blank=True,
null=True,
help_text=_("duration of the sound"),
)
mtime = models.DateTimeField(
_("modification time"),
blank=True,
null=True,
help_text=_("last modification date and time"),
)
is_good_quality = models.BooleanField(
_("good quality"),
help_text=_("sound meets quality requirements"),
blank=True,
null=True,
)
is_public = models.BooleanField(
_("public"),
help_text=_("whether it is publicly available as podcast"),
default=False,
)
is_downloadable = models.BooleanField(
_("downloadable"),
help_text=_("whether it can be publicly downloaded by visitors (sound must be " "public)"),
help_text=_("Sound can be downloaded by website visitors."),
default=False,
)
broadcast = models.BooleanField(
_("Broadcast"),
default=False,
help_text=_("The sound is broadcasted on air"),
)
objects = SoundQuerySet.as_manager()
class Meta:
verbose_name = _("Sound")
verbose_name_plural = _("Sounds")
verbose_name = _("Sound file")
verbose_name_plural = _("Sound files")
@property
def url(self):
return self.file and self.file.url
_path_re = re.compile(
"^(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})"
"(_(?P<hour>[0-9]{2})h(?P<minute>[0-9]{2}))?"
"(_(?P<n>[0-9]+))?"
"_?[ -]*(?P<name>.*)$"
)
def __str__(self):
return "/".join(self.file.path.split("/")[-3:])
@classmethod
def read_path(cls, path):
"""Parse path name returning dictionary of extracted info. It can
contain:
def save(self, check=True, *args, **kwargs):
if self.episode is not None and self.program is None:
self.program = self.episode.program
if check:
self.check_on_file()
if not self.is_public:
self.is_downloadable = False
self.__check_name()
super().save(*args, **kwargs)
# TODO: rename get_file_mtime(self)
def get_mtime(self):
"""Get the last modification date from file."""
mtime = os.stat(self.file.path).st_mtime
mtime = tz.datetime.fromtimestamp(mtime)
mtime = mtime.replace(microsecond=0)
return tz.make_aware(mtime, tz.get_current_timezone())
def file_exists(self):
"""Return true if the file still exists."""
return os.path.exists(self.file.path)
# TODO: rename to sync_fs()
def check_on_file(self):
"""Check sound file info again'st self, and update informations if
needed (do not save).
Return True if there was changes.
- `year`, `month`, `day`: diffusion date
- `hour`, `minute`: diffusion time
- `n`: sound arbitrary number (used for sound ordering)
- `name`: cleaned name extracted or file name (without extension)
"""
if not self.file_exists():
if self.type == self.TYPE_REMOVED:
return
logger.debug("sound %s: has been removed", self.file.name)
self.type = self.TYPE_REMOVED
return True
basename = os.path.basename(path)
basename = os.path.splitext(basename)[0]
reg_match = cls._path_re.search(basename)
if reg_match:
info = reg_match.groupdict()
for k in ("year", "month", "day", "hour", "minute", "n"):
if info.get(k) is not None:
info[k] = int(info[k])
# not anymore removed
changed = False
name = info.get("name")
info["name"] = name and cls._as_name(name) or basename
else:
info = {"name": basename}
return info
if self.type == self.TYPE_REMOVED and self.program:
changed = True
self.type = (
self.TYPE_ARCHIVE if self.file.name.startswith(self.program.archives_path) else self.TYPE_EXCERPT
@classmethod
def _as_name(cls, name):
name = name.replace("_", " ")
return " ".join(r.capitalize() for r in name.split(" "))
def find_episode(self, path_info=None):
"""Base on self's file name, match date to an initial diffusion and
return corresponding episode or ``None``."""
pi = path_info or self.read_path(self.file.path)
if "year" not in pi:
return None
year, month, day = pi.get("year"), pi.get("month"), pi.get("day")
if pi.get("hour") is not None:
at = tz.datetime(year, month, day, pi.get("hour", 0), pi.get("minute", 0))
at = tz.make_aware(at)
else:
at = date(year, month, day)
diffusion = self.program.diffusion_set.at(at).first()
return diffusion and diffusion.episode or None
def find_playlist(self, meta=None):
"""Find a playlist file corresponding to the sound path, such as:
my_sound.ogg => my_sound.csv.
Use provided sound's metadata if any and no csv file has been
found.
"""
from aircox.controllers.playlist_import import PlaylistImport
from .track import Track
if self.track_set.count() > 1:
return
# import playlist
path_noext, ext = os.path.splitext(self.file.path)
path = path_noext + ".csv"
if os.path.exists(path):
PlaylistImport(path, sound=self).run()
# use metadata
elif meta and meta.tags:
title, artist, album, year = tuple(
t and ", ".join(t) for t in (meta.tags.get(k) for k in ("title", "artist", "album", "year"))
)
# check mtime -> reset quality if changed (assume file changed)
mtime = self.get_mtime()
if self.mtime != mtime:
self.mtime = mtime
self.is_good_quality = None
logger.debug(
"sound %s: m_time has changed. Reset quality info",
self.file.name,
title = title or path_noext
info = "{} ({})".format(album, year) if album and year else album or year or ""
track = Track(
sound=self,
position=int(meta.tags.get("tracknumber", 0)),
title=title,
artist=artist or _("unknown"),
info=info,
)
return True
track.save()
def get_upload_dir(self):
if self.broadcast:
return settings.SOUND_BROADCASTS_SUBDIR
return settings.SOUND_EXCERPTS_SUBDIR
meta = None
"""Provided by read_metadata: Mutagen's metadata."""
def sync_fs(self, *args, find_playlist=False, **kwargs):
changed = super().sync_fs(*args, **kwargs)
if changed and not self.is_removed:
if not self.program:
self.program = Program.get_from_path(self.file.path)
changed = True
if find_playlist and self.meta:
not self.pk and self.save(sync=False)
self.find_playlist(self.meta)
return changed
def __check_name(self):
if not self.name and self.file and self.file.name:
# FIXME: later, remove date?
name = os.path.basename(self.file.name)
name = os.path.splitext(name)[0]
self.name = name.replace("_", " ").strip()
def read_metadata(self):
import mutagen
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__check_name()
meta = mutagen.File(self.file.path)
metadata = {"duration": utils.seconds_to_time(meta.info.length), "meta": meta}
class Track(models.Model):
"""Track of a playlist of an object.
The position can either be expressed as the position in the playlist
or as the moment in seconds it started.
"""
episode = models.ForeignKey(
Episode,
models.CASCADE,
blank=True,
null=True,
verbose_name=_("episode"),
)
sound = models.ForeignKey(
Sound,
models.CASCADE,
blank=True,
null=True,
verbose_name=_("sound"),
)
position = models.PositiveSmallIntegerField(
_("order"),
default=0,
help_text=_("position in the playlist"),
)
timestamp = models.PositiveSmallIntegerField(
_("timestamp"),
blank=True,
null=True,
help_text=_("position (in seconds)"),
)
title = models.CharField(_("title"), max_length=128)
artist = models.CharField(_("artist"), max_length=128)
album = models.CharField(_("album"), max_length=128, null=True, blank=True)
tags = TaggableManager(verbose_name=_("tags"), blank=True)
year = models.IntegerField(_("year"), blank=True, null=True)
# FIXME: remove?
info = models.CharField(
_("information"),
max_length=128,
blank=True,
null=True,
help_text=_(
"additional informations about this track, such as " "the version, if is it a remix, features, etc."
),
)
class Meta:
verbose_name = _("Track")
verbose_name_plural = _("Tracks")
ordering = ("position",)
path_info = self.read_path(self.file.path)
if name := path_info.get("name"):
metadata["name"] = name
return metadata
def __str__(self):
return "{self.artist} -- {self.title} -- {self.position}".format(self=self)
def save(self, *args, **kwargs):
if (self.sound is None and self.episode is None) or (self.sound is not None and self.episode is not None):
raise ValueError("sound XOR episode is required")
super().save(*args, **kwargs)
infos = ""
if self.is_removed:
infos += _("removed")
if infos:
return f"{self.file.name} [{infos}]"
return f"{self.file.name}"