237 lines
7.4 KiB
Python
237 lines
7.4 KiB
Python
#! /usr/bin/env python3
|
|
"""Provide SoundFile which is used to link between database and file system.
|
|
|
|
File name
|
|
=========
|
|
It tries to parse the file name to get the date of the diffusion of an
|
|
episode and associate the file with it. We use the following format:
|
|
yyyymmdd[_HHhMM][_n][_][name]

Where:
'yyyy' the year of the episode's diffusion;
'mm' the month of the episode's diffusion;
'dd' the day of the episode's diffusion;
'HHhMM' the optional hour and minute of the diffusion (e.g. '14h30');
'n' the number of the episode (if multiple episodes);
'name' the title of the sound;
|
|
|
|
Sound Quality
|
|
=============
|
|
To check quality of files, call the command sound_quality_check using the
|
|
parameters given by the setting AIRCOX_SOUND_QUALITY. This script requires
|
|
Sox (and soxi).
|
|
"""
|
|
import logging
|
|
import os
|
|
import re
|
|
from datetime import date
|
|
|
|
import mutagen
|
|
from django.conf import settings as conf
|
|
from django.utils import timezone as tz
|
|
from django.utils.translation import gettext as _
|
|
|
|
from aircox import utils
|
|
from aircox.models import Program, Sound, Track
|
|
|
|
from .commands.import_playlist import PlaylistImport
|
|
|
|
# Module-level logger, registered under the "aircox.commands" name.
logger = logging.getLogger("aircox.commands")
|
|
|
|
|
|
class SoundFile:
    """Handle synchronisation between sounds on files and database.

    An instance is bound to one file path. :meth:`sync` looks up (or creates)
    the matching ``Sound``, fills it from the file name and the file metadata,
    then tries to attach an episode and a playlist to it.
    """

    # Path of the sound file, as given to the constructor.
    path = None
    # Object returned by ``mutagen.File`` for ``path``; ``None`` until it has
    # been read, or when the file is missing or unreadable.
    info = None
    # Dictionary returned by ``read_path`` for ``path`` (set by ``sync``).
    path_info = None
    # ``Sound`` instance related to the file (set by ``sync``).
    sound = None

    def __init__(self, path):
        self.path = path

    @property
    def sound_path(self):
        """Path of the file relative to ``settings.MEDIA_ROOT``.

        Only a leading media root is removed; a path that is not located
        under the media root is returned unchanged.
        """
        # ``str()`` supports ``MEDIA_ROOT`` given as a ``pathlib.Path``;
        # normalising the trailing slash supports both "/media" and "/media/".
        root = str(conf.MEDIA_ROOT).rstrip("/") + "/"
        if self.path.startswith(root):
            return self.path[len(root):]
        return self.path

    @property
    def episode(self):
        """Episode of the related sound (falsy when there is no sound)."""
        return self.sound and self.sound.episode

    def sync(
        self,
        sound=None,
        program=None,
        deleted=False,
        keep_deleted=False,
        **kwargs
    ):
        """Update related sound model and save it.

        :param sound: ``Sound`` to update; when omitted it is fetched or \
            created from :attr:`sound_path`.
        :param program: program of the sound; when omitted it is resolved \
            from the file path with ``Program.get_from_path``.
        :param deleted: the file has been deleted: only run ``_on_delete``.
        :param keep_deleted: passed to ``_on_delete`` when ``deleted`` is set.
        :param kwargs: default field values used if the ``Sound`` is created.
        :returns: the saved sound (or the result of ``_on_delete``).
        """
        if deleted:
            return self._on_delete(self.path, keep_deleted)

        # FIXME: sound.program as not null
        # NOTE(review): when no program matches the path, ``program`` is
        # presumably None and the ``.pk`` access below raises; confirm the
        # expected handling with callers.
        if not program:
            program = Program.get_from_path(self.path)
            logger.debug('program from path "%s" -> %s', self.path, program)
            kwargs["program_id"] = program.pk

        created = False
        if not sound:
            sound, created = Sound.objects.get_or_create(
                file=self.sound_path, defaults=kwargs
            )

        self.sound = sound
        self.path_info = self.read_path(self.path)

        sound.program = program
        # Refresh name and duration for a new sound, or when
        # ``check_on_file`` returns a truthy value.
        if created or sound.check_on_file():
            sound.name = self.path_info.get("name")
            self.info = self.read_file_info()
            if self.info is not None:
                sound.duration = utils.seconds_to_time(self.info.info.length)

        # check for episode
        if sound.episode is None and "year" in self.path_info:
            sound.episode = self.find_episode(sound, self.path_info)
        sound.save()

        # check for playlist
        self.find_playlist(sound)
        return sound

    def _on_delete(self, path, keep_deleted):
        """Handle removal of the file from the file system.

        ``path`` is kept for interface compatibility: the lookup is done on
        ``self.path``.

        :param keep_deleted: if true, the matching sound is kept and its type \
            set to ``TYPE_REMOVED``; otherwise matching sounds are deleted.
        :returns: the updated sound when ``keep_deleted`` is true (``None`` \
            if none matched), ``None`` otherwise.
        """
        # TODO: remove from db on delete
        if not keep_deleted:
            Sound.objects.path(self.path).delete()
            return None

        sound = Sound.objects.path(self.path).first()
        if sound:
            sound.type = sound.TYPE_REMOVED
            sound.check_on_file()
            sound.save()
        return sound

    def read_path(self, path):
        """Parse path name returning dictionary of extracted info. It can
        contain:

        - `year`, `month`, `day`: diffusion date
        - `hour`, `minute`: diffusion time
        - `n`: sound arbitrary number (used for sound ordering)
        - `name`: cleaned name extracted or file name (without extension)

        When the file name starts with a date, every key above is present and
        the optional parts that are missing are set to ``None``. Otherwise
        only `name` is provided.
        """
        basename = os.path.splitext(os.path.basename(path))[0]
        reg_match = self._path_re.search(basename)
        if not reg_match:
            return {"name": basename}

        info = reg_match.groupdict()
        for key in ("year", "month", "day", "hour", "minute", "n"):
            if info.get(key) is not None:
                info[key] = int(info[key])

        # Nothing left after the date prefix: use the whole base name.
        name = info.get("name")
        info["name"] = self._into_name(name) if name else basename
        return info

    # yyyymmdd[_HHhMM][_n][_][name]
    _path_re = re.compile(
        r"^(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})"
        r"(_(?P<hour>[0-9]{2})h(?P<minute>[0-9]{2}))?"
        r"(_(?P<n>[0-9]+))?"
        r"_?[ -]*(?P<name>.*)$"
    )

    def _into_name(self, name):
        """Turn a file-name fragment into a title: underscores become spaces
        and each space-separated word is capitalized."""
        name = name.replace("_", " ")
        return " ".join(r.capitalize() for r in name.split(" "))

    def read_file_info(self):
        """Read file information and metadata.

        :returns: the object returned by ``mutagen.File``, or ``None`` when \
            the file does not exist or reading it raised an error.
        """
        try:
            if os.path.exists(self.path):
                return mutagen.File(self.path)
        except Exception as err:
            # Deliberate best effort: an unreadable file must not abort the
            # synchronisation, but the failure should not go unnoticed.
            logger.warning(
                'could not read metadata of "%s": %s', self.path, err
            )
        return None

    def find_episode(self, sound, path_info):
        """For a given program, check if there is an initial diffusion to
        associate to, using the date info we have.

        We only allow initial diffusion since there should be no rerun.

        :param sound: sound whose program's diffusions are searched.
        :param path_info: dictionary as returned by :meth:`read_path`.
        :returns: the episode of the matching diffusion, or ``None`` when \
            the sound is missing or already has an episode, when the path \
            holds no valid date, or when no diffusion matches.
        """
        # Guard before touching any attribute of ``sound``.
        if not sound or sound.episode or "year" not in path_info:
            return None

        year, month, day = (
            path_info.get(key) for key in ("year", "month", "day")
        )
        hour = path_info.get("hour")
        try:
            if hour is not None:
                minute = path_info.get("minute") or 0
                # ``make_aware`` uses the current timezone and works with
                # both pytz and zoneinfo (``tzinfo.localize`` is pytz-only).
                at = tz.make_aware(tz.datetime(year, month, day, hour, minute))
            else:
                at = date(year, month, day)
        except (TypeError, ValueError) as err:
            # Leading digits of the file name were not an actual date/time
            # (e.g. "20231345_..."): there is no diffusion to look for.
            logger.warning(
                'invalid date in file name "%s": %s', self.path, err
            )
            return None

        diffusion = sound.program.diffusion_set.at(at).first()
        if not diffusion:
            return None

        logger.debug("%s <--> %s", sound.file.name, str(diffusion.episode))
        return diffusion.episode

    def find_playlist(self, sound=None, use_meta=True):
        """Find a playlist file corresponding to the sound path, such as:
        my_sound.ogg => my_sound.csv.

        Use sound's file metadata if no corresponding playlist has been
        found and `use_meta` is True: a single track is then created from the
        `title`, `artist`, `album`, `year` and `tracknumber` tags.

        :param sound: sound to attach tracks to (default: ``self.sound``).
        :param use_meta: allow fallback on the file metadata.
        """
        if sound is None:
            sound = self.sound
        # NOTE(review): a sound that already has exactly one track is still
        # processed; confirm whether ``> 0`` was intended.
        if sound.track_set.count() > 1:
            return

        # import playlist
        path_noext = os.path.splitext(sound.file.path)[0]
        playlist_path = path_noext + ".csv"
        if os.path.exists(playlist_path):
            PlaylistImport(playlist_path, sound=sound).run()
            return

        # use metadata
        if not use_meta:
            return
        if self.info is None:
            self.info = self.read_file_info()
        if not (self.info and self.info.tags):
            return

        tags = self.info.tags
        # Tag values are sequences of strings: merge each into one string.
        title, artist, album, year = (
            values and ", ".join(values)
            for values in (
                tags.get(key) for key in ("title", "artist", "album", "year")
            )
        )
        title = (
            title
            or (self.path_info and self.path_info.get("name"))
            or os.path.basename(path_noext)
        )
        if album and year:
            details = "{} ({})".format(album, year)
        else:
            details = album or year or ""

        track = Track(
            sound=sound,
            position=self._track_position(tags.get("tracknumber")),
            title=title,
            artist=artist or _("unknown"),
            info=details,
        )
        track.save()

    @staticmethod
    def _track_position(value):
        """Return the track number held by a `tracknumber` tag value.

        Accept a plain value or a sequence (the first item is used), with an
        optional "/total" suffix such as "3/12". Return 0 when the value is
        missing or is not a number.
        """
        if isinstance(value, (list, tuple)):
            value = value[0] if value else None
        if value is None:
            return 0
        try:
            return int(str(value).split("/", 1)[0].strip())
        except ValueError:
            return 0
|