forked from rc/aircox
		
	- !88 pytest on existing tests - !89 reorganise settings (! see notes for deployment) Co-authored-by: bkfox <thomas bkfox net> Reviewed-on: rc/aircox#92
		
			
				
	
	
		
			167 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			167 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
"""Import one or more playlist for the given sound. Attach it to the provided
 | 
						|
sound.
 | 
						|
 | 
						|
Playlists are in CSV format, where columns are separated with a
 | 
						|
'{settings.IMPORT_PLAYLIST_CSV_DELIMITER}'. Text quote is
 | 
						|
{settings.IMPORT_PLAYLIST_CSV_TEXT_QUOTE}.
 | 
						|
The order of the elements is: {settings.IMPORT_PLAYLIST_CSV_COLS}
 | 
						|
 | 
						|
If 'minutes' or 'seconds' are given, position will be expressed as timed
 | 
						|
position, instead of position in playlist.
 | 
						|
"""
 | 
						|
import csv
 | 
						|
import logging
 | 
						|
import os
 | 
						|
from argparse import RawTextHelpFormatter
 | 
						|
 | 
						|
from django.core.management.base import BaseCommand
 | 
						|
 | 
						|
from aircox.conf import settings
 | 
						|
from aircox.models import Sound, Track
 | 
						|
 | 
						|
__doc__ = __doc__.format(settings=settings)
 | 
						|
 | 
						|
__all__ = ("PlaylistImport", "Command")
 | 
						|
 | 
						|
 | 
						|
logger = logging.getLogger("aircox.commands")
 | 
						|
 | 
						|
 | 
						|
class PlaylistImport:
 | 
						|
    path = None
 | 
						|
    data = None
 | 
						|
    tracks = None
 | 
						|
    track_kwargs = {}
 | 
						|
 | 
						|
    def __init__(self, path=None, **track_kwargs):
 | 
						|
        self.path = path
 | 
						|
        self.track_kwargs = track_kwargs
 | 
						|
 | 
						|
    def reset(self):
 | 
						|
        self.data = None
 | 
						|
        self.tracks = None
 | 
						|
 | 
						|
    def run(self):
 | 
						|
        self.read()
 | 
						|
        if self.track_kwargs.get("sound") is not None:
 | 
						|
            self.make_playlist()
 | 
						|
 | 
						|
    def read(self):
 | 
						|
        if not os.path.exists(self.path):
 | 
						|
            return True
 | 
						|
        with open(self.path, "r") as file:
 | 
						|
            logger.info("start reading csv " + self.path)
 | 
						|
            self.data = list(
 | 
						|
                csv.DictReader(
 | 
						|
                    (
 | 
						|
                        row
 | 
						|
                        for row in file
 | 
						|
                        if not (
 | 
						|
                            row.startswith("#") or row.startswith("\ufeff#")
 | 
						|
                        )
 | 
						|
                        and row.strip()
 | 
						|
                    ),
 | 
						|
                    fieldnames=settings.IMPORT_PLAYLIST_CSV_COLS,
 | 
						|
                    delimiter=settings.IMPORT_PLAYLIST_CSV_DELIMITER,
 | 
						|
                    quotechar=settings.IMPORT_PLAYLIST_CSV_TEXT_QUOTE,
 | 
						|
                )
 | 
						|
            )
 | 
						|
 | 
						|
    def make_playlist(self):
 | 
						|
        """Make a playlist from the read data, and return it.
 | 
						|
 | 
						|
        If save is true, save it into the database
 | 
						|
        """
 | 
						|
        if self.track_kwargs.get("sound") is None:
 | 
						|
            logger.error(
 | 
						|
                "related track's sound is missing. Skip import of "
 | 
						|
                + self.path
 | 
						|
                + "."
 | 
						|
            )
 | 
						|
            return
 | 
						|
 | 
						|
        maps = settings.IMPORT_PLAYLIST_CSV_COLS
 | 
						|
        tracks = []
 | 
						|
 | 
						|
        logger.info("parse csv file " + self.path)
 | 
						|
        has_timestamp = ("minutes" or "seconds") in maps
 | 
						|
        for index, line in enumerate(self.data):
 | 
						|
            if ("title" or "artist") not in line:
 | 
						|
                return
 | 
						|
            try:
 | 
						|
                timestamp = (
 | 
						|
                    int(line.get("minutes") or 0) * 60
 | 
						|
                    + int(line.get("seconds") or 0)
 | 
						|
                    if has_timestamp
 | 
						|
                    else None
 | 
						|
                )
 | 
						|
 | 
						|
                track, created = Track.objects.get_or_create(
 | 
						|
                    title=line.get("title"),
 | 
						|
                    artist=line.get("artist"),
 | 
						|
                    position=index,
 | 
						|
                    **self.track_kwargs
 | 
						|
                )
 | 
						|
                track.timestamp = timestamp
 | 
						|
                track.info = line.get("info")
 | 
						|
                tags = line.get("tags")
 | 
						|
                if tags:
 | 
						|
                    track.tags.add(*tags.lower().split(","))
 | 
						|
            except Exception as err:
 | 
						|
                logger.warning(
 | 
						|
                    "an error occured for track {index}, it may not "
 | 
						|
                    "have been saved: {err}".format(index=index, err=err)
 | 
						|
                )
 | 
						|
                continue
 | 
						|
 | 
						|
            track.save()
 | 
						|
            tracks.append(track)
 | 
						|
        self.tracks = tracks
 | 
						|
        return tracks
 | 
						|
 | 
						|
 | 
						|
class Command(BaseCommand):
 | 
						|
    help = __doc__
 | 
						|
 | 
						|
    def add_arguments(self, parser):
 | 
						|
        parser.formatter_class = RawTextHelpFormatter
 | 
						|
        parser.add_argument(
 | 
						|
            "path",
 | 
						|
            metavar="PATH",
 | 
						|
            type=str,
 | 
						|
            help="path of the input playlist to read",
 | 
						|
        )
 | 
						|
        parser.add_argument(
 | 
						|
            "--sound",
 | 
						|
            "-s",
 | 
						|
            type=str,
 | 
						|
            help="generate a playlist for the sound of the given path. "
 | 
						|
            "If not given, try to match a sound with the same path.",
 | 
						|
        )
 | 
						|
 | 
						|
    def handle(self, path, *args, **options):
 | 
						|
        # FIXME: absolute/relative path of sounds vs given path
 | 
						|
        if options.get("sound"):
 | 
						|
            sound = Sound.objects.filter(
 | 
						|
                file__icontains=options.get("sound")
 | 
						|
            ).first()
 | 
						|
        else:
 | 
						|
            path_, ext = os.path.splitext(path)
 | 
						|
            sound = Sound.objects.filter(path__icontains=path_).first()
 | 
						|
 | 
						|
        if not sound:
 | 
						|
            logger.error(
 | 
						|
                "no sound found in the database for the path "
 | 
						|
                "{path}".format(path=path)
 | 
						|
            )
 | 
						|
            return
 | 
						|
 | 
						|
        # FIXME: auto get sound.episode if any
 | 
						|
        importer = PlaylistImport(path, sound=sound).run()
 | 
						|
        for track in importer.tracks:
 | 
						|
            logger.info(
 | 
						|
                "track #{pos} imported: {title}, by {artist}".format(
 | 
						|
                    pos=track.position, title=track.title, artist=track.artist
 | 
						|
                )
 | 
						|
            )
 |