import requests
from flask import Blueprint, render_template, request, flash, g, url_for, redirect
from flask_login import current_user
from werkzeug.exceptions import BadRequest, NotFound

from ..common.common import *
from ..common.innertube import *
from .lib import *
from .protobuf import make_sp, make_channel_params, make_playlist_params, Filters

# NOTE: `re`, `sqlite3`, `cf` and the fetch_*/prepare_*/fallback_route helpers
# used below are not imported by name; they are expected to arrive through
# the star imports above.

frontend = Blueprint('browse', __name__,
    template_folder='templates',
    static_folder='static',
    static_url_path='/static/ys')


def _requested_page():
    """Return the 1-based ``page`` query argument of the current request.

    A missing, empty or non-numeric value yields 1 instead of raising
    ``ValueError`` (which would surface as an HTTP 500); values below 1 are
    clamped to 1 so that offsets derived from the page never go negative.
    """
    return max(1, request.args.get('page', 1, type=int))


def _is_ucid(candidate):
    """Tell whether *candidate* is exactly "UC" followed by 22 id characters."""
    return re.fullmatch(r"UC[A-Za-z0-9_-]{22}", candidate) is not None


@frontend.route('/results')
@frontend.route('/search')
def search():
    """Render the search page for the ``q`` / ``search_query`` argument.

    Filter arguments (``sort``, ``date``, ``type``, ``len`` and any number of
    ``feature`` values) are folded into the search parameter built by
    ``make_sp``. Without a query the template is rendered with ``rows=None``.
    """
    q = request.args.get('q') or request.args.get('search_query')
    page = _requested_page()
    requested_features = request.args.getlist('feature')
    sp = make_sp(page,
        **{
            k: v for k, v in request.args.items()
            if k in ['sort', 'date', 'type', 'len']
        },
        # 'feature' carries both the dataclass-backed filters and the extras:
        features=[
            f for f in requested_features
            if f in Filters.__dataclass_fields__
        ],
        extras=[
            e for e in requested_features
            if e in ['verbatim']
        ])

    if q:
        yt_results = fetch_searchresults(q, sp)
        results, extras = prepare_searchresults(yt_results)
        for extra in extras:
            flash(extra, 'info')
    else:
        results = None

    return render_template('search.html.j2', rows=results, query=q, page=page)


@frontend.route('/channel/<channel_id>/')
@frontend.route('/channel/<channel_id>/<subpage>')
def channel(channel_id, subpage="videos"):
    """Render one page of a channel's videos, playlists or in-channel search.

    Unsupported subpages redirect to the default (videos) subpage, and a
    *channel_id* that is not a UCID is handed to ``channel_redirect``. If
    nothing usable can be fetched, the request is passed to
    ``fallback_route`` after flashing an error.
    """
    token = getattr(current_user, 'token', 'guest')
    page = _requested_page()

    if subpage == "videos":
        sort_by = request.args.get('sort') or "newest"
        query = None
    elif subpage == "playlists":
        sort_by = request.args.get('sort', "modified")
        query = None
    elif subpage == "search":
        query = request.args.get('q')
        sort_by = None
    else:  # we don't support /home, /about, ..., so redirect to /videos.
        return redirect(url_for('.channel', channel_id=channel_id))

    # best effort; if it fails, it fails in the redirect.
    if not _is_ucid(channel_id):
        return redirect(url_for('.channel_redirect', user=channel_id))

    result = fetch_ajax(make_channel_params(
        channel_id, subpage, page, sort_by, query, v3=(subpage != "search")))
    title, descr, thumb, rows, more = prepare_channel(result, channel_id)

    if title is None or not rows:
        # if fetching from innertube failed, fall back to xmlfeed:
        flash("unable to fetch results from ajax; displaying fallback results (15 newest)", "error")
        return fallback_route(channel_id, subpage)

    # set pin/hide stati of retrieved videos:
    video_ids = [card['content']['video_id'] for card in rows]
    pinned, hidden = fetch_video_flags(token, video_ids)
    # hidden videos are dropped; pinned ones are moved to the front (the sort
    # is stable, so the fetched order is otherwise kept).
    rows = sorted([
        {'type': v['type'], 'content': {**v['content'], 'pinned': v['content']['video_id'] in pinned}}
        for v in rows
        if v['content']['video_id'] not in hidden
    ], key=lambda v: v['content']['pinned'], reverse=True)

    # A sqlite3 connection used as a context manager only wraps a transaction
    # and does not close the connection, so close it explicitly.
    conn = sqlite3.connect(cf['global']['database'])
    try:
        c = conn.cursor()
        c.execute("""
            SELECT COUNT(*)
              FROM subscriptions
             WHERE channel_id = ? AND user = ?
        """, (channel_id, token))
        (is_subscribed,) = c.fetchone()
    finally:
        conn.close()

    return render_template('channel.html.j2',
        title=title,
        subpage=subpage,
        rows=rows,
        channel_id=channel_id,
        channel_img=thumb,
        channel_desc=descr,
        is_subscribed=is_subscribed,
        page=page,
        has_more=more)


@frontend.route('/user/<user>/')
@frontend.route('/user/<user>/<subpage>')
@frontend.route('/c/<user>/')
@frontend.route('/c/<user>/<subpage>')
def channel_redirect(user, subpage=None):
    """
    The browse_ajax 'API' needs the UCID.

    Resolve *user* through ``canonicalize_channel`` and redirect (308) to the
    matching /channel/ page; raise ``NotFound`` when it cannot be resolved.
    """
    # inverse of the test in /channel/:
    if _is_ucid(user):
        return redirect(url_for('.channel', channel_id=user))

    channel_id = canonicalize_channel(user)
    if not channel_id:
        raise NotFound("channel appears to not exist")
    return redirect(
        url_for('.channel', channel_id=channel_id, subpage=subpage), 308
    )


@frontend.route('/playlist')
def playlist():
    """Render one page (100 items) of the playlist named by ``list``.

    Raises ``BadRequest`` without a playlist id and ``NotFound`` when the
    playlist's xml feed cannot be fetched. When the ajax response carries no
    continuation contents, the reported error is flashed and the request is
    passed to ``fallback_route``.
    """
    #TODO: if anything goes wrong, fall back to xmlfeed
    playlist_id = request.args.get('list')
    if not playlist_id:
        raise BadRequest("No playlist ID")
    page = _requested_page()

    xmlfeed = fetch_xml("playlist_id", playlist_id)
    if not xmlfeed:
        raise NotFound("Unable to fetch playlist")
    title, author, _, channel_id, _ = parse_xml(xmlfeed)

    offset = (page - 1) * 100  # each call returns 100 items
    result = fetch_ajax(make_playlist_params(playlist_id, offset))

    if 'continuationContents' not in result[1]['response']:
        # XXX: this needs cleanup!
        # e.g. code:"CONDITION_NOT_MET", debugInfo:"list type not viewable"
        # (seen on a playlist that apparently cannot be opened upstream either)
        try:
            error = result[1]['response']['responseContext']['errors']['error'][0]
        except (KeyError, IndexError, TypeError):
            # the error report itself is missing or shaped differently;
            # still tell the user something instead of failing with a 500.
            error = {}
        code = error.get('code', 'unknown error')
        detail = (error.get('debugInfo')
                  or error.get('externalErrorMessage')
                  or 'no details available')
        flash(f"{code}: {detail}", 'error')
        return fallback_route()
    rows, more = prepare_playlist(result)

    return render_template('playlist.html.j2',
        title=title,
        author=author,
        channel_id=channel_id,
        rows=rows,
        page=page,
        has_more=more)


@frontend.route('/<something>', strict_slashes=False)
def plain_user_or_video(something):
    """Redirect a bare top-level path to a channel page or to a video.

    Raises ``NotFound`` for anything containing a dot and for names that are
    neither a resolvable channel nor shaped like an 11-character video id.
    """
    # this is a near-copy of the same route in app/youtube, but using a
    # different, more reliable endpoint to determine whether a channel exists.
    if '.' in something:
        # prevent a lot of false-positives (and reduce youtube api calls)
        raise NotFound

    channel_id = canonicalize_channel(something)
    if channel_id:
        return redirect(url_for('.channel', channel_id=channel_id))
    elif re.fullmatch(r"[-_0-9A-Za-z]{11}", something):  # looks like a video id
        # NOTE(review): the endpoint name was blank in the reviewed copy;
        # 'youtube.watch' is inferred from the app/youtube remark above and
        # the v/t arguments -- confirm against the youtube blueprint.
        return redirect(url_for('youtube.watch', v=something, t=request.args.get('t')))
    else:  # ¯\_(ツ)_/¯
        raise NotFound("Note: some usernames not recognized; try searching it")


@frontend.before_app_request
def inject_button():
    """Append this blueprint's 'search' entry to ``g.header_items``."""
    if 'header_items' not in g:
        g.header_items = []
    g.header_items.append({
        'name': 'search',
        # NOTE(review): endpoint and parent were blank in the reviewed copy.
        # The endpoint is spelled out in full because this hook runs for
        # requests of every blueprint; both values should be confirmed.
        'url': url_for('browse.search'),
        'parent': frontend.name,
        'priority': 15,
    })