This commit is contained in:
bkfox
2015-10-03 14:58:44 +02:00
parent f9d2d47ee6
commit 1a5dcc3eb9
45 changed files with 380 additions and 321 deletions

0
aircox_programs/__init__.py Executable file
View File

95
aircox_programs/admin.py Executable file
View File

@ -0,0 +1,95 @@
import copy
from django import forms
from django.contrib import admin
from django.db import models
from suit.admin import SortableTabularInline, SortableModelAdmin
from autocomplete_light.contrib.taggit_field import TaggitWidget, TaggitField
from aircox_programs.forms import *
from aircox_programs.models import *
#
# Inlines
#
# TODO: inherits from the corresponding admin view
class SoundInline (admin.TabularInline):
model = Sound
class ScheduleInline (admin.TabularInline):
model = Schedule
extra = 1
class DiffusionInline (admin.TabularInline):
model = Diffusion
fields = ('episode', 'type', 'date', 'stream')
readonly_fields = ('date', 'stream')
extra = 1
class TrackInline (SortableTabularInline):
fields = ['artist', 'name', 'tags', 'position']
form = TrackForm
model = Track
sortable = 'position'
extra = 10
class NameableAdmin (admin.ModelAdmin):
fields = [ 'name' ]
list_display = ['id', 'name']
list_filter = []
search_fields = ['name',]
@admin.register(Sound)
class SoundAdmin (NameableAdmin):
fields = None
fieldsets = [
(None, { 'fields': NameableAdmin.fields + ['path' ] } ),
(None, { 'fields': ['duration', 'date', 'fragment' ] } )
]
@admin.register(Stream)
class StreamAdmin (SortableModelAdmin):
list_display = ('id', 'name', 'type', 'priority')
sortable = "priority"
@admin.register(Program)
class ProgramAdmin (NameableAdmin):
fields = NameableAdmin.fields + ['stream']
inlines = [ ScheduleInline ]
@admin.register(Episode)
class EpisodeAdmin (NameableAdmin):
list_filter = ['program'] + NameableAdmin.list_filter
fields = NameableAdmin.fields + ['sounds']
inlines = (TrackInline, DiffusionInline)
@admin.register(Diffusion)
class DiffusionAdmin (admin.ModelAdmin):
list_display = ('id', 'type', 'date', 'episode', 'program', 'stream')
list_filter = ('type', 'date', 'program', 'stream')
list_editable = ('type', 'date')
def get_queryset(self, request):
qs = super(DiffusionAdmin, self).get_queryset(request)
if 'type__exact' in request.GET and \
str(Diffusion.Type['unconfirmed']) in request.GET['type__exact']:
return qs
return qs.exclude(type = Diffusion.Type['unconfirmed'])
admin.site.register(Track)
admin.site.register(Schedule)

View File

@ -0,0 +1,50 @@
import autocomplete_light.shortcuts as al
from aircox_programs.models import *
from taggit.models import Tag
al.register(Tag)
class OneFieldAutocomplete(al.AutocompleteModelBase):
choice_html_format = u'''
<span class="block" data-value="%s">%s</span>
'''
def choice_html (self, choice):
value = choice[self.search_fields[0]]
return self.choice_html_format % (self.choice_label(choice),
self.choice_label(value))
def choices_for_request(self):
#if not self.request.user.is_staff:
# self.choices = self.choices.filter(private=False)
filter_args = { self.search_fields[0] + '__icontains': self.request.GET['q'] }
self.choices = self.choices.filter(**filter_args)
self.choices = self.choices.values(self.search_fields[0]).distinct()
return self.choices
class TrackArtistAutocomplete(OneFieldAutocomplete):
search_fields = ['artist']
model = Track
al.register(TrackArtistAutocomplete)
class TrackNameAutocomplete(OneFieldAutocomplete):
search_fields = ['name']
model = Track
al.register(TrackNameAutocomplete)
#class DiffusionAutocomplete(OneFieldAutocomplete):
# search_fields = ['episode', 'program', 'start', 'stop']
# model = Diffusion
#
#al.register(DiffusionAutocomplete)

19
aircox_programs/forms.py Normal file
View File

@ -0,0 +1,19 @@
from django import forms
from django.contrib.admin import widgets
import autocomplete_light.shortcuts as al
from autocomplete_light.contrib.taggit_field import TaggitWidget
from aircox_programs.models import *
class TrackForm (forms.ModelForm):
class Meta:
model = Track
fields = ['artist', 'name', 'tags', 'position']
widgets = {
'artist': al.TextWidget('TrackArtistAutocomplete'),
'name': al.TextWidget('TrackNameAutocomplete'),
'tags': TaggitWidget('TagAutocomplete'),
}

View File

View File

@ -0,0 +1,108 @@
"""
Manage diffusions using schedules, to update, clean up or check diffusions.
A diffusion generated using this utility is considered has type "unconfirmed",
and is not considered as ready for diffusion; To do so, users must confirm the
diffusion case by changing it's type to "default".
Different actions are available:
- "update" is the process that is used to generated them using programs
schedules for the (given) month.
- "clean" will remove all diffusions that are still unconfirmed and have been
planified before the (given) month.
- "check" will remove all diffusions that are unconfirmed and have been planified
from the (given) month and later.
"""
from argparse import RawTextHelpFormatter
from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone as tz
from aircox_programs.models import *
class Actions:
@staticmethod
def update (date):
items = []
for schedule in Schedule.objects.filter(program__active = True):
items += schedule.diffusions_of_month(date, exclude_saved = True)
print('> {} new diffusions for schedule #{} ({})'.format(
len(items), schedule.id, str(schedule)
))
print('total of {} diffusions will be created. To be used, they need '
'manual approval.'.format(len(items)))
print(Diffusion.objects.bulk_create(items))
@staticmethod
def clean (date):
qs = Diffusion.objects.filter(type = Diffusion.Type['unconfirmed'],
date__lt = date)
print('{} diffusions will be removed'.format(qs.count()))
qs.delete()
@staticmethod
def check (date):
qs = Diffusion.objects.filter(type = Diffusion.Type['unconfirmed'],
date__gt = date)
items = []
for diffusion in qs:
schedules = Schedule.objects.filter(program = diffusion.program)
for schedule in schedules:
if schedule.match(diffusion.date):
break
else:
print('> #{}: {}'.format(diffusion.date, str(diffusion)))
items.append(diffusion.id)
print('{} diffusions will be removed'.format(len(items)))
if len(items):
Diffusion.objects.filter(id__in = items).delete()
class Command (BaseCommand):
help= __doc__
def add_arguments (self, parser):
parser.formatter_class=RawTextHelpFormatter
now = tz.datetime.today()
group = parser.add_argument_group('action')
group.add_argument(
'--update', action='store_true',
help = 'generate (unconfirmed) diffusions for the given month. '
'These diffusions must be confirmed manually by changing '
'their type to "normal"')
group.add_argument(
'--clean', action='store_true',
help = 'remove unconfirmed diffusions older than the given month')
group.add_argument(
'--check', action='store_true',
help = 'check future unconfirmed diffusions from the given date '
'agains\'t schedules and remove it if that do not match any '
'schedule')
group = parser.add_argument_group(
'date')
group.add_argument('--year', type=int, default=now.year,
help='used by update, default is today\'s year')
group.add_argument('--month', type=int, default=now.month,
help='used by update, default is today\'s month')
def handle (self, *args, **options):
date = tz.datetime(year = options.get('year'),
month = options.get('month'),
day = 1)
date = tz.make_aware(date)
if options.get('update'):
Actions.update(date)
elif options.get('clean'):
Actions.clean(date)
elif options.get('check'):
Actions.check(date)
else:
raise CommandError('no action has been given')

View File

@ -0,0 +1,271 @@
import argparse
import json
from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
import aircox_programs.models as models
class Model:
# dict: key is the argument name, value is the constructor
required = {}
optional = {}
model = None
def __init__ (self, model, required = {}, optional = {}, post = None):
self.model = model
self.required = required
self.optional = optional
self.post = post
def to_string (self):
return '\n'.join(
[ ' - required: {}'.format(', '.join(self.required))
, ' - optional: {}'.format(', '.join(self.optional))
, (self.post is AddTags and ' - tags available\n') or
'\n'
])
def check_or_raise (self, options):
for req in self.required:
if req not in options:
raise CommandError('required argument ' + req + ' is missing')
def get_kargs (self, options):
kargs = {}
for i in self.required:
if options.get(i):
fn = self.required[i]
kargs[i] = fn(options[i])
for i in self.optional:
if options.get(i):
fn = self.optional[i]
kargs[i] = fn(options[i])
return kargs
def get_by_id (self, options):
id_list = options.get('id')
items = self.model.objects.filter( id__in = id_list )
if len(items) is not len(id_list):
for key, id in enumerate(id_list):
if id in items:
del id_list[key]
raise CommandError(
'the following ids has not been found: {} (no change done)'
, ', '.join(id_list)
)
return items
def make (self, options):
self.check_or_raise(options)
kargs = self.get_kargs(options)
item = self.model(**kargs)
item.save()
if self.post:
self.post(item, options)
print('{} #{} created'.format(self.model.name()
, item.id))
def update (self, options):
items = self.get_by_id(options)
for key, item in enumerate(items):
kargs = self.get_kargs(options)
item.__dict__.update(options)
item.save()
print('{} #{} updated'.format(self.model.name()
, item.id))
del items[key]
def delete (self, options):
items = self.get_by_id(options)
items.delete()
print('{} #{} deleted'.format(self.model.name()
, ', '.join(options.get('id'))
))
def dump (self, options):
qs = self.model.objects.all()
fields = ['id'] + [ f.name for f in self.model._meta.fields
if f.name is not 'id']
items = []
for item in qs:
r = []
for f in fields:
v = getattr(item, f)
if hasattr(v, 'id'):
v = v.id
r.append(v)
items.append(r)
if options.get('head'):
items = items[0:options.get('head')]
elif options.get('tail'):
items = items[-options.get('tail'):]
if options.get('fields'):
print(json.dumps(fields))
print(json.dumps(items, default = lambda x: str(x)))
return
def DateTime (string):
dt = timezone.datetime.strptime(string, '%Y-%m-%d %H:%M:%S')
return timezone.make_aware(dt, timezone.get_current_timezone())
def Time (string):
dt = timezone.datetime.strptime(string, '%H:%M')
return timezone.datetime.time(dt)
def AddTags (instance, options):
if options.get('tags'):
instance.tags.add(*options['tags'])
models = {
'program': Model( models.Program
, { 'title': str }
, { 'subtitle': str, 'can_comment': bool, 'date': DateTime
, 'parent_id': int, 'public': bool
, 'url': str, 'email': str, 'non_stop': bool
}
, AddTags
)
, 'article': Model( models.Article
, { 'title': str }
, { 'subtitle': str, 'can_comment': bool, 'date': DateTime
, 'parent_id': int, 'public': bool
, 'static_page': bool, 'focus': bool
}
, AddTags
)
, 'episode': Model( models.Episode
, { 'title': str }
, { 'subtitle': str, 'can_comment': bool, 'date': DateTime
, 'parent_id': int, 'public': bool
}
, AddTags
)
, 'schedule': Model( models.Schedule
, { 'parent_id': int, 'date': DateTime, 'duration': Time
, 'frequency': int }
, { 'rerun': int } # FIXME: redo
)
, 'sound': Model( models.Sound
, { 'parent_id': int, 'date': DateTime, 'file': str
, 'duration': Time}
, { 'fragment': bool, 'embed': str, 'removed': bool }
, AddTags
)
}
class Command (BaseCommand):
help='Create, update, delete or dump an element of the given model.' \
' If no action is given, dump it'
def add_arguments (self, parser):
parser.add_argument( 'model', type=str
, metavar="MODEL"
, help='model to add. It must be in {}'\
.format(', '.join(models.keys()))
)
group = parser.add_argument_group('actions')
group.add_argument('--dump', action='store_true')
group.add_argument('--add', action='store_true'
, help='create or update (if id is given) object')
group.add_argument('--delete', action='store_true')
group.add_argument('--json', action='store_true'
, help='dump using json')
group = parser.add_argument_group('selector')
group.add_argument('--id', type=str, nargs='+'
, metavar="ID"
, help='select existing object by id'
)
group.add_argument('--head', type=int
, help='dump the HEAD first objects only'
)
group.add_argument('--tail', type=int
, help='dump the TAIL last objects only'
)
group.add_argument('--fields', action='store_true'
, help='print fields before dumping'
)
# publication/generic
group = parser.add_argument_group('fields'
, 'depends on the given model')
group.add_argument('--parent_id', type=str)
group.add_argument('--title', type=str)
group.add_argument('--subtitle', type=str)
group.add_argument('--can_comment',action='store_true')
group.add_argument('--public', action='store_true')
group.add_argument( '--date', type=str
, help='a valid date time (Y/m/d H:m:s')
group.add_argument('--tags', type=str, nargs='+')
# program
group.add_argument('--url', type=str)
group.add_argument('--email', type=str)
group.add_argument('--non_stop', type=int)
# article
group.add_argument('--static_page',action='store_true')
group.add_argument('--focus', action='store_true')
# schedule
group.add_argument('--duration', type=str)
group.add_argument('--frequency', type=int)
group.add_argument('--rerun', type=int)
# fields
parser.formatter_class=argparse.RawDescriptionHelpFormatter
parser.epilog = 'available fields per model:'
for name, model in models.items():
parser.epilog += '\n ' + model.model.type() + ': \n' \
+ model.to_string()
def handle (self, *args, **options):
model = options.get('model')
if not model:
raise CommandError('no model has been given')
model = model.lower()
if model not in models:
raise CommandError('model {} is not supported'.format(str(model)))
if options.get('add'):
if options.get('id'):
models[model].update(options)
else:
models[model].make(options)
elif options.get('delete'):
models[model].delete(options)
else: # --dump --json
models[model].dump(options)

View File

@ -0,0 +1,136 @@
"""
Check over programs' sound files, scan them, and add them to the
database if they are not there yet.
It tries to parse the file name to get the date of the diffusion of an
episode and associate the file with it; We use the following format:
yyyymmdd[_n][_][title]
Where:
'yyyy' is the year of the episode's diffusion;
'mm' is the month of the episode's diffusion;
'dd' is the day of the episode's diffusion;
'n' is the number of the episode (if multiple episodes);
'title' the title of the sound;
"""
import os
import re
from argparse import RawTextHelpFormatter
from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
from aircox_programs.models import *
import aircox_programs.settings as settings
class Command (BaseCommand):
help= __doc__
def report (self, program = None, component = None, *content):
if not component:
print('{}: '.format(program), *content)
else:
print('{}, {}: '.format(program, component), *content)
def add_arguments (self, parser):
parser.formatter_class=RawTextHelpFormatter
def handle (self, *args, **options):
programs = Program.objects.filter()
for program in programs:
self.check(program, program.path + '/public', public = True)
self.check(program, program.path + '/podcasts', embed = True)
self.check(program, program.path + '/private')
def get_sound_info (self, path):
"""
Parse file name to get info on the assumption it has the correct
format (given in Command.help)
"""
r = re.search('^(?P<year>[0-9]{4})'
'(?P<month>[0-9]{2})'
'(?P<day>[0-9]{2})'
'(_(?P<n>[0-9]+))?'
'_?(?P<name>.*)\.\w+$',
os.path.basename(path))
if not (r and r.groupdict()):
self.report(program, path, "file path is not correct, use defaults")
r = {
'name': os.path.splitext(path)
}
r['path'] = path
return r
def ensure_sound (self, sound_info):
"""
Return the Sound for the given sound_info; If not found, create it
without saving it.
"""
sound = Sound.objects.filter(path = path)
if sound:
sound = sound[0]
else:
sound = Sound(path = path, title = sound_info['name'])
def find_episode (self, program, sound_info):
"""
For a given program, and sound path check if there is an episode to
associate to, using the diffusion's date.
If there is no matching episode, return None.
"""
# check on episodes
diffusion = Diffusion.objects.filter(
program = program,
date__year = int(sound_info['year']),
date__month = int(sound_info['month']),
date__day = int(sound_info['day'])
)
if not diffusion.count():
self.report(program, path, 'no diffusion found for the given date')
return
diffusion = diffusion[0]
return diffusion.episode or None
def check (self, program, dir_path, public = False, embed = False):
"""
Scan a given directory that is associated to the given program, and
update sounds information
Return a list of scanned sounds
"""
if not os.path.exists(dir_path):
return
paths = []
for path in os.listdir(dir_path):
path = dir_path + '/' + path
if not path.endswith(settings.AIRCOX_SOUNDFILE_EXT):
continue
paths.append(path)
sound_info = self.get_sound_info(path)
sound = self.ensure_sound(sound_info)
sound.public = public
# episode and relation
if 'year' in sound_info:
episode = self.find_episode(program, sound_info)
if episode:
for sound_ in episode.sounds.get_queryset():
if sound_.path == sound.path:
break
else:
self.report(program, path, 'associate sound to episode ',
episode.id)
episode.sounds.add(sound)
return paths

View File

@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
import taggit.managers
class Migration(migrations.Migration):
dependencies = [
('taggit', '0002_auto_20150616_2121'),
]
operations = [
migrations.CreateModel(
name='Diffusion',
fields=[
('id', models.AutoField(serialize=False, auto_created=True, verbose_name='ID', primary_key=True)),
('type', models.SmallIntegerField(choices=[(0, 'default'), (4, 'stop'), (1, 'unconfirmed'), (2, 'cancel')], verbose_name='type')),
('date', models.DateTimeField(verbose_name='start of the diffusion')),
],
options={
'verbose_name': 'Diffusion',
'verbose_name_plural': 'Diffusions',
},
),
migrations.CreateModel(
name='Episode',
fields=[
('id', models.AutoField(serialize=False, auto_created=True, verbose_name='ID', primary_key=True)),
('name', models.CharField(verbose_name='nom', max_length=128)),
],
options={
'verbose_name': 'Épisode',
'verbose_name_plural': 'Épisodes',
},
),
migrations.CreateModel(
name='Program',
fields=[
('id', models.AutoField(serialize=False, auto_created=True, verbose_name='ID', primary_key=True)),
('name', models.CharField(verbose_name='nom', max_length=128)),
('active', models.BooleanField(default=True, help_text='if not set this program is no longer active', verbose_name='inactive')),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='Schedule',
fields=[
('id', models.AutoField(serialize=False, auto_created=True, verbose_name='ID', primary_key=True)),
('date', models.DateTimeField(verbose_name='date')),
('duration', models.TimeField(verbose_name='durée')),
('frequency', models.SmallIntegerField(choices=[(4, 'third'), (32, 'one on two'), (10, 'second and fourth'), (5, 'first and third'), (31, 'every'), (2, 'second'), (1, 'first'), (16, 'last'), (8, 'fourth')], verbose_name='fréquence')),
('program', models.ForeignKey(to='aircox_programs.Program', null=True, blank=True)),
('rerun', models.ForeignKey(to='aircox_programs.Schedule', null=True, help_text='Schedule of a rerun of this one', blank=True, verbose_name='rediffusion')),
],
options={
'verbose_name': 'Schedule',
'verbose_name_plural': 'Schedules',
},
),
migrations.CreateModel(
name='Sound',
fields=[
('id', models.AutoField(serialize=False, auto_created=True, verbose_name='ID', primary_key=True)),
('name', models.CharField(verbose_name='nom', max_length=128)),
('path', models.FilePathField(recursive=True, match='*(.ogg|.flac|.wav|.mp3|.opus)$', path='/media/data/courants/code/aircox/static/media/programs', null=True, blank=True, verbose_name='fichier')),
('embed', models.TextField(null=True, blank=True, help_text='if set, consider the sound podcastable', verbose_name='embed HTML code from external website')),
('duration', models.TimeField(null=True, blank=True, verbose_name='durée')),
('public', models.BooleanField(default=False, help_text='the element is public', verbose_name='public')),
('fragment', models.BooleanField(default=False, help_text='the file is a cut', verbose_name='son incomplet')),
('removed', models.BooleanField(default=False, help_text='this sound has been removed from filesystem')),
],
options={
'verbose_name': 'Sound',
'verbose_name_plural': 'Sounds',
},
),
migrations.CreateModel(
name='Stream',
fields=[
('id', models.AutoField(serialize=False, auto_created=True, verbose_name='ID', primary_key=True)),
('name', models.CharField(null=True, blank=True, verbose_name='nom', max_length=32)),
('public', models.BooleanField(default=True, help_text='program list is public', verbose_name='public')),
('type', models.SmallIntegerField(choices=[(1, 'schedule'), (0, 'random')], verbose_name='type')),
('priority', models.SmallIntegerField(default=0, help_text='priority of the stream', verbose_name='priority')),
],
),
migrations.CreateModel(
name='Track',
fields=[
('id', models.AutoField(serialize=False, auto_created=True, verbose_name='ID', primary_key=True)),
('name', models.CharField(verbose_name='nom', max_length=128)),
('artist', models.CharField(verbose_name='artiste', max_length=128)),
('position', models.SmallIntegerField(default=0, help_text='position in the playlist')),
('episode', models.ForeignKey(to='aircox_programs.Episode')),
('tags', taggit.managers.TaggableManager(through='taggit.TaggedItem', to='taggit.Tag', blank=True, help_text='A comma-separated list of tags.', verbose_name='mots-clés')),
],
options={
'verbose_name': 'Track',
'verbose_name_plural': 'Tracks',
},
),
migrations.AddField(
model_name='program',
name='stream',
field=models.ForeignKey(to='aircox_programs.Stream', verbose_name='stream'),
),
migrations.AddField(
model_name='episode',
name='program',
field=models.ForeignKey(to='aircox_programs.Program', help_text='parent program', verbose_name='program'),
),
migrations.AddField(
model_name='episode',
name='sounds',
field=models.ManyToManyField(to='aircox_programs.Sound', blank=True, verbose_name='sounds'),
),
migrations.AddField(
model_name='diffusion',
name='episode',
field=models.ForeignKey(to='aircox_programs.Episode', null=True, blank=True, verbose_name='épisode'),
),
migrations.AddField(
model_name='diffusion',
name='program',
field=models.ForeignKey(to='aircox_programs.Program', verbose_name='program'),
),
migrations.AddField(
model_name='diffusion',
name='stream',
field=models.ForeignKey(default=0, help_text='stream id on which the diffusion happens', to='aircox_programs.Stream', verbose_name='stream'),
),
]

View File

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('aircox_programs', '0001_initial'),
]
operations = [
migrations.AlterField(
model_name='diffusion',
name='type',
field=models.SmallIntegerField(choices=[(4, 'stop'), (2, 'cancel'), (1, 'unconfirmed'), (0, 'default')], verbose_name='type'),
),
migrations.AlterField(
model_name='schedule',
name='frequency',
field=models.SmallIntegerField(choices=[(31, 'every'), (8, 'fourth'), (5, 'first and third'), (10, 'second and fourth'), (16, 'last'), (32, 'one on two'), (4, 'third'), (1, 'first'), (2, 'second')], verbose_name='fréquence'),
),
]

View File

433
aircox_programs/models.py Executable file
View File

@ -0,0 +1,433 @@
import os
from django.db import models
from django.template.defaultfilters import slugify
from django.utils.translation import ugettext as _, ugettext_lazy
from django.utils import timezone as tz
from django.utils.html import strip_tags
from taggit.managers import TaggableManager
import aircox_programs.settings as settings
def date_or_default (date, date_only = False):
"""
Return date or default value (now) if not defined, and remove time info
if date_only is True
"""
date = date or tz.datetime.today()
if not tz.is_aware(date):
date = tz.make_aware(date)
if date_only:
return date.replace(hour = 0, minute = 0, second = 0, microsecond = 0)
return date
class Nameable (models.Model):
name = models.CharField (
_('name'),
max_length = 128,
)
def get_slug_name (self):
return slugify(self.name)
def __str__ (self):
if self.pk:
return '#{} {}'.format(self.pk, self.name)
return '{}'.format(self.name)
class Meta:
abstract = True
class Track (Nameable):
# There are no nice solution for M2M relations ship (even without
# through) in django-admin. So we unfortunately need to make one-
# to-one relations and add a position argument
episode = models.ForeignKey(
'Episode',
)
artist = models.CharField(
_('artist'),
max_length = 128,
)
# position can be used to specify a position in seconds for non-
# stop programs or a position in the playlist
position = models.SmallIntegerField(
default = 0,
help_text=_('position in the playlist'),
)
tags = TaggableManager(
_('tags'),
blank = True,
)
def __str__(self):
return ' '.join([self.artist, ':', self.name ])
class Meta:
verbose_name = _('Track')
verbose_name_plural = _('Tracks')
class Sound (Nameable):
"""
A Sound is the representation of a sound, that can be:
- An episode podcast/complete record
- An episode partial podcast
- An episode is a part of the episode but not usable for direct podcast
We can manage this using the "public" and "fragment" fields. If a Sound is
public, then we can podcast it. If a Sound is a fragment, then it is not
usable for diffusion.
Each sound can be associated to a filesystem's file or an embedded
code (for external podcasts).
"""
path = models.FilePathField(
_('file'),
path = settings.AIRCOX_PROGRAMS_DIR,
match = '*(' + '|'.join(settings.AIRCOX_SOUNDFILE_EXT) + ')$',
recursive = True,
blank = True, null = True,
)
embed = models.TextField(
_('embed HTML code from external website'),
blank = True, null = True,
help_text = _('if set, consider the sound podcastable'),
)
duration = models.TimeField(
_('duration'),
blank = True, null = True,
)
public = models.BooleanField(
_('public'),
default = False,
help_text = _("the element is public"),
)
fragment = models.BooleanField(
_('incomplete sound'),
default = False,
help_text = _("the file is a cut"),
)
removed = models.BooleanField(
default = False,
help_text = _('this sound has been removed from filesystem'),
)
def get_mtime (self):
"""
Get the last modification date from file
"""
mtime = os.stat(self.path).st_mtime
mtime = tz.datetime.fromtimestamp(mtime)
return tz.make_aware(mtime, timezone.get_current_timezone())
def save (self, *args, **kwargs):
if not self.pk:
self.date = self.get_mtime()
super().save(*args, **kwargs)
def __str__ (self):
return '/'.join(self.path.split('/')[-3:])
class Meta:
verbose_name = _('Sound')
verbose_name_plural = _('Sounds')
class Schedule (models.Model):
# Frequency for schedules. Basically, it is a mask of bits where each bit is
# a week. Bits > rank 5 are used for special schedules.
# Important: the first week is always the first week where the weekday of
# the schedule is present.
# For ponctual programs, there is no need for a schedule, only a diffusion
Frequency = {
'first': 0b000001,
'second': 0b000010,
'third': 0b000100,
'fourth': 0b001000,
'last': 0b010000,
'first and third': 0b000101,
'second and fourth': 0b001010,
'every': 0b011111,
'one on two': 0b100000,
}
for key, value in Frequency.items():
ugettext_lazy(key)
program = models.ForeignKey(
'Program',
blank = True, null = True,
)
date = models.DateTimeField(_('date'))
duration = models.TimeField(
_('duration'),
)
frequency = models.SmallIntegerField(
_('frequency'),
choices = [ (y, x) for x,y in Frequency.items() ],
)
rerun = models.ForeignKey(
'self',
verbose_name = _('rerun'),
blank = True, null = True,
help_text = "Schedule of a rerun of this one",
)
def match (self, date = None, check_time = True):
"""
Return True if the given datetime matches the schedule
"""
date = date_or_default(date)
if self.date.weekday() == date.weekday() and self.match_week(date):
return self.date.time() == date.time() if check_time else True
return False
def match_week (self, date = None):
"""
Return True if the given week number matches the schedule, False
otherwise.
If the schedule is ponctual, return None.
"""
date = date_or_default(date)
if self.frequency == Schedule.Frequency['one on two']:
week = date.isocalendar()[1]
return (week % 2) == (self.date.isocalendar()[1] % 2)
first_of_month = tz.datetime.date(date.year, date.month, 1)
week = date.isocalendar()[1] - first_of_month.isocalendar()[1]
# weeks of month
if week == 4:
# fifth week: return if for every week
return self.frequency == 0b1111
return (self.frequency & (0b0001 << week) > 0)
def normalize (self, date):
"""
Set the time of a datetime to the schedule's one
"""
return date.replace(hour = self.date.hour, minute = self.date.minute)
def dates_of_month (self, date = None):
"""
Return a list with all matching dates of date.month (=today)
"""
date = date_or_default(date, True).replace(day=1)
wday = self.date.weekday()
fwday = date.weekday()
# move date to the date weekday of the schedule
# check on SO#3284452 for the formula
date += tz.timedelta(days = (7 if fwday > wday else 0) - fwday + wday)
fwday = date.weekday()
# special frequency case
weeks = self.frequency
if self.frequency == Schedule.Frequency['last']:
date += tz.timedelta(month = 1, days = -7)
return self.normalize([date])
if weeks == Schedule.Frequency['one on two']:
# if both week are the same, then the date week of the month
# matches. Note: wday % 2 + fwday % 2 => (wday + fwday) % 2
fweek = date.isocalendar()[1]
week = self.date.isocalendar()[1]
weeks = 0b010101 if not (fweek + week) % 2 else 0b001010
dates = []
for week in range(0,5):
# there can be five weeks in a month
if not weeks & (0b1 << week):
continue
wdate = date + tz.timedelta(days = week * 7)
if wdate.month == date.month:
dates.append(self.normalize(wdate))
return dates
def diffusions_of_month (self, date, exclude_saved = False):
"""
Return a list of Diffusion instances, from month of the given date, that
can be not in the database.
If exclude_saved, exclude all diffusions that are yet in the database.
When a Diffusion is created, it tries to attach the corresponding
episode using a match of episode.date (and takes care of rerun case);
"""
dates = self.dates_of_month(date)
saved = Diffusion.objects.filter(date__in = dates,
program = self.program)
diffusions = []
# existing diffusions
for item in saved:
if item.date in dates:
dates.remove(item.date)
if not exclude_saved:
diffusions.append(item)
# others
for date in dates:
first_date = date
if self.rerun:
first_date -= self.date - self.rerun.date
episode = Episode.objects.filter(date = first_date,
program = self.program)
episode = episode[0] if episode.count() else None
diffusions.append(Diffusion(
episode = episode,
program = self.program,
stream = self.program.stream,
type = Diffusion.Type['unconfirmed'],
date = date,
))
return diffusions
def __str__ (self):
frequency = [ x for x,y in Schedule.Frequency.items()
if y == self.frequency ]
return self.program.name + ': ' + frequency[0] + ' (' + str(self.date) + ')'
class Meta:
verbose_name = _('Schedule')
verbose_name_plural = _('Schedules')
class Diffusion (models.Model):
Type = {
'default': 0x00, # simple diffusion (done/planed)
'unconfirmed': 0x01, # scheduled by the generator but not confirmed for diffusion
'cancel': 0x02, # cancellation happened; used to inform users
# 'restart': 0x03, # manual restart; used to remix/give up antenna
'stop': 0x04, # diffusion has been forced to stop
}
for key, value in Type.items():
ugettext_lazy(key)
episode = models.ForeignKey (
'Episode',
blank = True, null = True,
verbose_name = _('episode'),
)
program = models.ForeignKey (
'Program',
verbose_name = _('program'),
)
# program.stream can change, but not the stream;
stream = models.ForeignKey(
'Stream',
verbose_name = _('stream'),
default = 0,
help_text = 'stream id on which the diffusion happens',
)
type = models.SmallIntegerField(
verbose_name = _('type'),
choices = [ (y, x) for x,y in Type.items() ],
)
date = models.DateTimeField( _('start of the diffusion') )
def save (self, *args, **kwargs):
if self.episode: # FIXME self.episode or kwargs['episode']
self.program = self.episode.program
# check type against stream's type
super(Diffusion, self).save(*args, **kwargs)
def __str__ (self):
return self.program.name + ' on ' + str(self.date) \
+ str(self.type)
class Meta:
verbose_name = _('Diffusion')
verbose_name_plural = _('Diffusions')
class Stream (models.Model):
Type = {
'random': 0x00, # selection using random function
'schedule': 0x01, # selection using schedule
}
for key, value in Type.items():
ugettext_lazy(key)
name = models.CharField(
_('name'),
max_length = 32,
blank = True,
null = True,
)
public = models.BooleanField(
_('public'),
default = True,
help_text = _('program list is public'),
)
type = models.SmallIntegerField(
verbose_name = _('type'),
choices = [ (y, x) for x,y in Type.items() ],
)
priority = models.SmallIntegerField(
_('priority'),
default = 0,
help_text = _('priority of the stream')
)
# get info for:
# - random lists
# - scheduled lists
# link between Streams and Programs:
# - hours range (non-stop)
# - stream/pgm
def __str__ (self):
return '#{} {}'.format(self.priority, self.name)
class Program (Nameable):
stream = models.ForeignKey(
Stream,
verbose_name = _('stream'),
)
active = models.BooleanField(
_('inactive'),
default = True,
help_text = _('if not set this program is no longer active')
)
@property
def path (self):
return os.path.join(settings.AIRCOX_PROGRAMS_DIR,
slugify(self.name + '_' + str(self.id)) )
def find_schedule (self, date):
"""
Return the first schedule that matches a given date
"""
schedules = Schedule.objects.filter(program = self)
for schedule in schedules:
if schedule.match(date, check_time = False):
return schedule
class Episode (Nameable):
program = models.ForeignKey(
Program,
verbose_name = _('program'),
help_text = _('parent program'),
)
sounds = models.ManyToManyField(
Sound,
blank = True,
verbose_name = _('sounds'),
)
class Meta:
verbose_name = _('Episode')
verbose_name_plural = _('Episodes')

24
aircox_programs/settings.py Executable file
View File

@ -0,0 +1,24 @@
import os
from django.conf import settings
def ensure (key, default):
globals()[key] = getattr(settings, key, default)
# Directory for the programs data
ensure('AIRCOX_PROGRAMS_DIR',
os.path.join(settings.MEDIA_ROOT, 'programs'))
# Default directory for the sounds
ensure('AIRCOX_SOUNDFILE_DEFAULT_DIR',
os.path.join(AIRCOX_PROGRAMS_DIR + 'default'))
# Extension of sound files
ensure('AIRCOX_SOUNDFILE_EXT',
('.ogg','.flac','.wav','.mp3','.opus'))
# Stream for the scheduled diffusions
ensure('AIRCOX_SCHEDULED_STREAM', 0)

3
aircox_programs/tests.py Executable file
View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

6
aircox_programs/utils.py Normal file
View File

@ -0,0 +1,6 @@
def ensure_list (value):
if type(value) in (list, set, tuple):
return value
return [value]

90
aircox_programs/views.py Executable file
View File

@ -0,0 +1,90 @@
from django.db import models
from django.shortcuts import render
from django.core.serializers.json import DjangoJSONEncoder
from django.utils import timezone, dateformat
from django.views.generic import ListView
from django.views.generic import DetailView
from django.utils.translation import ugettext as _, ugettext_lazy
from aircox_programs.models import *
import aircox_programs.settings
import aircox_programs.utils
class ListQueries:
@staticmethod
def search (qs, q):
qs = qs.filter(tags__slug__in = re.compile(r'(\s|\+)+').split(q)) | \
qs.filter(title__icontains = q) | \
qs.filter(subtitle__icontains = q) | \
qs.filter(content__icontains = q)
qs.distinct()
return qs
@staticmethod
def thread (qs, q):
return qs.filter(parent = q)
@staticmethod
def next (qs, q):
qs = qs.filter(date__gte = timezone.now())
if q:
qs = qs.filter(parent = q)
return qs
@staticmethod
def prev (qs, q):
qs = qs.filter(date__lte = timezone.now())
if q:
qs = qs.filter(parent = q)
return qs
@staticmethod
def date (qs, q):
if not q:
q = timezone.datetime.today()
if type(q) is str:
q = timezone.datetime.strptime(q, '%Y/%m/%d').date()
return qs.filter(date__startswith = q)
class Diffusion:
@staticmethod
def episode (qs, q):
return qs.filter(episode = q)
@staticmethod
def program (qs, q):
return qs.filter(program = q)
class ListQuery:
model = None
qs = None
def __init__ (self, model, *kwargs):
self.model = model
self.__dict__.update(kwargs)
def get_queryset (self, by, q):
qs = model.objects.all()
if model._meta.get_field_by_name('public'):
qs = qs.filter(public = True)
# run query set
queries = Queries.__dict__.get(self.model) or Queries
filter = queries.__dict__.get(by)
if filter:
qs = filter(qs, q)
# order
if self.sort == 'asc':
qs = qs.order_by('date', 'id')
else:
qs = qs.order_by('-date', '-id')
# exclude
qs = qs.exclude(id = exclude)
return qs