forked from rc/aircox
tests: playlist_import, log_archiver, sound_stats.SoxStats
This commit is contained in:
@ -1,37 +1,110 @@
|
||||
import pytest
|
||||
from django.utils import timezone as tz
|
||||
|
||||
from aircox.controllers.log_archiver import LogArchiver
|
||||
import pytest
|
||||
from model_bakery import baker
|
||||
|
||||
from aircox import models
|
||||
from aircox.test import Interface, File
|
||||
from aircox.controllers import log_archiver
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def log_archiver():
|
||||
return LogArchiver()
|
||||
def diffusions(episodes):
|
||||
items = [
|
||||
baker.prepare(
|
||||
models.Diffusion,
|
||||
program=episode.program,
|
||||
episode=episode,
|
||||
type=models.Diffusion.TYPE_ON_AIR,
|
||||
)
|
||||
for episode in episodes
|
||||
]
|
||||
models.Diffusion.objects.bulk_create(items)
|
||||
return items
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def logs(diffusions, sound, tracks):
|
||||
now = tz.now()
|
||||
station = diffusions[0].program.station
|
||||
items = [
|
||||
models.Log(
|
||||
station=diffusion.program.station,
|
||||
type=models.Log.TYPE_START,
|
||||
date=now + tz.timedelta(hours=-10, minutes=i),
|
||||
source="13",
|
||||
diffusion=diffusion,
|
||||
)
|
||||
for i, diffusion in enumerate(diffusions)
|
||||
]
|
||||
items += [
|
||||
models.Log(
|
||||
station=station,
|
||||
type=models.Log.TYPE_ON_AIR,
|
||||
date=now + tz.timedelta(hours=-9, minutes=i),
|
||||
source="14",
|
||||
track=track,
|
||||
sound=track.sound,
|
||||
)
|
||||
for i, track in enumerate(tracks)
|
||||
]
|
||||
models.Log.objects.bulk_create(items)
|
||||
return items
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def logs_qs(logs):
|
||||
return models.Log.objects.filter(pk__in=(r.pk for r in logs))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def file():
|
||||
return File(data=b"")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def gzip(file):
|
||||
gzip = Interface.inject(log_archiver, "gzip", {"open": file})
|
||||
yield gzip
|
||||
gzip._irelease()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def archiver():
|
||||
return log_archiver.LogArchiver()
|
||||
|
||||
|
||||
class TestLogArchiver:
|
||||
def test_get_path(self):
|
||||
pass
|
||||
@pytest.mark.django_db
|
||||
def test_archive_then_load_file(self, archiver, file, gzip, logs, logs_qs):
|
||||
# before logs are deleted from db, get data
|
||||
sorted = archiver.sort_logs(logs_qs)
|
||||
paths = {
|
||||
archiver.get_path(station, date) for station, date in sorted.keys()
|
||||
}
|
||||
|
||||
def test_archive(self):
|
||||
pass
|
||||
count = archiver.archive(logs_qs, keep=False)
|
||||
assert count == len(logs)
|
||||
assert not logs_qs.count()
|
||||
assert all(
|
||||
path in paths for path, *_ in gzip._traces("open", args=True)
|
||||
)
|
||||
|
||||
def test_archive_no_qs(self):
|
||||
pass
|
||||
results = archiver.load_file("dummy path")
|
||||
assert results
|
||||
|
||||
def test_archive_not_keep(self):
|
||||
pass
|
||||
@pytest.mark.django_db
|
||||
def test_archive_no_qs(self, archiver):
|
||||
count = archiver.archive(models.Log.objects.none())
|
||||
assert not count
|
||||
|
||||
def test_sort_log(self):
|
||||
pass
|
||||
@pytest.mark.django_db
|
||||
def test_sort_log(self, archiver, logs_qs):
|
||||
sorted = archiver.sort_logs(logs_qs)
|
||||
|
||||
def test_serialize(self):
|
||||
pass
|
||||
|
||||
def test_load(self):
|
||||
pass
|
||||
|
||||
def test_load_file_not_exists(self):
|
||||
pass
|
||||
|
||||
def test_get_relations(self):
|
||||
pass
|
||||
assert sorted
|
||||
for (station, date), logs in sorted.items():
|
||||
assert all(
|
||||
log.station == station and log.date.date() == date
|
||||
for log in logs
|
||||
)
|
||||
|
Reference in New Issue
Block a user