From 4d6ac61835207c3922d85870d9ce58a8c7051d5c Mon Sep 17 00:00:00 2001 From: girst Date: Sat, 29 Apr 2023 13:59:58 +0000 Subject: [PATCH] split common.innertube into youtube.cards and browse.innertube a few things that were used in both places (G, mkthumbs, log_unknown_card) now live in common.common. --- app/browse/__init__.py | 3 +- app/{common => browse}/innertube.py | 155 +--------------------------- app/browse/lib.py | 2 +- app/common/common.py | 38 ++++++- app/youtube/cards.py | 121 ++++++++++++++++++++++ app/youtube/lib.py | 3 +- 6 files changed, 161 insertions(+), 161 deletions(-) rename app/{common => browse}/innertube.py (70%) create mode 100644 app/youtube/cards.py diff --git a/app/browse/__init__.py b/app/browse/__init__.py index 4673dd6..af06fee 100644 --- a/app/browse/__init__.py +++ b/app/browse/__init__.py @@ -1,11 +1,12 @@ +import re import requests from flask import Blueprint, render_template, request, flash, g, url_for, redirect from flask_login import current_user from werkzeug.exceptions import BadRequest, NotFound from ..common.common import * -from ..common.innertube import * from .lib import * +from .innertube import prepare_searchresults, prepare_channel, prepare_playlist from .protobuf import make_sp, make_channel_params, make_playlist_params, Filters frontend = Blueprint('browse', __name__, diff --git a/app/common/innertube.py b/app/browse/innertube.py similarity index 70% rename from app/common/innertube.py rename to app/browse/innertube.py index 49d53ae..b47b6a6 100644 --- a/app/common/innertube.py +++ b/app/browse/innertube.py @@ -1,27 +1,7 @@ # functions that deal with parsing data from youtube's internal API ("innertube") -from urllib.parse import parse_qs, urlparse -import re +from ..common.common import mkthumbs, log_unknown_card, G -class G: - """ - null-coalescing version of dict.get() that also works on lists. - - the | operator is overloaded to achieve similar looking code to jq(1) filters. - the first found key is used: dict(foo=1)|G('bar','foo') returns 1. - """ - def __init__(self, *keys): - self.keys = keys - def __ror__(self, other): - for key in self.keys: - try: return other[key] - except: continue - return None - class _Text: - """ parses youtube's .runs[].text and .simpleText variants """ - def __ror__(self, other): # Note: only returning runs[0], not concat'ing all! - return other|G('simpleText') or other|G('runs')|G(0)|G('text') - text = _Text() class Select: """ |Select('foo') returns the first foo in list, |Select(all='foo') returns all foos. """ def __init__(self, key=None, *, all=None): @@ -66,14 +46,6 @@ def prepare_searchresults(yt_results): return items, extra, more -def prepare_infocards(metadata): - cards = metadata.get('cards',{}).get('cardCollectionRenderer',{}).get('cards',[]) - return list(filter(None, map(parse_infocard, cards))) - -def prepare_endcards(metadata): - endsc = metadata.get('endscreen',{}).get('endscreenRenderer',{}).get('elements',[]) - return list(filter(None, map(parse_endcard, endsc))) - def prepare_channel(response, channel_id, channel_name): meta1 = response|G('metadata')|G('channelMetadataRenderer') meta2 = response|G('microformat')|G('microformatDataRenderer') @@ -132,27 +104,6 @@ def prepare_playlist(result): return title, author, channel_id, list(filter(None, map(parse_playlist, unparsed))), more -def mkthumbs(thumbs): - output = {str(e['height']): e['url'] for e in thumbs} - largest=next(iter(sorted(output.keys(),reverse=True,key=int)),None) - return {**output, 'largest': largest} - -def clean_url(url): - # externals URLs are redirected through youtube.com/redirect, but we - # may encounter internal URLs, too - return parse_qs(urlparse(url).query).get('q',[url])[0] - -def toInt(s, fallback=0): - if s is None: - return fallback - try: - return int(''.join(filter(str.isdigit, s))) - except ValueError: - return fallback - -# Remove left-/rightmost word from string: -delL = lambda s: s.partition(' ')[2] - def age(s): if s is None: # missing from autogen'd music, some livestreams return None @@ -167,16 +118,6 @@ def age(s): return f"{value}{suffix}" -def log_unknown_card(data): - import json - try: - from flask import request - source = request.url - except: source = "unknown" - with open("/tmp/innertube.err", "a", encoding="utf-8", errors="backslashreplace") as f: - f.write(f"\n/***** {source} *****/\n") - json.dump(data, f, indent=2) - def parse_result_items(items): # TODO: use .get() for most non-essential attributes """ @@ -264,100 +205,6 @@ def parse_result_items(items): log_unknown_card(item) return results, extras -def parse_infocard(card): - """ - parses a single infocard into a format that's easier to handle. - """ - card = card['cardRenderer'] - if not 'content' in card: - return None # probably the "View corrections" card, ignore. - ctype = list(card['content'].keys())[0] - content = card['content'][ctype] - if ctype == "pollRenderer": - return {'type': "POLL", 'content': { - 'question': content['question']['simpleText'], - 'answers': [(a['text']['simpleText'],a['numVotes']) \ - for a in content['choices']], - }} - elif ctype == "videoInfoCardContentRenderer": - is_live = content.get('badge',{}).get('liveBadgeRenderer') is not None - return {'type': "VIDEO", 'content': { - 'video_id': content['action']['watchEndpoint']['videoId'], - 'title': content['videoTitle']['simpleText'], - 'author': delL(content['channelName']['simpleText']), - 'length': content.get('lengthString',{}).get('simpleText') \ - if not is_live else "LIVE", # "23:03" - 'views': toInt(content.get('viewCountText',{}).get('simpleText')), - # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM" - }} - elif ctype == "playlistInfoCardContentRenderer": - return {'type': "PLAYLIST", 'content': { - 'playlist_id': content['action']['watchEndpoint']['playlistId'], - 'video_id': content['action']['watchEndpoint']['videoId'], - 'title': content['playlistTitle']['simpleText'], - 'author': delL(content['channelName']['simpleText']), - 'n_videos': toInt(content['playlistVideoCount']['simpleText']), - }} - elif ctype == "simpleCardContentRenderer" and \ - 'urlEndpoint' in content['command']: - return {'type': "WEBSITE", 'content': { - 'url': clean_url(content['command']['urlEndpoint']['url']), - 'domain': content['displayDomain']['simpleText'], - 'title': content['title']['simpleText'], - # XXX: no thumbnails for infocards - }} - elif ctype == "collaboratorInfoCardContentRenderer": - return {'type': "CHANNEL", 'content': { - 'channel_id': content['endpoint']['browseEndpoint']['browseId'], - 'title': content['channelName']['simpleText'], - 'icons': mkthumbs(content['channelAvatar']['thumbnails']), - 'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers" - }} - else: - log_unknown_card(card) - return None - -def parse_endcard(card): - """ - parses a single endcard into a format that's easier to handle. - """ - card = card.get('endscreenElementRenderer', card) #only sometimes nested - ctype = card['style'] - if ctype == "CHANNEL": - return {'type': ctype, 'content': { - 'channel_id': card['endpoint']['browseEndpoint']['browseId'], - 'title': card['title']|G.text, - 'icons': mkthumbs(card['image']['thumbnails']), - }} - elif ctype == "VIDEO": - if not 'endpoint' in card: return None # title == "This video is unavailable." - return {'type': ctype, 'content': { - 'video_id': card['endpoint']['watchEndpoint']['videoId'], - 'title': card['title']|G.text, - 'length': card|G('videoDuration')|G.text, # '12:21' - 'views': toInt(card['metadata']|G.text), - # XXX: no channel name - }} - elif ctype == "PLAYLIST": - return {'type': ctype, 'content': { - 'playlist_id': card['endpoint']['watchEndpoint']['playlistId'], - 'video_id': card['endpoint']['watchEndpoint']['videoId'], - 'title': card['title']|G.text, - 'author': delL(card['metadata']|G.text), - 'n_videos': toInt(card['playlistLength']|G.text), - }} - elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE": - url = clean_url(card['endpoint']['urlEndpoint']['url']) - return {'type': "WEBSITE", 'content': { - 'url': url, - 'domain': urlparse(url).netloc, - 'title': card['title']|G.text, - 'icons': mkthumbs(card['image']['thumbnails']), - }} - else: - log_unknown_card(card) - return None - def parse_channel_items(items, channel_id, author): result = [] extra = [] diff --git a/app/browse/lib.py b/app/browse/lib.py index e4e36b6..c389d38 100644 --- a/app/browse/lib.py +++ b/app/browse/lib.py @@ -2,7 +2,7 @@ import re import requests from datetime import datetime, timezone -from ..common.innertube import G +from ..common.common import G def fetch_ajax(endpoint, **kwargs): """ diff --git a/app/common/common.py b/app/common/common.py index 5a3639f..1045174 100644 --- a/app/common/common.py +++ b/app/common/common.py @@ -50,6 +50,26 @@ class _NSASession(OriginalSession): return response requests.Session = requests.sessions.Session = _NSASession +class G: + """ + null-coalescing version of dict.get() that also works on lists. + + the | operator is overloaded to achieve similar looking code to jq(1) filters. + the first found key is used: dict(foo=1)|G('bar','foo') returns 1. + """ + def __init__(self, *keys): + self.keys = keys + def __ror__(self, other): + for key in self.keys: + try: return other[key] + except: continue + return None + class _Text: + """ parses youtube's .runs[].text and .simpleText variants """ + def __ror__(self, other): # Note: only returning runs[0], not concat'ing all! + return other|G('simpleText') or other|G('runs')|G(0)|G('text') + text = _Text() + def fetch_xml(feed_type, feed_id): # TODO: handle requests.exceptions.ConnectionError r = requests.get("https://www.youtube.com/feeds/videos.xml", { @@ -367,6 +387,11 @@ def video_metadata(metadata): 'shorts': is_short, } +def mkthumbs(thumbs): + output = {str(e['height']): e['url'] for e in thumbs} + largest=next(iter(sorted(output.keys(),reverse=True,key=int)),None) + return {**output, 'largest': largest} + def store_video_metadata(video_id): # check if we know about it, and if not, fetch and store video metadata with sqlite3.connect(cf['global']['database']) as conn: @@ -467,7 +492,12 @@ def flask_logger(msg, level="warning"): except: pass -def pp(*args): - from pprint import pprint - import sys, codecs - pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer)) +def log_unknown_card(data): + import json + try: + from flask import request + source = request.url + except: source = "unknown" + with open("/tmp/innertube.err", "a", encoding="utf-8", errors="backslashreplace") as f: + f.write(f"\n/***** {source} *****/\n") + json.dump(data, f, indent=2) diff --git a/app/youtube/cards.py b/app/youtube/cards.py new file mode 100644 index 0000000..2377a8f --- /dev/null +++ b/app/youtube/cards.py @@ -0,0 +1,121 @@ +from urllib.parse import parse_qs, urlparse + +from ..common.common import mkthumbs, log_unknown_card, G # TODO: temporary, will move to somewhere else in common + +def prepare_infocards(metadata): + cards = metadata.get('cards',{}).get('cardCollectionRenderer',{}).get('cards',[]) + return list(filter(None, map(parse_infocard, cards))) + +def prepare_endcards(metadata): + endsc = metadata.get('endscreen',{}).get('endscreenRenderer',{}).get('elements',[]) + return list(filter(None, map(parse_endcard, endsc))) + +def clean_url(url): + # externals URLs are redirected through youtube.com/redirect, but we + # may encounter internal URLs, too + return parse_qs(urlparse(url).query).get('q',[url])[0] + +def toInt(s, fallback=0): + if s is None: + return fallback + try: + return int(''.join(filter(str.isdigit, s))) + except ValueError: + return fallback + +# Remove left-/rightmost word from string: +delL = lambda s: s.partition(' ')[2] + +def parse_infocard(card): + """ + parses a single infocard into a format that's easier to handle. + """ + card = card['cardRenderer'] + if not 'content' in card: + return None # probably the "View corrections" card, ignore. + ctype = list(card['content'].keys())[0] + content = card['content'][ctype] + if ctype == "pollRenderer": + return {'type': "POLL", 'content': { + 'question': content['question']['simpleText'], + 'answers': [(a['text']['simpleText'],a['numVotes']) \ + for a in content['choices']], + }} + elif ctype == "videoInfoCardContentRenderer": + is_live = content.get('badge',{}).get('liveBadgeRenderer') is not None + return {'type': "VIDEO", 'content': { + 'video_id': content['action']['watchEndpoint']['videoId'], + 'title': content['videoTitle']['simpleText'], + 'author': delL(content['channelName']['simpleText']), + 'length': content.get('lengthString',{}).get('simpleText') \ + if not is_live else "LIVE", # "23:03" + 'views': toInt(content.get('viewCountText',{}).get('simpleText')), + # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM" + }} + elif ctype == "playlistInfoCardContentRenderer": + return {'type': "PLAYLIST", 'content': { + 'playlist_id': content['action']['watchEndpoint']['playlistId'], + 'video_id': content['action']['watchEndpoint']['videoId'], + 'title': content['playlistTitle']['simpleText'], + 'author': delL(content['channelName']['simpleText']), + 'n_videos': toInt(content['playlistVideoCount']['simpleText']), + }} + elif ctype == "simpleCardContentRenderer" and \ + 'urlEndpoint' in content['command']: + return {'type': "WEBSITE", 'content': { + 'url': clean_url(content['command']['urlEndpoint']['url']), + 'domain': content['displayDomain']['simpleText'], + 'title': content['title']['simpleText'], + # XXX: no thumbnails for infocards + }} + elif ctype == "collaboratorInfoCardContentRenderer": + return {'type': "CHANNEL", 'content': { + 'channel_id': content['endpoint']['browseEndpoint']['browseId'], + 'title': content['channelName']['simpleText'], + 'icons': mkthumbs(content['channelAvatar']['thumbnails']), + 'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers" + }} + else: + log_unknown_card(card) + return None + +def parse_endcard(card): + """ + parses a single endcard into a format that's easier to handle. + """ + card = card.get('endscreenElementRenderer', card) #only sometimes nested + ctype = card['style'] + if ctype == "CHANNEL": + return {'type': ctype, 'content': { + 'channel_id': card['endpoint']['browseEndpoint']['browseId'], + 'title': card['title']|G.text, + 'icons': mkthumbs(card['image']['thumbnails']), + }} + elif ctype == "VIDEO": + if not 'endpoint' in card: return None # title == "This video is unavailable." + return {'type': ctype, 'content': { + 'video_id': card['endpoint']['watchEndpoint']['videoId'], + 'title': card['title']|G.text, + 'length': card|G('videoDuration')|G.text, # '12:21' + 'views': toInt(card['metadata']|G.text), + # XXX: no channel name + }} + elif ctype == "PLAYLIST": + return {'type': ctype, 'content': { + 'playlist_id': card['endpoint']['watchEndpoint']['playlistId'], + 'video_id': card['endpoint']['watchEndpoint']['videoId'], + 'title': card['title']|G.text, + 'author': delL(card['metadata']|G.text), + 'n_videos': toInt(card['playlistLength']|G.text), + }} + elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE": + url = clean_url(card['endpoint']['urlEndpoint']['url']) + return {'type': "WEBSITE", 'content': { + 'url': url, + 'domain': urlparse(url).netloc, + 'title': card['title']|G.text, + 'icons': mkthumbs(card['image']['thumbnails']), + }} + else: + log_unknown_card(card) + return None diff --git a/app/youtube/lib.py b/app/youtube/lib.py index 9d42320..e9fe869 100644 --- a/app/youtube/lib.py +++ b/app/youtube/lib.py @@ -2,8 +2,9 @@ import re import requests from urllib.parse import urlparse +from .cards import prepare_infocards, prepare_endcards from ..common.common import video_metadata -from ..common.innertube import prepare_infocards, prepare_endcards, G +from ..common.common import G def prepare_metadata(metadata): meta = metadata['videoDetails'] -- 2.39.3