#137 Deployment: **Upgrade to Liquidsoap 2.4**: code has been adapted to work with liquidsoap 2.4 Co-authored-by: bkfox <thomas bkfox net> Reviewed-on: #138
188 lines
5.9 KiB
Python
188 lines
5.9 KiB
Python
from datetime import date
|
|
import os
|
|
import re
|
|
|
|
from django.conf import settings as conf
|
|
from django.db import models
|
|
from django.utils import timezone as tz
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from aircox import utils
|
|
from aircox.conf import settings
|
|
|
|
from .program import Program
|
|
from .file import File, FileQuerySet
|
|
|
|
|
|
# Public names of this module: the ``Sound`` model and its queryset class.
__all__ = ("Sound", "SoundQuerySet")
|
|
|
|
|
|
class SoundQuerySet(FileQuerySet):
    """Queryset for sounds: flag-based filters and playlist export."""

    def downloadable(self):
        """Return sounds available as podcasts."""
        return self.filter(is_downloadable=True)

    def broadcast(self):
        """Return sounds that are archives."""
        return self.filter(broadcast=True)

    def playlist(self, order_by="file"):
        """Return files absolute paths as a flat list (exclude sound without
        path).

        :param order_by: field name used to order the sounds; a falsy value \
            keeps the queryset's current ordering.
        :return: list of paths, each one joined onto ``MEDIA_ROOT``.
        """
        queryset = self.order_by(order_by) if order_by else self
        relative_paths = queryset.filter(file__isnull=False).values_list("file", flat=True)
        return [os.path.join(conf.MEDIA_ROOT, relative) for relative in relative_paths]
|
|
|
|
|
|
class Sound(File):
    """Sound file: a ``File`` carrying a duration, a quality flag and
    publication flags (downloadable, broadcast)."""

    duration = models.TimeField(
        _("duration"),
        blank=True,
        null=True,
        help_text=_("duration of the sound"),
    )
    is_good_quality = models.BooleanField(
        _("good quality"),
        help_text=_("sound meets quality requirements"),
        blank=True,
        null=True,
    )
    is_downloadable = models.BooleanField(
        _("downloadable"),
        help_text=_("sound can be downloaded by visitors"),
        default=False,
    )
    broadcast = models.BooleanField(
        _("Broadcast"),
        default=False,
        help_text=_("The sound is broadcasted on air"),
    )

    objects = SoundQuerySet.as_manager()

    class Meta:
        verbose_name = _("Sound file")
        verbose_name_plural = _("Sound files")

    # File name layout: ``YYYYMMDD[_HHhMM][_N][_ -]name``.
    _path_re = re.compile(
        "^(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})"
        "(_(?P<hour>[0-9]{2})h(?P<minute>[0-9]{2}))?"
        "(_(?P<n>[0-9]+))?"
        "_?[ -]*(?P<name>.*)$"
    )

    @classmethod
    def read_path(cls, path):
        """Parse path name returning dictionary of extracted info. It can
        contain:

        - `year`, `month`, `day`: diffusion date
        - `hour`, `minute`: diffusion time
        - `n`: sound arbitrary number (used for sound ordering)
        - `name`: cleaned name extracted or file name (without extension)

        When the file name does not start with a date, only `name` is
        provided (the raw file name without extension).
        """
        basename = os.path.splitext(os.path.basename(path))[0]
        reg_match = cls._path_re.search(basename)
        if not reg_match:
            return {"name": basename}

        info = reg_match.groupdict()
        # Optional groups that did not match are kept as ``None``.
        for key in ("year", "month", "day", "hour", "minute", "n"):
            if info.get(key) is not None:
                info[key] = int(info[key])

        name = info.get("name")
        cleaned = cls._as_name(name) if name else None
        # Fall back to the raw file name when nothing usable was extracted.
        info["name"] = cleaned or basename
        return info

    @classmethod
    def _as_name(cls, name):
        """Return ``name`` with underscores turned into spaces and each
        space-separated word capitalized."""
        name = name.replace("_", " ")
        return " ".join(word.capitalize() for word in name.split(" "))

    def find_episode(self, path_info=None):
        """Base on self's file name, match date to an initial diffusion and
        return corresponding episode or ``None``.

        :param path_info: result of :meth:`read_path`; computed from the \
            sound's file path when not provided.
        :return: the episode of the first matching diffusion of the sound's \
            program, or ``None`` when the file name holds no (valid) date or \
            no diffusion matches.
        """
        pi = path_info or self.read_path(self.file.path)
        if "year" not in pi:
            return None

        year, month, day = pi.get("year"), pi.get("month"), pi.get("day")
        has_time = pi.get("hour") is not None
        try:
            if has_time:
                at = tz.datetime(year, month, day, pi["hour"], pi.get("minute") or 0)
            else:
                at = date(year, month, day)
        except ValueError:
            # Digits in the file name do not form a real date/time (e.g.
            # month 13): there is nothing to match against.
            return None

        if has_time:
            at = tz.make_aware(at)

        diffusion = self.program.diffusion_set.at(at).first()
        return diffusion.episode if diffusion and diffusion.episode else None

    @staticmethod
    def _track_position(raw):
        """Return a track position as ``int`` from a raw tag value.

        Accepts an int, a string such as ``"3"`` or ``"3/12"``, or a
        list/tuple of those (first item is used). Returns ``0`` when the
        value is missing or can not be parsed.
        """
        if isinstance(raw, (list, tuple)):
            raw = raw[0] if raw else 0
        if isinstance(raw, str):
            # "track/total" notation: only keep the track part.
            raw = raw.split("/", 1)[0].strip()
        try:
            return int(raw)
        except (TypeError, ValueError):
            return 0

    def find_playlist(self, meta=None):
        """Find a playlist file corresponding to the sound path, such as:
        my_sound.ogg => my_sound.csv.

        Use provided sound's metadata if any and no csv file has been
        found.

        Nothing is done when the sound already has more than one track.

        :param meta: metadata object exposing a ``tags`` mapping (as \
            returned by Mutagen); tag values are expected to be lists of \
            strings.
        """
        from aircox.controllers.playlist_import import PlaylistImport
        from .track import Track

        if self.track_set.count() > 1:
            return

        # import playlist
        path_noext, ext = os.path.splitext(self.file.path)
        path = path_noext + ".csv"
        if os.path.exists(path):
            PlaylistImport(path, sound=self).run()
        # use metadata
        elif meta and meta.tags:
            title, artist, album, year = (
                values and ", ".join(values)
                for values in (meta.tags.get(key) for key in ("title", "artist", "album", "year"))
            )
            # NOTE(review): ``path_noext`` is the full path without extension,
            # not just the file name -- confirm this is the intended title.
            title = title or path_noext
            info = f"{album} ({year})" if album and year else album or year or ""
            track = Track(
                sound=self,
                # Tag values are lists (see the join above), so the raw value
                # can not be handed to ``int`` directly.
                position=self._track_position(meta.tags.get("tracknumber", 0)),
                title=title,
                artist=artist or _("unknown"),
                info=info,
            )
            track.save()

    def get_upload_dir(self):
        """Return the settings' sub-directory for this sound: broadcasts
        one when ``broadcast`` is set, excerpts one otherwise."""
        if self.broadcast:
            return settings.SOUND_BROADCASTS_SUBDIR
        return settings.SOUND_EXCERPTS_SUBDIR

    # Provided by read_metadata: Mutagen's metadata.
    meta = None

    def sync_fs(self, *args, find_playlist=False, **kwargs):
        """Run parent's ``sync_fs`` then, when something changed and the
        sound is not removed, fill in missing program and optionally look
        for a playlist.

        :param find_playlist: when True and metadata are available, call \
            :meth:`find_playlist` (the sound is saved first if it has no \
            primary key yet).
        :return: truthy when the sound has changed.
        """
        changed = super().sync_fs(*args, **kwargs)
        if changed and not self.is_removed:
            if not self.program:
                self.program = Program.get_from_path(self.file.path)
                changed = True
            if find_playlist and self.meta:
                # Tracks reference the sound: it must exist in database.
                if not self.pk:
                    self.save(sync=False)
                self.find_playlist(self.meta)
        return changed

    def read_metadata(self):
        """Read metadata from the sound file using Mutagen.

        :return: dict with ``duration`` (from the file's length), ``meta`` \
            (Mutagen's object) and, when the file name provides one, ``name``.
        """
        import mutagen

        # NOTE(review): ``mutagen.File`` may return ``None`` for unrecognized
        # files, which would fail on ``meta.info`` below -- confirm callers
        # only pass supported audio files.
        meta = mutagen.File(self.file.path)

        metadata = {"duration": utils.seconds_to_time(meta.info.length), "meta": meta}

        path_info = self.read_path(self.file.path)
        if name := path_info.get("name"):
            metadata["name"] = name
        return metadata
|