import os import re import json import html import base64 import requests import hmac, hashlib import requests_cache import dateutil.parser from xml.etree import ElementTree from configparser import ConfigParser from datetime import datetime, timezone from urllib.parse import parse_qs, urlparse cf = ConfigParser() config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini') cf.read(config_filename) if not 'global' in cf: # todo: full config check raise Exception("Configuration file not found or empty") # Note: currently expiring after 10 minutes. googlevideo-urls are valid for 5h59m, but this makes reddit very stale and premiere videos won't start. TODO: exipre when video is livestream/premiere/etc requests_cache.install_cache(backend='memory', expire_after=10*60, allowable_codes=(200,)) # Note: this should only be required for the 'memory' backed cache. # TODO: only run for long-running processes, i.e. the frontend from threading import Timer def purge_cache(sec): requests_cache.remove_expired_responses() t = Timer(sec, purge_cache, args=(sec,)) t.setDaemon(True) t.start() purge_cache(10*60) # for debugging purposes, monkey patch requests session to store each requests-request in a flask-request's g object (url and response). we can then use a flask error_handler to include the request data in the error log. # since we also call config from outside the flask appcontext, it is wrapped in a try-catch block. from flask import g import requests from requests import Session as OriginalSession class _NSASession(OriginalSession): def request(self, method, url, params=None, data=None, **kwargs): response = super(_NSASession, self).request( method, url, params, data, **kwargs ) try: if 'api_requests' not in g: g.api_requests = [] g.api_requests.append((url, params, response.text)) except RuntimeError: pass # not within flask (e.g. utils.py) return response requests.Session = requests.sessions.Session = _NSASession def fetch_xml(feed_type, feed_id): # TODO: handle requests.exceptions.ConnectionError r = requests.get("https://www.youtube.com/feeds/videos.xml", { feed_type: feed_id, }) if not r.ok: return None return r.text def parse_xml(xmldata): ns = { 'atom':"http://www.w3.org/2005/Atom", 'yt': "http://www.youtube.com/xml/schemas/2015", 'media':"http://search.yahoo.com/mrss/", 'at': "http://purl.org/atompub/tombstones/1.0", } feed = ElementTree.fromstring(xmldata) if feed.find('at:deleted-entry',ns): (_,_,vid) = feed.find('at:deleted-entry',ns).get('ref').rpartition(':') return None, None, [{'deleted': True, 'video_id': vid}] title = feed.find('atom:title',ns).text author = feed.find('atom:author/atom:name',ns).text \ if feed.find('atom:author',ns) else None videos = [] for entry in feed.findall('atom:entry',ns): videos.append({ 'video_id': entry.find('yt:videoId',ns).text, 'title': entry.find('atom:title',ns).text, 'published': entry.find('atom:published',ns).text, 'channel_id': entry.find('yt:channelId',ns).text, 'author': entry.find('atom:author',ns).find('atom:name',ns).text, # extra fields for pull_subs/webhook: 'updated': entry.find('atom:updated',ns).text, }) return title, author, videos def update_channel(db, xmldata, from_webhook=False): if not xmldata: return False # Note: websub does not return global author, hence taking from first video _, _, videos = parse_xml(xmldata) c = db.cursor() from flask import current_app # XXX: remove for i, video in enumerate(videos): if video.get('deleted'): current_app.logger.info(f"ignoring deleted video {video['video_id']}") # XXX: remove # TODO: enable once we enforce hmac validation: #c.execute("DELETE FROM videos WHERE id = ?", (video['video_id'],)) break now = datetime.now(timezone.utc) timestamp, published = None, None # The 'published' timestamp sent in websub POSTs are often wrong (e.g.: # video gets uploaded as unlisted on day A and set to public on day B; # the webhook is sent on day B, but 'published' says A. The video # therefore looks like it's just an update to an older video). If # that's the case, we fetch get_video_info and double-check. # We only need to do this to not-yet-in-the-database videos. c.execute("SELECT 1 from videos where id = ?", (video['video_id'],)) new_video = len(c.fetchall()) < 1 if from_webhook and new_video: _, meta, _, _ = get_video_info(video['video_id']) if meta: meta = prepare_metadata(meta) published = dateutil.parser.parse(meta['published']) current_app.logger.info(f"new video {video['video_id']}, uploaded {published}") # XXX: remove # if published within the last week, assume it's new if (now - published).days < 7: timestamp = now else:#, it's just an update to an older video. timestamp = published # if we update from an rss-pull, we can rely on the embedded published # dates (and don't have to fire off a whole bunch of requests) else: updated = dateutil.parser.parse(video['updated']) published = dateutil.parser.parse(video['published']) if (updated - published).seconds < 60 and (now - published).days < 7: timestamp = now else:#, it's just an update to an older video. timestamp = published c.execute(""" INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled) VALUES (?, ?, ?, datetime(?), datetime(?)) """, ( video['video_id'], video['channel_id'], video['title'], video['published'], timestamp )) if i == 0: # only required once per feed c.execute(""" INSERT OR REPLACE INTO channels (id, name) VALUES (?, ?) """, (video['channel_id'], video['author'])) db.commit() return True def get_video_info(video_id, sts=0, algo=""): """ returns: best-quality muxed video stream, player_response, error-type/mesage error types: player, malformed, livestream, geolocked, exhausted """ player_error = None # for 'exhausted' for el in ['embedded', 'detailpage']:#sometimes, only one or the other works r = requests.get("https://www.youtube.com/get_video_info", { "video_id": video_id, "eurl": f"https://youtube.googleapis.com/v/{video_id}", "el": el, "sts": sts, "hl": "en_US", }) params = parse_qs(r.text) if 'errorcode' in params: # status=fail return None, None, 'malformed', params['reason'][0] metadata = json.loads(params.get('player_response')[0]) playabilityStatus = metadata['playabilityStatus']['status'] if playabilityStatus != "OK": playabilityReason = metadata['playabilityStatus'].get('reason', '//'.join(metadata['playabilityStatus'].get('messages',[]))) player_error = f"{playabilityStatus}: {playabilityReason}" if playabilityStatus == "UNPLAYABLE": continue # try again with next el value (or fail as exhausted) # without videoDetails, there's only the error message maybe_metadata = metadata if 'videoDetails' in metadata else None return None, maybe_metadata, 'player', player_error if metadata['videoDetails']['isLiveContent'] and \ (metadata['videoDetails'].get('isLive', False) or \ metadata['videoDetails'].get('isPostLiveDvr', False)): return None, metadata, 'livestream', None if not 'formats' in metadata['streamingData']: continue # no urls formats = metadata['streamingData']['formats'] for (i,v) in enumerate(formats): if not ('cipher' in v or 'signatureCipher' in v): continue cipher = parse_qs(v.get('cipher') or v.get('signatureCipher')) formats[i]['url'] = unscramble(cipher, algo) # todo: check if we have urls or try again url = sorted(formats, key=lambda k: k['height'], reverse=True)[0]['url'] if 'gcr' in parse_qs(url): return None, metadata, 'geolocked', None return url, metadata, None, None else: return None, metadata, 'exhausted', player_error def unscramble(cipher, algo): # test video id: UxxajLWwzqY signature = list(cipher['s'][0]) for c in algo.split(): op, ix = re.match(r"([rsw])(\d+)?", c).groups() ix = int(ix) % len(signature) if ix else 0 if not op: continue if op == 'r': signature = list(reversed(signature)) if op == 's': signature = signature[ix:] if op == 'w': signature[0], signature[ix] = signature[ix], signature[0] sp = cipher.get('sp', ['signature'])[0] sig = cipher.get('sig', [''.join(signature)])[0] return f"{cipher['url'][0]}&{sp}={sig}" def prepare_metadata(metadata): meta1 = metadata['videoDetails'] meta2 = metadata['microformat']['playerMicroformatRenderer'] cards = metadata['cards']['cardCollectionRenderer']['cards'] \ if 'cards' in metadata else [] endsc = metadata['endscreen']['endscreenRenderer']['elements'] \ if 'endscreen' in metadata else [] # the actual video streams have exact information: try: sd = metadata['streamingData'] some_stream = (sd.get('adaptiveFormats',[]) + sd.get('formats',[]))[0] aspect_ratio = some_stream['width'] / some_stream['height'] # if that's unavailable (e.g. on livestreams), fall back to # thumbnails (only either 4:3 or 16:9). except: some_img = meta2['thumbnail']['thumbnails'][0] aspect_ratio = some_img['width'] / some_img['height'] subtitles = sorted([ {'url':cc['baseUrl'], 'code':cc['languageCode'], 'autogenerated':cc.get('kind')=="asr", 'name':cc['name']['simpleText']} for cc in metadata.get('captions',{}) .get('playerCaptionsTracklistRenderer',{}) .get('captionTracks',[]) ], key=lambda cc: cc['autogenerated']) def clean_url(url): # externals URLs are redirected through youtube.com/redirect, but we # may encounter internal URLs, too return parse_qs(urlparse(url).query).get('q',[url])[0] # Remove left-/rightmost word from string: delL = lambda s: s.partition(' ')[2] delR = lambda s: s.rpartition(' ')[0] # Thousands seperator aware int(): intT = lambda s: int(s.replace(',', '')) def parse_infocard(card): card = card['cardRenderer'] ctype = list(card['content'].keys())[0] content = card['content'][ctype] if ctype == "pollRenderer": ctype = "POLL" content = { 'question': content['question']['simpleText'], 'answers': [(a['text']['simpleText'],a['numVotes']) \ for a in content['choices']], } elif ctype == "videoInfoCardContentRenderer": ctype = "VIDEO" # if the card references a live stream, it has no length, but a "LIVE NOW" badge. # TODO: this is ugly; cleanup. is_live = content.get('badge',{}).get('liveBadgeRenderer',{}) length = is_live.get('label',{}).get('simpleText') or content['lengthString']['simpleText'] # '23:03' content = { 'video_id': content['action']['watchEndpoint']['videoId'], 'title': content['videoTitle']['simpleText'], 'author': delL(content['channelName']['simpleText']), 'length': length, 'views': intT(delR(content['viewCountText']['simpleText'])), } elif ctype == "playlistInfoCardContentRenderer": ctype = "PLAYLIST" content = { 'playlist_id': content['action']['watchEndpoint']['playlistId'], 'video_id': content['action']['watchEndpoint']['videoId'], 'title': content['playlistTitle']['simpleText'], 'author': delL(content['channelName']['simpleText']), 'n_videos': intT(content['playlistVideoCount']['simpleText']), } elif ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content['command']: ctype = "WEBSITE" content = { 'url': clean_url(content['command']['urlEndpoint']['url']), 'domain': content['displayDomain']['simpleText'], 'title': content['title']['simpleText'], # XXX: no thumbnails for infocards } elif ctype == "collaboratorInfoCardContentRenderer": ctype = "CHANNEL" content = { 'channel_id': content['endpoint']['browseEndpoint']['browseId'], 'title': content['channelName']['simpleText'], 'icons': mkthumbs(content['channelAvatar']['thumbnails']), 'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers" } else: import pprint content = {'error': f"{ctype} is not implemented;
{pprint.pformat(card)}
"} return {'type': ctype, 'content': content} def mkthumbs(thumbs): return {e['height']: e['url'] for e in thumbs} def parse_endcard(card): card = card.get('endscreenElementRenderer', card) #only sometimes nested ctype = card['style'] if ctype == "CHANNEL": content = { 'channel_id': card['endpoint']['browseEndpoint']['browseId'], 'title': card['title']['simpleText'], 'icons': mkthumbs(card['image']['thumbnails']), } elif ctype == "VIDEO": content = { 'video_id': card['endpoint']['watchEndpoint']['videoId'], # XXX: KeyError 'endpoint' exception (no idea which youtube video this was on) 'title': card['title']['simpleText'], 'length': card['videoDuration']['simpleText'], # '12:21' 'views': delR(card['metadata']['simpleText']), # XXX: no channel name } elif ctype == "PLAYLIST": content = { 'playlist_id': card['endpoint']['watchEndpoint']['playlistId'], 'video_id': card['endpoint']['watchEndpoint']['videoId'], 'title': card['title']['simpleText'], 'author': delL(card['metadata']['simpleText']), 'n_videos': intT(delR(card['playlistLength']['simpleText'])), } elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE": ctype = "WEBSITE" url = clean_url(card['endpoint']['urlEndpoint']['url']) content = { 'url': url, 'domain': urlparse(url).netloc, 'title': card['title']['simpleText'], 'icons': mkthumbs(card['image']['thumbnails']), } else: import pprint content = {'error': f"{ctype} is not implemented;
{pprint.pformat(card)}
"} return {'type': ctype, 'content': content} infocards = [parse_infocard(card) for card in cards] endcards = [parse_endcard(card) for card in endsc] # combine cards to weed out duplicates. for videos and playlists prefer # infocards, for channels and websites prefer endcards, as those have more # information than the other. # if the card type is not in ident, we use the whole card for comparison # (otherwise they'd all replace each other) ident = { # ctype -> ident 'VIDEO': 'video_id', 'PLAYLIST': 'playlist_id', 'CHANNEL': 'channel_id', 'WEBSITE': 'url', 'POLL': 'question', } getident = lambda c: c['content'].get(ident.get(c['type']), c) mkexclude = lambda cards, types: [getident(c) for c in cards if c['type'] in types] exclude = lambda cards, without: [c for c in cards if getident(c) not in without] allcards = exclude(infocards, mkexclude(endcards, ['CHANNEL','WEBSITE'])) + \ exclude(endcards, mkexclude(infocards, ['VIDEO','PLAYLIST'])) all_countries = """AD AE AF AG AI AL AM AO AQ AR AS AT AU AW AX AZ BA BB BD BE BF BG BH BI BJ BL BM BN BO BQ BR BS BT BV BW BY BZ CA CC CD CF CG CH CI CK CL CM CN CO CR CU CV CW CX CY CZ DE DJ DK DM DO DZ EC EE EG EH ER ES ET FI FJ FK FM FO FR GA GB GD GE GF GG GH GI GL GM GN GP GQ GR GS GT GU GW GY HK HM HN HR HT HU ID IE IL IM IN IO IQ IR IS IT JE JM JO JP KE KG KH KI KM KN KP KR KW KY KZ LA LB LC LI LK LR LS LT LU LV LY MA MC MD ME MF MG MH MK ML MM MN MO MP MQ MR MS MT MU MV MW MX MY MZ NA NC NE NF NG NI NL NO NP NR NU NZ OM PA PE PF PG PH PK PL PM PN PR PS PT PW PY QA RE RO RS RU RW SA SB SC SD SE SG SH SI SJ SK SL SM SN SO SR SS ST SV SX SY SZ TC TD TF TG TH TJ TK TL TM TN TO TR TT TV TW TZ UA UG UM US UY UZ VA VC VE VG VI VN VU WF WS YE YT ZA ZM ZW""".split() whitelisted = sorted(meta2.get('availableCountries',[])) blacklisted = sorted(set(all_countries) - set(whitelisted)) published_at = f"{meta2['publishDate']}T00:00:00Z" # yyyy-mm-dd # 'premiere' videos (and livestreams?) have a ISO8601 date available: if 'liveBroadcastDetails' in meta2 and 'startTimestamp' in meta2['liveBroadcastDetails']: # TODO: tighten up published_at = meta2['liveBroadcastDetails']['startTimestamp'] return { 'title': meta1['title'], 'author': meta1['author'], 'channel_id': meta1['channelId'], 'description': meta1['shortDescription'], 'published': published_at, 'views': meta1['viewCount'], 'length': int(meta1['lengthSeconds']), 'rating': meta1['averageRating'], 'category': meta2['category'], 'aspectr': aspect_ratio, 'unlisted': meta2['isUnlisted'], 'whitelisted': whitelisted, 'blacklisted': blacklisted, 'poster': meta2['thumbnail']['thumbnails'][0]['url'], 'infocards': infocards, 'endcards': endcards, 'all_cards': allcards, 'subtitles': subtitles, } class RedditException(Exception): pass def fetch_reddit(subreddits, sorted_by="hot", time=None, *, limit=36, count=None, before=None, after=None): """ fetches data from a subreddit (or a multireddit like gif+gifs) and filters/sorts results. sorted_by values: hot, new, rising, controversial, top time values: hour, day, week, month, year, all (for top and controversial) """ if not subreddits: return None query = {k:v for k,v in { 'count':count, 'before':before, 'after':after, 'limit':limit, # 1..100 (default 25) 't': time, # hour,week,month,year,all }.items() if v} multireddit = '+'.join(subreddits) r = requests.get(f"https://old.reddit.com/r/{multireddit}/{sorted_by}.json", query, headers={'User-Agent':'Mozilla/5.0'}) if not r.ok or not 'data' in r.json(): raise RedditException(r.text) return r.json() def fetch_reddit_post(post_id): # Note: /api/info.json?id=t3_h7mjes == /by_id/t3_h7mjes.json r = requests.get(f"https://old.reddit.com/by_id/t3_{post_id}.json", headers={'User-Agent':'Mozilla/5.0'}) if not r.ok or not 'data' in r.json(): raise RedditException(r.text) return r.json() def parse_reddit_videos(data): videos = [] entries = sorted(data['data']['children'], key=lambda e: e['data']['score'] > 1, reverse=True) for entry in entries: e = entry['data'] if e['domain'] not in ['youtube.com', 'youtu.be', 'invidio.us']: continue try: # Note: youtube.com/ is not valid (404s), but seen in the wild. video_id = re.match(r'^https?://(?:www.|m.)?(?:youtube.com/watch\?(?:.*&)?v=|youtu.be/|youtube.com/embed/|youtube.com/)([-_0-9A-Za-z]+)', e['url']).group(1) except: continue # XXX: should we log that? if not video_id: continue videos.append({ 'video_id': video_id, 'title': html.unescape(e['title']), # Note: we unescape and re-escape in the template 'url': e['permalink'], 'n_comments': e['num_comments'], 'n_karma': e['score'], 'subreddit': e['subreddit'], 'post_id': e['id'], }) return videos class NoFallbackException(Exception): pass def fallback_route(*args, **kwargs): # TODO: worthy as a flask-extension? """ finds the next route that matches the current url rule, and executes it. args, kwargs: pass all arguments of the current route """ from flask import current_app, request, g from werkzeug.exceptions import NotFound # build a list of endpoints that match the current request's url rule: matching = [ rule.endpoint for rule in current_app.url_map.iter_rules() if rule.rule == request.url_rule.rule ] current = matching.index(request.endpoint) # since we can't change request.endpoint, we always get the original # endpoint back. so for repeated fall throughs, we use the g object to # increment how often we want to fall through. if not '_fallback_next' in g: g._fallback_next = 0 g._fallback_next += 1 next_ep = current + g._fallback_next if next_ep < len(matching): return current_app.view_functions[matching[next_ep]](*args, **kwargs) else: raise NoFallbackException def websub_url_hmac(key, feed_id, timestamp, nonce): """ generate sha1 hmac, as required by websub/pubsubhubbub """ sig_input = f"{feed_id}:{timestamp}:{nonce}".encode('ascii') return hmac.new(key.encode('ascii'), sig_input, hashlib.sha1).hexdigest() def websub_body_hmac(key, body): return hmac.new(key.encode('ascii'), body, hashlib.sha1).hexdigest() def pp(*args): from pprint import pprint import sys, codecs pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer))