# this is an alternative to proxying through invidious. the search endpoint
# has only been (loosely) tested by
#   17:50 < perflyst[m]> appears to be working
# so i hope™ this works. if not, that's why it's in the 'dangerous' blueprint

import requests
from flask import (
    Blueprint, render_template, request, flash, g, url_for, current_app,
)

from ..common.common import *
from ..common.innertube import *
from .lib import *
from .protobuf import make_sp, make_channel_params, make_playlist_params

frontend = Blueprint('dangerous', __name__,
    template_folder='templates',
    static_folder='static',
    static_url_path='/static/ys')


def _requested_page():
    """Return the 1-based ``page`` query parameter of the current request.

    A missing or non-numeric value falls back to 1 instead of raising
    ``ValueError`` (which would surface as an HTTP 500), and values below 1
    are clamped to 1 so that derived offsets never become negative.
    """
    return max(1, request.args.get('page', 1, type=int))


@frontend.route('/search')
def search():
    """Render the search page for the ``q`` query parameter.

    The optional ``sort``, ``date``, ``type`` and ``len`` query parameters
    are forwarded to ``make_sp``. Without a query, the template is rendered
    with ``rows=None``.
    """
    #token = getattr(current_user, 'token', 'guest')
    q = request.args.get('q')
    page = _requested_page()
    sp = make_sp(**{
        k: v for k, v in request.args.items()
        if k in ['sort', 'date', 'type', 'len']
    }, extras=['dont_fix_spelling']*0) # extras disabled (`*0` yields an empty list)

    if q:
        yt_results = fetch_searchresults(q, page, sp)
        results, extras = prepare_searchresults(yt_results)
        # every 'extra' returned alongside the results is shown as an info message
        for extra in extras:
            flash(extra, 'info')
    else:
        results = None

    return render_template('search.html.j2', rows=results, query=q, page=page)


# TODO: channels, playlists:
# https://github.com/iv-org/invidious/blob/452d1e8307d6344dd51c5437ccd032a566291c34/src/invidious/channels.cr#L399

@frontend.route('/channel/<channel_id>/')
@frontend.route('/channel/<channel_id>/<subpage>')
def channel(channel_id, subpage="videos"):
    """Render the ``videos`` or ``playlists`` listing of a channel.

    Any other ``subpage`` yields a plain-text 404 response. The ``sort``
    query parameter defaults to "newest" for videos and "modified" for
    playlists; playlists are not paginated yet (``page`` is None).
    """
    #TODO: if anything goes wrong, fall back to xmlfeed
    if subpage == "videos":
        page = _requested_page()
        sort_by = request.args.get('sort', "newest")
    elif subpage == "playlists":
        page = None # TODO: cursor
        sort_by = request.args.get('sort', "modified")
    else:
        return "not found", 404

    result = fetch_ajax(make_channel_params(channel_id, subpage, page, sort_by))

    title, descr, thumb, rows = prepare_channel_items(result, channel_id)
    # TODO: add is_pinned/is_hidden

    return render_template('channel.html.j2',
        title=title,
        subpage=subpage,
        rows=rows,
        channel_id=channel_id,
        channel_img=thumb,
        channel_desc=descr,
        page=page)


############
# TODO: this belongs in common.innertube

def prepare_channel_items(result, channel_id):
    """Extract channel metadata and listing rows from a ``fetch_ajax`` result.

    Returns a ``(title, description, thumbnail, items)`` tuple. ``items`` is
    an empty list when the response carries no 'continuationContents' (end
    of list) or when neither known continuation layout could be parsed; in
    the latter case the raw result is logged as an error.
    """
    response = listfind(result, 'response')
    meta1 = response.get('metadata', {}).get('channelMetadataRenderer', {})
    meta2 = response.get('microformat', {}).get('microformatDataRenderer', {})
    title = meta1.get('title', meta2.get('title'))
    descr = meta1.get('description', meta2.get('description')) # meta2.description is capped at 160chars
    thumb = mkthumbs(meta2.get('thumbnail', meta1.get('avatar', {})).get('thumbnails', {})) # .avatar ~ 900px

    items = [] # if 'continuationContents' is absent, we reached the end of the list
    if 'continuationContents' in response:
        contents = response['continuationContents']
        # TODO: cleanup
        # Try the grid layout first, then the section-list layout. Parsing is
        # deliberately best-effort: any failure (missing key, unknown
        # renderer, ...) moves on to the next layout.
        layouts = (
            ('gridContinuation', 'items'),
            ('sectionListContinuation', 'contents'),
        )
        for container, field in layouts:
            try:
                items = parse_channel_items(contents[container][field], channel_id, title)
            except Exception:
                continue
            break
        else:
            # no layout could be parsed; keep the raw payload for debugging
            current_app.logger.error(result)
            items = []

    return title, descr, thumb, items


def parse_channel_items(items, channel_id, author):
    """Convert a list of renderer dicts into ``{'type', 'content'}`` rows.

    Video renderers become 'VIDEO' rows and playlist renderers 'PLAYLIST'
    rows; an 'itemSectionRenderer' is flattened recursively.

    Raises:
        Exception: if an item's first key is not a known renderer.
        KeyError: if an item is empty or lacks a mandatory field.
    """
    result = []
    for item in items:
        # each item is expected to wrap its payload under a single renderer key
        key = next(iter(item), None)
        content = item[key]
        if key in ("gridVideoRenderer", "videoRenderer"):
            result.append({'type': 'VIDEO', 'content': {
                'video_id': content['videoId'],
                'title': content['title']['simpleText'],
                'author': author,
                'channel_id': channel_id,
                'length': listfind(content.get('thumbnailOverlays', []), 'thumbnailOverlayTimeStatusRenderer').get('text', {}).get('simpleText'),
                'views': toInt(content.get('viewCountText', {}).get('simpleText')),
                'published': age(content.get('publishedTimeText', {}).get('simpleText')),
            }})
        elif key in ("gridPlaylistRenderer", "playlistRenderer"):
            watch_endpoint = content['navigationEndpoint']['watchEndpoint']
            result.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': watch_endpoint['playlistId'],
                'video_id': watch_endpoint['videoId'],
                'title': (content['title'].get('simpleText') or # playlistRenderer
                          content['title']['runs'][0]['text']), # gridPlaylistRenderer
                'author': author,
                'channel_id': channel_id,
                'n_videos': toInt(content.get('videoCount') or # playlistRenderer
                    content.get('videoCountShortText', {}).get('simpleText') or # grid(1)
                    content.get('videoCountText', {}).get('runs', [{}])[0].get('text')), # grid(2)
            }})
        elif key == "itemSectionRenderer":
            result.extend(parse_channel_items(content['contents'], channel_id, author))
        else:
            raise Exception(item) # XXX TODO

    return result


def prepare_playlist(result):
    """Return the playable videos of a playlist continuation as row dicts.

    Entries for which ``parse_playlist`` returns None are dropped.
    """
    continuation = listfind(result, 'response')['continuationContents']['playlistVideoListContinuation']
    contents = continuation.get('contents', []) # no .contents if overran end of playlist

    rows = (parse_playlist(entry) for entry in contents)
    return [row for row in rows if row]


def parse_playlist(item):
    """Convert one playlist entry into a 'VIDEO' row dict.

    Returns None when the entry is not marked playable (private or deleted
    video).

    Raises:
        Exception: if the entry is not a 'playlistVideoRenderer'.
        KeyError: if the entry is empty or lacks a mandatory field.
    """
    key = next(iter(item), None)
    content = item[key]
    if key != "playlistVideoRenderer":
        raise Exception(item) # XXX TODO

    if not content.get('isPlayable', False):
        return None # private or deleted video

    watch_endpoint = content['navigationEndpoint']['watchEndpoint']
    byline = content.get('shortBylineText', {}).get('runs', [{}])[0]
    return {'type': 'VIDEO', 'content': {
        'video_id': content['videoId'],
        'title': (content['title'].get('simpleText') or # playable videos
                  content['title'].get('runs', [{}])[0].get('text')), # "[Private video]"
        'playlist_id': watch_endpoint['playlistId'],
        'index': watch_endpoint['index'], #or int(content['index']['simpleText'])
        # rest is missing from unplayable videos:
        'author': byline.get('text'),
        'channel_id': byline.get('navigationEndpoint', {}).get('browseEndpoint', {}).get('browseId'),
        'length': (content.get("lengthText", {}).get("simpleText") or # "8:51"
                   int(content.get("lengthSeconds", 0))), # "531"
        'starttime': watch_endpoint.get('startTimeSeconds'),
    }}

############


@frontend.route('/playlist')
def playlist():
    """Render one page (100 items) of the playlist given by ``?list=``.

    Responds with a plain-text 400 when the ``list`` parameter is missing or
    empty.
    """
    #TODO: if anything goes wrong, fall back to xmlfeed
    playlist_id = request.args.get('list')
    if not playlist_id:
        return "bad list id", 400 # todo

    page = _requested_page()
    offset = (page-1)*100 # each call returns 100 items
    result = fetch_ajax(make_playlist_params(playlist_id, offset))

    rows = prepare_playlist(result)

    return render_template('playlist.html.j2',
        title="playlist", # XXX: can't get playlist metadata from this, get from xmlfeed!
        rows=rows, page=page)


@frontend.before_app_request
def inject_button():
    """Append this blueprint's 'search' entry to ``g.header_items``.

    Runs before every request of the application; the list is created on
    ``g`` first if it is not there yet.
    """
    if 'header_items' not in g:
        g.header_items = []
    g.header_items.append({
        'name': 'search',
        'url': url_for('dangerous.search'),
        'parent': frontend.name,
        'priority': 15,
    })