Improve python code quality

This commit is contained in:
Michal Szczepanski 2020-03-14 12:07:22 +01:00
parent 9ff9591b71
commit 39049beba7
9 changed files with 56 additions and 35 deletions

@ -8,16 +8,16 @@ import tornado.web
from playlistcast import db, error
class BrowseResourceHandler(tornado.web.StaticFileHandler):
class BrowseResourceHandler(tornado.web.StaticFileHandler): # pylint: disable=W0223
"""Serve attached resources"""
# pylint: disable=W0223
def initialize(self):
def initialize(self): # pylint: disable=W0221
"""Override hack"""
self.root = ''
self.root = '' # pylint: disable=W0201
def set_etag_header(self):
"""Override hack"""
pass
pass # pylint: disable=W0107
@classmethod
def get_absolute_path(cls, root: str, path: str):
@ -28,15 +28,16 @@ class BrowseResourceHandler(tornado.web.StaticFileHandler):
"""Override hack"""
return absolute_path
async def get(self, *args, **kwargs):
async def get(self, path: str, include_body: bool = True):
"""Get request"""
a = args[0].split('/')
model = db.session.query(db.ResourceLocation).filter(db.ResourceLocation.name == a[0]).first()
a = path.split('/')
model = db.session.query(db.ResourceLocation)\
.filter(db.ResourceLocation.name == a[0]).first()
if not model:
raise error.ResourcePathError('Invalid path {}'.format(args[0]))
raise error.ResourcePathError('Invalid path {}'.format(path))
uri = '/resource/'
if len(a) > 0:
uri += args[0]
uri += path
path = os.path.join(model.location, '/'.join(a[1:]))
else:
path = model.location

@ -2,9 +2,9 @@
# -*- coding: utf-8 -*-
""""Device model"""
import asyncio
from typing import Dict
import graphene
import pychromecast
from typing import Dict
from pychromecast.controllers import media
from graphql.execution.base import ResolveInfo
from playlistcast import error, util
@ -182,7 +182,7 @@ class ChromecastSubtitleEnable(graphene.Mutation):
def mutate(self,
info: ResolveInfo,
uid: graphene.String,
track_id:graphene.Int) -> graphene.Boolean:
track_id: graphene.Int) -> graphene.Boolean:
"""Method to change subtitle track"""
if uid not in CHROMECAST:
raise error.ChromecastUUIDError(uid=uid)
@ -207,8 +207,8 @@ class ChromecastSubtitleDisable(graphene.Mutation):
data = CHROMECAST[uid]
data.device.media_controller.disable_subtitle()
return True
# HELPER CLASSES
# HELPER CLASSES
class ChromecastDevice:
"""Device with api and interface data"""
def __init__(self, device: pychromecast.Chromecast, data: ChromecastModel):
@ -226,7 +226,7 @@ class ChromecastDevice:
def new_media_status(self, status: media.MediaStatus):
"""Subscribe for chromecast status messages"""
s = util.convert(status, MediaStatus, ('uuid','subtitle_tracks'))
s = util.convert(status, MediaStatus, ('uuid', 'subtitle_tracks'))
subtitle_tracks = list()
for tr in status.subtitle_tracks:
if 'type' in tr and tr['type'] == 'TEXT':

@ -1,13 +1,15 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import graphene
"""Playlist api"""
from typing import Dict
import graphene
from graphql.execution.base import ResolveInfo
from .chromecast import CHROMECAST, ChromecastDevice
from playlistcast import db
from playlistcast import db, error
from playlistcast.protocol import m3u
from .chromecast import CHROMECAST, ChromecastDevice
class PlaylistItem(graphene.ObjectType):
"""Single playlist item"""
index = graphene.Int()
name = graphene.String()
path = graphene.String()
@ -31,14 +33,15 @@ class PlaylistPlay(graphene.Mutation):
def mutate(self, info: ResolveInfo, options: PlaylistPlayOptions) -> graphene.Boolean: # pylint: disable=W0622
"""Method to play playlist on chromecast"""
model = db.session.query(db.ResourceLocation).filter(db.ResourceLocation.name == options.name).first()
model = db.session.query(db.ResourceLocation)\
.filter(db.ResourceLocation.name == options.name).first()
if not model:
raise error.ResourcePathError('Invalid path {}'.format(options.name))
if options.uid not in CHROMECAST:
raise error.ChromecastUUIDError(uid)
raise error.ChromecastUUIDError(options.uid)
# create new playlist or use existing one
if PLAYLIST[uid] is not None:
playlist = PLAYLIST[uid]
if PLAYLIST[options.uid] is not None:
playlist = PLAYLIST[options.uid]
p = playlist.playlist
else:
p = m3u.M3UPlaylist()
@ -52,11 +55,14 @@ class PlaylistPlay(graphene.Mutation):
mc.stop()
if options.index:
p.set_index(options.index)
urlpath = 'http://'+util.get_ip()+':'+str(config.PORT)+'/resource/'+options.name+'/'+str(m3udir)+'/'+p.current_item.path
# urlpath = f'http://{util.get_ip()}/{config.PORT}/resource/'
# urlpath += f'{options.name}/{m3udir}/{p.current_item.path}'
playlist.device.media_controller.play_media(p.current_item.path)
return True
# HELPER
class Playlist:
"""Playlist with api and interface data"""
def __init__(self, playlist: m3u.M3UPlaylist, chromecast: ChromecastDevice):
self.playlist = playlist
self.chromecast = chromecast

@ -9,12 +9,14 @@ from playlistcast import db
from .subscription import SubscriptionModel
class File(graphene.ObjectType):
"""File representation"""
name = graphene.String()
size = graphene.String()
suffix = graphene.String()
is_dir = graphene.Boolean()
class Directory(graphene.ObjectType):
"""Directory representation"""
resource_name = graphene.String()
resource_path = graphene.String()
subpath = graphene.String()

@ -7,7 +7,7 @@ from typing import List
import graphene
from graphql_relay import from_global_id
from graphql.execution.base import ResolveInfo
from playlistcast import util, db, config
from playlistcast import util, db, config, error
from playlistcast.protocol import m3u
from .model.resource_location import ResourceLocation, Directory, File
from .model.chromecast import ChromecastModel, CastStatus, CHROMECAST
@ -23,11 +23,19 @@ class Query(graphene.ObjectType):
resource_location_all = graphene.List(ResourceLocation)
resource_location = graphene.Field(ResourceLocation, id=graphene.ID(required=True))
list_directory = graphene.Field(Directory, name=graphene.String(required=True), subpath=graphene.String())
list_directory = graphene.Field(
Directory,
name=graphene.String(required=True),
subpath=graphene.String()
)
chromecast_device_all = graphene.List(ChromecastModel)
playlist_items = graphene.Field(graphene.List(PlaylistItem), name=graphene.String(required=True), path=graphene.String(required=True))
playlist_items = graphene.Field(
graphene.List(PlaylistItem),
name=graphene.String(required=True),
path=graphene.String(required=True)
)
def resolve_resource_location_all(self, info: ResolveInfo) -> List[ResourceLocation]:
"""Return ResourceLocation list"""
@ -39,6 +47,10 @@ class Query(graphene.ObjectType):
return ResourceLocation.get_node(info, id)
def resolve_list_directory(self, info: ResolveInfo, name: graphene.String, subpath: graphene.String = '') -> Directory:
""" Browse directories
name - ResourceLocation -> name
subpath - string path of current directory
"""
model = db.session.query(db.ResourceLocation).filter(db.ResourceLocation.name == name).first()
if not model:
raise error.ResourcePathError('Invalid path {}'.format(name))
@ -71,7 +83,8 @@ class Query(graphene.ObjectType):
output.append(val.data)
return output
def resolve_playlist_items(self, info: ResolveInfo, name: graphene.String, path:graphene.String) -> List[PlaylistItem]:
def resolve_playlist_items(self, info: ResolveInfo, name: graphene.String, path: graphene.String) -> List[PlaylistItem]:
"""Get list of playlist items"""
model = db.session.query(db.ResourceLocation).filter(db.ResourceLocation.name == name).first()
if not model:
raise error.ResourcePathError('Invalid path {}'.format(name))
@ -83,4 +96,3 @@ class Query(graphene.ObjectType):
item = PlaylistItem(index=p.index, name=p.name, path=urlpath)
output.append(item)
return output

@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Store static configuration parameters"""
DEBUG = False
PORT = 9666

@ -1,12 +1,11 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Chromecast"""
import time
import logging
from typing import List
from datetime import timedelta
import pychromecast
import pychromecast.controllers.media as chromecast_media
import pychromecast.controllers.media as chromecast_media # pylint: disable=W0611
import playlistcast.api.model.chromecast as chromecast_model
from playlistcast import util
from playlistcast.api.subscription import SubscriptionModel
@ -66,7 +65,7 @@ async def list_devices() -> List[chromecast_model.ChromecastModel]:
mc.uuid = uid
# pychromecast media_controller media_status
st = pych.media_controller.status
ms = util.convert(st, chromecast_model.MediaStatus, ('uuid','subtitle_tracks'))
ms = util.convert(st, chromecast_model.MediaStatus, ('uuid', 'subtitle_tracks'))
ms.uuid = uid
# media_status subtitles
subtitle_tracks = list()

@ -4,14 +4,13 @@
import os
import pathlib
import logging
from typing import List, Dict, Any
import requests
from typing import List
LOG = logging.getLogger('playlistcast.protocol.m3u')
class PlaylistItem:
"""Playlist item"""
def __init__(self, index:int, path: str = '', name: str = ''):
def __init__(self, index: int, path: str = '', name: str = ''):
self._index = index
self._path = path
self._name = name
@ -47,6 +46,7 @@ class M3UPlaylist:
@property
def current_item(self) -> PlaylistItem:
"""Get playlist current item from list based on index"""
item = self.items[self._index - 1]
return item
@ -109,7 +109,7 @@ class M3UPlaylist:
i += 1
return out
def _load_file(self, location:str, path: str) -> (str, str):
def _load_file(self, location: str, path: str) -> (str, str):
"""Load m3u file from disk"""
# check path
fpath = os.path.join(location, path)

@ -49,7 +49,7 @@ def convert(src, dest, ignore=None):
for attr in out.__dict__:
if ignore is not None and attr in ignore:
continue
value = accessor (src, attr)
value = accessor(src, attr)
setattr(out, attr, value)
return out