#93

Co-authored-by: bkfox <thomas@bkfox.net>
Reviewed-on: rc/aircox#95
aircox/tests/conftest.py (new file, 72 lines)
@@ -0,0 +1,72 @@
from datetime import time, timedelta
import itertools

import pytest
from model_bakery import baker

from aircox import models


@pytest.fixture
def stations():
    return baker.make("aircox.station", _quantity=2)


@pytest.fixture
def programs(stations):
    items = list(
        itertools.chain(
            *(
                baker.make("aircox.program", station=station, _quantity=3)
                for station in stations
            )
        )
    )
    for item in items:
        item.save()
    return items


@pytest.fixture
def sched_initials(programs):
    # use concrete class; timezone is provided in order to ensure DST
    items = [
        baker.prepare(
            "aircox.schedule",
            program=program,
            time=time(16, 00),
            timezone="Europe/Brussels",
        )
        for program in programs
    ]
    models.Schedule.objects.bulk_create(items)
    return items


@pytest.fixture
def sched_reruns(sched_initials):
    # use concrete class
    items = [
        baker.prepare(
            "aircox.schedule",
            initial=initial,
            program=initial.program,
            date=initial.date,
            time=(initial.start + timedelta(hours=1)).time(),
        )
        for initial in sched_initials
    ]
    models.Schedule.objects.bulk_create(items)
    return items


@pytest.fixture
def schedules(sched_initials, sched_reruns):
    return sched_initials + sched_reruns


@pytest.fixture
def episodes(programs):
    return [
        baker.make("aircox.episode", parent=program) for program in programs
    ]
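The fixtures above chain stations -> programs -> sched_initials -> sched_reruns, with schedules and episodes built on top. A minimal sketch of a test consuming that chain (illustrative only, not part of this commit; the counts follow from the _quantity arguments above):

import pytest


@pytest.mark.django_db
def test_fixture_chain(schedules, stations):
    # 2 stations x 3 programs, each program gets one initial and one rerun schedule
    assert len(schedules) == 12
    # every schedule's program belongs to one of the baked stations
    assert {schedule.program.station for schedule in schedules} == set(stations)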
aircox/tests/models/test_diffusion.py (new file, 19 lines)
@@ -0,0 +1,19 @@
import pytest


class TestDiffusionQuerySet:
    @pytest.mark.django_db
    def test_episode_by_obj(self, episodes):
        pass

    @pytest.mark.django_db
    def test_episode_by_id(self, episodes):
        pass

    @pytest.mark.django_db
    def test_on_air(self, episodes):
        pass

    @pytest.mark.django_db
    def test_now(self, episodes):
        pass
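These four tests are still stubs. As an illustration only, one possible shape for test_episode_by_obj, assuming DiffusionQuerySet exposes an episode() filter analogous to the station()/program() filters exercised in test_rerun.py below; the diffusion baking and the episode() call are assumptions, not part of this commit:

import pytest
from model_bakery import baker

from aircox.models import Diffusion


@pytest.mark.django_db
def test_episode_by_obj_sketch(episodes):
    # hypothetical: bake one diffusion per episode, then filter by episode object
    for episode in episodes:
        baker.make("aircox.diffusion", episode=episode)
    for episode in episodes:
        queryset = Diffusion.objects.episode(episode).values_list(
            "episode", flat=True
        )
        assert set(queryset) == {episode.pk}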
aircox/tests/models/test_rerun.py (new file, 117 lines)
@@ -0,0 +1,117 @@
from datetime import timedelta

import pytest

from django.core.exceptions import ValidationError

# we use Schedule as concrete class (Rerun is abstract)
from aircox.models import Schedule


class TestRerunQuerySet:
    @pytest.mark.django_db
    def test_station_by_obj(self, stations, schedules):
        for station in stations:
            queryset = (
                Schedule.objects.station(station)
                .distinct()
                .values_list("program__station", flat=True)
            )
            assert queryset.count() == 1
            assert queryset.first() == station.pk

    @pytest.mark.django_db
    def test_station_by_id(self, stations, schedules):
        for station in stations:
            queryset = (
                Schedule.objects.station(id=station.pk)
                .distinct()
                .values_list("program__station", flat=True)
            )
            assert queryset.count() == 1
            assert queryset.first() == station.pk

    @pytest.mark.django_db
    def test_program_by_obj(self, programs, schedules):
        for program in programs:
            queryset = (
                Schedule.objects.program(program)
                .distinct()
                .values_list("program", flat=True)
            )
            assert queryset.count() == 1
            assert queryset.first() == program.pk

    @pytest.mark.django_db
    def test_program_by_id(self, programs, schedules):
        for program in programs:
            queryset = (
                Schedule.objects.program(id=program.pk)
                .distinct()
                .values_list("program", flat=True)
            )
            assert queryset.count() == 1
            assert queryset.first() == program.pk

    @pytest.mark.django_db
    def test_rerun(self, schedules):
        queryset = Schedule.objects.rerun().values_list("initial", flat=True)
        assert None not in queryset

    @pytest.mark.django_db
    def test_initial(self, schedules):
        queryset = (
            Schedule.objects.initial()
            .distinct()
            .values_list("initial", flat=True)
        )
        assert queryset.count() == 1
        assert queryset.first() is None


class TestRerun:
    @pytest.mark.django_db
    def test_is_initial_true(self, sched_initials):
        assert all(r.is_initial for r in sched_initials)

    @pytest.mark.django_db
    def test_is_initial_false(self, sched_reruns):
        assert all(not r.is_initial for r in sched_reruns)

    @pytest.mark.django_db
    def test_is_rerun_true(self, sched_reruns):
        assert all(r.is_rerun for r in sched_reruns)

    @pytest.mark.django_db
    def test_is_rerun_false(self, sched_initials):
        assert all(not r.is_rerun for r in sched_initials)

    @pytest.mark.django_db
    def test_get_initial_of_initials(self, sched_initials):
        assert all(r.get_initial() is r for r in sched_initials)

    @pytest.mark.django_db
    def test_get_initial_of_reruns(self, sched_reruns):
        assert all(r.get_initial() is r.initial for r in sched_reruns)

    @pytest.mark.django_db
    def test_clean_success(self, sched_reruns):
        for rerun in sched_reruns:
            rerun.clean()

    @pytest.mark.django_db
    def test_clean_fails(self, sched_reruns):
        for rerun in sched_reruns:
            rerun.time = (rerun.initial.start - timedelta(hours=2)).time()
            with pytest.raises(ValidationError):
                rerun.clean()

    @pytest.mark.django_db
    def test_save_rerun(self, sched_reruns):
        for rerun in sched_reruns:
            rerun.program = None
            rerun.save_rerun()
            assert rerun.program == rerun.initial.program

    # TODO: save()
    # save_initial is empty, thus not tested
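One possible shape for the test behind the TODO above, assuming Schedule.save() delegates to save_rerun() when initial is set; that delegation is an assumption, not something this commit verifies:

import pytest


@pytest.mark.django_db
def test_save_rerun_via_save(sched_reruns):
    # assumption: save() calls save_rerun() for reruns, mirroring the
    # direct save_rerun() test above
    for rerun in sched_reruns:
        rerun.program = None
        rerun.save()
        assert rerun.program == rerun.initial.program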
aircox/tests/models/test_schedule.py (new file, 135 lines)
@@ -0,0 +1,135 @@
from datetime import date, datetime, time, timedelta

import pytest
from model_bakery import baker

import calendar
from dateutil.relativedelta import relativedelta

from aircox import utils
from aircox.models import Diffusion, Schedule


class TestSchedule:
    @pytest.mark.django_db
    def test_save_rerun(self, sched_reruns):
        for schedule in sched_reruns:
            schedule.duration = None
            schedule.frequency = None
            schedule.save_rerun()
            assert schedule.program == schedule.initial.program
            assert schedule.duration == schedule.initial.duration
            assert schedule.frequency == schedule.initial.frequency

    @pytest.mark.django_db
    def test_tz(self, schedules):
        for schedule in schedules:
            assert schedule.timezone == schedule.tz.zone

    @pytest.mark.django_db
    def test_start(self, schedules):
        for schedule in schedules:
            assert schedule.start.date() == schedule.date
            assert schedule.start.time() == schedule.time

    @pytest.mark.django_db
    def test_end(self, schedules):
        for schedule in schedules:
            delta = utils.to_timedelta(schedule.duration)
            assert schedule.end - schedule.start == delta

    # def test_get_frequency_display(self):
    #     pass

    @pytest.mark.django_db
    def test_normalize(self, schedules):
        for schedule in schedules:
            dt = datetime.combine(schedule.date, schedule.time)
            assert schedule.normalize(dt).tzinfo.zone == schedule.timezone

    @pytest.mark.django_db
    def test_dates_of_month_ponctual(self):
        schedule = baker.prepare(
            Schedule, frequency=Schedule.Frequency.ponctual
        )
        at = schedule.date + relativedelta(months=4)
        assert schedule.dates_of_month(at) == []

    @pytest.mark.django_db
    @pytest.mark.parametrize("months", range(0, 25, 2))
    @pytest.mark.parametrize("hour", range(0, 24, 3))
    def test_dates_of_month_last(self, months, hour):
        schedule = baker.prepare(
            Schedule, time=time(hour, 00), frequency=Schedule.Frequency.last
        )
        at = schedule.date + relativedelta(months=months)
        datetimes = schedule.dates_of_month(at)
        assert len(datetimes) == 1

        dt = datetimes[0]
        self._assert_date(schedule, at, dt)

        month_info = calendar.monthrange(at.year, at.month)
        at = date(at.year, at.month, month_info[1])
        if at.weekday() < schedule.date.weekday():
            at -= timedelta(days=7)
        at += timedelta(days=schedule.date.weekday()) - timedelta(
            days=at.weekday()
        )
        assert dt.date() == at

    # since the same method is used for first, second, etc. frequencies
    # we assume testing every is sufficient
    @pytest.mark.django_db
    @pytest.mark.parametrize("months", range(0, 25, 2))
    @pytest.mark.parametrize("hour", range(0, 24, 3))
    def test_dates_of_month_every(self, months, hour):
        schedule = baker.prepare(
            Schedule, time=time(hour, 00), frequency=Schedule.Frequency.every
        )
        at = schedule.date + relativedelta(months=months)
        datetimes = schedule.dates_of_month(at)
        last = None
        for dt in datetimes:
            self._assert_date(schedule, at, dt)
            if last:
                assert (dt - last).days == 7
            last = dt

    @pytest.mark.django_db
    @pytest.mark.parametrize("months", range(0, 25, 2))
    @pytest.mark.parametrize("hour", range(0, 24, 3))
    def test_dates_of_month_one_on_two(self, months, hour):
        schedule = baker.prepare(
            Schedule,
            time=time(hour, 00),
            frequency=Schedule.Frequency.one_on_two,
        )
        at = schedule.date + relativedelta(months=months)
        datetimes = schedule.dates_of_month(at)
        for dt in datetimes:
            self._assert_date(schedule, at, dt)
            delta = dt.date() - schedule.date
            assert delta.days % 14 == 0

    def _assert_date(self, schedule, at, dt):
        assert dt.year == at.year
        assert dt.month == at.month
        assert dt.weekday() == schedule.date.weekday()
        assert dt.time() == schedule.time
        assert dt.tzinfo.zone == schedule.timezone

    @pytest.mark.django_db
    def test_diffusions_of_month(self, sched_initials):
        # TODO: test values of initial, rerun
        for schedule in sched_initials:
            at = schedule.date + timedelta(days=30)
            dates = set(schedule.dates_of_month(at))
            episodes, diffusions = schedule.diffusions_of_month(at)

            assert all(r.date in dates for r in episodes)
            assert all(
                (not r.initial or r.date in dates)
                and r.type == Diffusion.TYPE_ON_AIR
                for r in diffusions
            )
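The weekday arithmetic in test_dates_of_month_last above (and in check_last in the deleted legacy test below) computes the last occurrence of the schedule's weekday within a month. An equivalent standalone sketch of that computation, for reference only and not part of this commit:

import calendar
from datetime import date, timedelta


def last_weekday_of_month(year, month, weekday):
    # start from the last day of the month, then step back to the wanted weekday
    last_day = date(year, month, calendar.monthrange(year, month)[1])
    return last_day - timedelta(days=(last_day.weekday() - weekday) % 7)


# October 2023 ends on Tuesday the 31st; its last Sunday is the 29th
assert last_weekday_of_month(2023, 10, calendar.SUNDAY) == date(2023, 10, 29)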
(deleted file, 66 lines)
@@ -1,66 +0,0 @@
import calendar
import datetime
import logging

from dateutil.relativedelta import relativedelta
from django.test import TestCase
from django.utils import timezone as tz

from aircox.models import Schedule


logger = logging.getLogger("aircox.test")
logger.setLevel("INFO")


class ScheduleCheck(TestCase):
    def setUp(self):
        self.schedules = [
            Schedule(
                date=tz.now(),
                duration=datetime.time(1, 30),
                frequency=frequency,
            )
            for frequency in Schedule.Frequency.__members__.values()
        ]

    def test_frequencies(self):
        for schedule in self.schedules:
            logger.info(
                "- test frequency %s" % schedule.get_frequency_display()
            )
            date = schedule.date
            count = 24
            while count:
                logger.info(
                    "- month %(month)s/%(year)s"
                    % {"month": date.month, "year": date.year}
                )
                count -= 1
                dates = schedule.dates_of_month(date)
                if schedule.frequency == schedule.Frequency.one_on_two:
                    self.check_one_on_two(schedule, date, dates)
                elif schedule.frequency == schedule.Frequency.last:
                    self.check_last(schedule, date, dates)
                else:
                    pass
                date += relativedelta(months=1)

    def check_one_on_two(self, schedule, date, dates):
        for date in dates:
            delta = date.date() - schedule.date.date()
            self.assertEqual(delta.days % 14, 0)

    def check_last(self, schedule, date, dates):
        month_info = calendar.monthrange(date.year, date.month)
        date = datetime.date(date.year, date.month, month_info[1])

        # end of month before the wanted weekday: move one week back
        if date.weekday() < schedule.date.weekday():
            date -= datetime.timedelta(days=7)

        date -= datetime.timedelta(days=date.weekday())
        date += datetime.timedelta(days=schedule.date.weekday())
        self.assertEqual(date, dates[0].date())

    def check_n_of_week(self, schedule, date, dates):
        pass