"""REST viewsets exposing streamer controllers and their sources.

Every viewset here is restricted to admin users (``IsAdminUser``).
"""
from django.http import Http404
from django.shortcuts import get_object_or_404
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.permissions import IsAdminUser
from rest_framework.response import Response

from aircox.models import Sound

from . import controllers
from .serializers import (
    PlaylistSerializer,
    QueueSourceSerializer,
    RequestSerializer,
    SourceSerializer,
    StreamerSerializer,
)

__all__ = [
    "ControllerViewSet",
    "RequestViewSet",
    "StreamerViewSet",
    "SourceViewSet",
    "PlaylistSourceViewSet",
    "QueueSourceViewSet",
]


class ControllerViewSet(viewsets.ViewSet):
    """Base viewset resolving the streamer to work on before dispatching."""

    permission_classes = (IsAdminUser,)
    serializer_class = None
    streamers = controllers.streamers
    """Streamers controller instance."""
    streamer = None
    """User's Streamer instance."""
    object = None
    """Object to serialize."""

    def get_streamer(self, station_pk=None):
        """Get user's streamer.

        :param station_pk: key of the station in ``self.streamers``; when \
            ``None``, ``self.request.station.pk`` is used instead.
        :returns: the matching streamer, or ``None`` when no station pk \
            could be determined.
        :raises Http404: when ``station_pk`` is not in ``self.streamers``.
        """
        if station_pk is None:
            station_pk = self.request.station.pk
        # Refresh the controllers before looking the station up.
        self.streamers.fetch()
        if station_pk is None:
            return None
        if station_pk not in self.streamers:
            raise Http404(f"station not found: {station_pk}")
        return self.streamers[station_pk]

    def get_serializer(self, **kwargs):
        """Get serializer instance for ``self.object``."""
        return self.serializer_class(self.object, **kwargs)

    def serialize(self, obj, **kwargs):
        """Serialize controller data.

        Stores ``obj`` as ``self.object`` and returns the serializer's
        ``data``. Extra ``kwargs`` are forwarded to the serializer.
        """
        self.object = obj
        serializer = self.get_serializer(**kwargs)
        return serializer.data

    def dispatch(self, request, *args, station_pk=None, **kwargs):
        """Resolve ``self.streamer`` if not already set, then dispatch.

        ``station_pk`` is consumed here and not forwarded to the handler.
        """
        if not self.streamer:
            self.streamer = self.get_streamer(station_pk)
        return super().dispatch(request, *args, **kwargs)


class RequestViewSet(ControllerViewSet):
    """Controller viewset using ``RequestSerializer``."""

    serializer_class = RequestSerializer


class StreamerViewSet(ControllerViewSet):
    """Viewset exposing streamers."""

    serializer_class = StreamerSerializer

    def retrieve(self, request, pk=None):
        """Return the serialized current streamer."""
        return Response(self.serialize(self.streamer))

    def list(self, request, pk=None):
        """Return all streamers, serialized under the ``results`` key."""
        return Response({"results": self.serialize(self.streamers.values(), many=True)})

    def dispatch(self, request, *args, pk=None, **kwargs):
        """Use the URL ``pk`` as station pk and resolve the streamer.

        :raises Http404: when the station pk is not a valid integer or \
            does not match any streamer.
        """
        if pk is not None:
            kwargs.setdefault("station_pk", pk)

        station_pk = kwargs.get("station_pk")
        if station_pk:
            try:
                station_pk = int(station_pk)
            except (TypeError, ValueError):
                # A malformed key cannot match a station: answer 404 rather
                # than letting the conversion error bubble up as a 500.
                raise Http404(f"station not found: {station_pk}") from None
            kwargs["station_pk"] = station_pk

        # Pass only the station key: other URL kwargs are meant for the
        # request handler, not for ``get_streamer``.
        self.streamer = self.get_streamer(station_pk)
        self.object = self.streamer
        return super().dispatch(request, *args, **kwargs)


class SourceViewSet(ControllerViewSet):
    """Viewset exposing the current streamer's sources of type ``model``."""

    serializer_class = SourceSerializer
    model = controllers.Source
    lookup_field = "pk"
    lookup_value_converter = "str"

    def get_sources(self):
        """Return a generator over the streamer's sources of type ``model``."""
        return (s for s in self.streamer.sources if isinstance(s, self.model))

    def get_source(self, pk):
        """Return the source whose ``id`` equals ``pk``.

        :raises Http404: when no source matches.
        """
        source = next((item for item in self.get_sources() if item.id == pk), None)
        if source is None:
            raise Http404(f"source `{pk}` not found")
        return source

    def retrieve(self, request, pk=None):
        """Return the serialized source identified by ``pk``."""
        source = self.get_source(pk)
        return Response(self.serialize(source))

    def list(self, request):
        """Return all matching sources, serialized under ``results``."""
        return Response({"results": self.serialize(self.get_sources(), many=True)})

    def _run(self, pk, action):
        """Apply ``action`` to the source ``pk`` and return its new state.

        The source is fetched again after the action so that the response
        is serialized from refreshed data.

        :param pk: source id.
        :param action: callable taking the source as only argument.
        :raises Http404: when no source matches ``pk``.
        """
        source = self.object = self.get_source(pk)
        action(source)
        source.fetch()
        return Response(self.serialize(source))

    @action(detail=True, methods=["POST"])
    def sync(self, request, pk):
        """Call ``sync()`` on the source."""
        return self._run(pk, lambda s: s.sync())

    @action(detail=True, methods=["POST"])
    def skip(self, request, pk):
        """Call ``skip()`` on the source."""
        return self._run(pk, lambda s: s.skip())

    @action(detail=True, methods=["POST"])
    def restart(self, request, pk):
        """Call ``restart()`` on the source."""
        return self._run(pk, lambda s: s.restart())

    @action(detail=True, methods=["POST"])
    def seek(self, request, pk):
        """Call ``seek()`` on the source with the posted ``seek`` value.

        :raises ValidationError: when ``seek`` is missing from the payload.
        """
        # ``request.data`` holds the parsed payload whatever its content
        # type, whereas ``request.POST`` only exposes form-encoded data.
        count = request.data.get("seek")
        if count is None:
            raise ValidationError('missing "seek" POST data')
        return self._run(pk, lambda s: s.seek(count))


class PlaylistSourceViewSet(SourceViewSet):
    """Source viewset restricted to ``controllers.PlaylistSource``."""

    serializer_class = PlaylistSerializer
    model = controllers.PlaylistSource


class QueueSourceViewSet(SourceViewSet):
    """Source viewset restricted to ``controllers.QueueSource``."""

    serializer_class = QueueSourceSerializer
    model = controllers.QueueSource

    def get_sound_queryset(self, request):
        """Return the sounds of the request's station."""
        return Sound.objects.station(request.station)

    @action(detail=True, methods=["POST"])
    def push(self, request, pk):
        """Push the sound given by ``sound_id`` onto the queue source.

        Nothing is pushed when the sound has no file; the source state is
        returned in either case.

        :raises ValidationError: when ``sound_id`` is missing.
        :raises Http404: when the sound or the source is not found.
        """
        if not request.data.get("sound_id"):
            raise ValidationError('missing "sound_id" POST data')
        sound = get_object_or_404(self.get_sound_queryset(request), pk=request.data["sound_id"])

        def push_sound(source):
            # Test the file itself first. NOTE(review): assuming
            # ``Sound.file`` is a Django FileField, reading ``.path`` with
            # no file attached raises ValueError instead of being falsy.
            if sound.file and sound.file.path:
                source.push(sound.file.path)

        return self._run(pk, push_sound)