import os import re import json import base64 import sqlite3 import requests import hmac, hashlib import requests_cache import dateutil.parser from xml.etree import ElementTree from configparser import ConfigParser from datetime import datetime, timezone from urllib.parse import parse_qs, urlparse cf = ConfigParser() config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini') cf.read(config_filename) if not 'global' in cf: # todo: full config check raise Exception("Configuration file not found or empty") # Note: currently expiring after 10 minutes. googlevideo-urls are valid for 5h59m, but this makes reddit very stale and premiere videos won't start. TODO: exipre when video is livestream/premiere/etc requests_cache.install_cache(backend='memory', expire_after=10*60, allowable_codes=(200,)) # Note: this should only be required for the 'memory' backed cache. # TODO: only run for long-running processes, i.e. the frontend from threading import Timer def purge_cache(sec): requests_cache.remove_expired_responses() t = Timer(sec, purge_cache, args=(sec,)) t.setDaemon(True) t.start() purge_cache(10*60) # for debugging purposes, monkey patch requests session to store each requests-request in a flask-request's g object (url and response). we can then use a flask error_handler to include the request data in the error log. # since we also call config from outside the flask appcontext, it is wrapped in a try-catch block. from flask import g import requests from requests import Session as OriginalSession class _NSASession(OriginalSession): def request(self, method, url, params=None, data=None, **kwargs): response = super(_NSASession, self).request( method, url, params, data, **kwargs ) try: if 'api_requests' not in g: g.api_requests = [] g.api_requests.append((url, params, response.text)) except RuntimeError: pass # not within flask (e.g. utils.py) return response requests.Session = requests.sessions.Session = _NSASession def fetch_xml(feed_type, feed_id): # TODO: handle requests.exceptions.ConnectionError r = requests.get("https://www.youtube.com/feeds/videos.xml", { feed_type: feed_id, }) if not r.ok: return None return r.content def parse_xml(xmldata): ns = { 'atom':"http://www.w3.org/2005/Atom", 'yt': "http://www.youtube.com/xml/schemas/2015", 'media':"http://search.yahoo.com/mrss/", 'at': "http://purl.org/atompub/tombstones/1.0", } feed = ElementTree.fromstring(xmldata) if feed.find('at:deleted-entry',ns): (_,_,vid) = feed.find('at:deleted-entry',ns).get('ref').rpartition(':') return None, None, [{'deleted': True, 'video_id': vid}] title = feed.find('atom:title',ns).text author = feed.find('atom:author/atom:name',ns).text \ if feed.find('atom:author',ns) else None videos = [] for entry in feed.findall('atom:entry',ns): videos.append({ 'video_id': entry.find('yt:videoId',ns).text, 'title': entry.find('atom:title',ns).text, 'published': entry.find('atom:published',ns).text, 'channel_id': entry.find('yt:channelId',ns).text, 'author': entry.find('atom:author',ns).find('atom:name',ns).text, # extra fields for pull_subs/webhook: 'updated': entry.find('atom:updated',ns).text, }) return title, author, videos def update_channel(db, xmldata, from_webhook=False): if not xmldata: return False # Note: websub does not return global author, hence taking from first video _, _, videos = parse_xml(xmldata) c = db.cursor() from flask import current_app # XXX: remove for i, video in enumerate(videos): if video.get('deleted'): if from_webhook: current_app.logger.warning(f"ignoring deleted video {video['video_id']}") # XXX: remove # TODO: enable once we enforce hmac validation: #c.execute("DELETE FROM videos WHERE id = ?", (video['video_id'],)) break now = datetime.now(timezone.utc) updated = dateutil.parser.parse(video['updated']) published = dateutil.parser.parse(video['published']) # if update and published time are near-identical, we assume it's new. # checking if it was posted this week is necessary during xmlfeed pulling. if (updated - published).seconds < 60 and (now - published).days < 7: timestamp = now if from_webhook: current_app.logger.warning(f"fresh video {video['video_id']}") # XXX: remove else:#, it might just an update to an older video, or a previously unlisted one. # first, assume it's an older video (correct when pulling xmlfeeds) timestamp = published # then, check if we don't know about it and if so, look up the real date. # The 'published' timestamp sent in websub POSTs are often wrong (e.g.: # video gets uploaded as unlisted on day A and set to public on day B; # the webhook is sent on day B, but 'published' says A. The video # therefore looks like it's just an update to an older video). If # that's the case, we fetch get_video_info and double-check. # We only need to do this to not-yet-in-the-database videos. c.execute("SELECT 1 from videos where id = ?", (video['video_id'],)) new_video = len(c.fetchall()) < 1 if from_webhook: current_app.logger.warning(f"video {video['video_id']}") # XXX: remove if from_webhook and new_video: if from_webhook: current_app.logger.warning(f" is webhook and new") # XXX: remove _, meta, _, _ = get_video_info(video['video_id']) if meta: meta = prepare_metadata(meta) published = dateutil.parser.parse(meta['published']) if from_webhook: current_app.logger.warning(f" uploaded {published}") # XXX: remove if (now - published).days < 7: timestamp = now else:#, it's just an update to an older video. timestamp = published c.execute(""" INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled) VALUES (?, ?, ?, datetime(?), datetime(?)) """, ( video['video_id'], video['channel_id'], video['title'], video['published'], timestamp )) if i == 0: # only required once per feed c.execute(""" INSERT OR REPLACE INTO channels (id, name) VALUES (?, ?) """, (video['channel_id'], video['author'])) db.commit() return True def get_video_info(video_id, sts=0, algo=""): """ returns: best-quality muxed video stream, player_response, error-type/mesage error types: player, malformed, livestream, geolocked, exhausted """ player_error = None # for 'exhausted' for el in ['embedded', 'detailpage']:#sometimes, only one or the other works r = requests.get("https://www.youtube.com/get_video_info", { "video_id": video_id, "eurl": f"https://youtube.googleapis.com/v/{video_id}", "el": el, "sts": sts, "hl": "en_US", }) params = parse_qs(r.text) if 'errorcode' in params: # status=fail return None, None, 'malformed', params['reason'][0] metadata = json.loads(params.get('player_response')[0]) playabilityStatus = metadata['playabilityStatus']['status'] if playabilityStatus != "OK": playabilityReason = metadata['playabilityStatus'].get('reason', '//'.join(metadata['playabilityStatus'].get('messages',[]))) player_error = f"{playabilityStatus}: {playabilityReason}" if playabilityStatus == "UNPLAYABLE": continue # try again with next el value (or fail as exhausted) # without videoDetails, there's only the error message maybe_metadata = metadata if 'videoDetails' in metadata else None return None, maybe_metadata, 'player', player_error if metadata['videoDetails'].get('isLive', False): return None, metadata, 'livestream', None if not 'formats' in metadata['streamingData']: continue # no urls formats = metadata['streamingData']['formats'] for (i,v) in enumerate(formats): if not ('cipher' in v or 'signatureCipher' in v): continue cipher = parse_qs(v.get('cipher') or v.get('signatureCipher')) formats[i]['url'] = unscramble(cipher, algo) # todo: check if we have urls or try again url = sorted(formats, key=lambda k: k['height'], reverse=True)[0]['url'] # ip-locked videos can be recovered if the proxy module is loaded: is_geolocked = 'geolocked' if 'gcr' in parse_qs(urlparse(url).query) else None return url, metadata, is_geolocked, None else: return None, metadata, 'exhausted', player_error def unscramble(cipher, algo): # test video id: UxxajLWwzqY signature = list(cipher['s'][0]) for c in algo.split(): op, ix = re.match(r"([rsw])(\d+)?", c).groups() ix = int(ix) % len(signature) if ix else 0 if not op: continue if op == 'r': signature = list(reversed(signature)) if op == 's': signature = signature[ix:] if op == 'w': signature[0], signature[ix] = signature[ix], signature[0] sp = cipher.get('sp', ['signature'])[0] sig = cipher.get('sig', [''.join(signature)])[0] return f"{cipher['url'][0]}&{sp}={sig}" def prepare_metadata(metadata): meta1 = metadata['videoDetails'] meta2 = metadata['microformat']['playerMicroformatRenderer'] cards = metadata['cards']['cardCollectionRenderer']['cards'] \ if 'cards' in metadata else [] endsc = metadata['endscreen']['endscreenRenderer']['elements'] \ if 'endscreen' in metadata else [] # the actual video streams have exact information: try: sd = metadata['streamingData'] some_stream = (sd.get('adaptiveFormats',[]) + sd.get('formats',[]))[0] aspect_ratio = some_stream['width'] / some_stream['height'] # if that's unavailable (e.g. on livestreams), fall back to # thumbnails (only either 4:3 or 16:9). except: some_img = meta2['thumbnail']['thumbnails'][0] aspect_ratio = some_img['width'] / some_img['height'] # Note: we could get subtitles in multiple formats directly by querying # https://video.google.com/timedtext?hl=en&type=list&v= followed by # https://www.youtube.com/api/timedtext?lang=&v=&fmt={srv1|srv2|srv3|ttml|vtt}, # but that won't give us autogenerated subtitles (and is an extra request). # we can still add &fmt= to the extracted URLs below (first one takes precedence). try: # find the native language captions (assuming there is only 1 audioTrack) (any level might not exist): default_track = metadata.get('captions',{}).get('playerCaptionsTracklistRenderer',{}).get('defaultAudioTrackIndex', 0) main_subtitle = metadata['captions']['playerCaptionsTracklistRenderer']['audioTracks'][default_track]['defaultCaptionTrackIndex'] except: main_subtitle = -1 subtitles = sorted([ {'url':cc['baseUrl'], 'code':cc['languageCode'], 'autogenerated':cc.get('kind')=="asr", 'name':cc['name']['simpleText'], 'default':i==main_subtitle, 'query':"fmt=vtt&"+urlparse(cc['baseUrl']).query} # for our internal proxy for i,cc in enumerate(metadata.get('captions',{}) .get('playerCaptionsTracklistRenderer',{}) .get('captionTracks',[])) # sort order: default lang gets weight 0 (first), other manually translated weight 1, autogenerated weight 2: ], key=lambda cc: (not cc['default']) + cc['autogenerated']) def clean_url(url): # externals URLs are redirected through youtube.com/redirect, but we # may encounter internal URLs, too return parse_qs(urlparse(url).query).get('q',[url])[0] # Remove left-/rightmost word from string: delL = lambda s: s.partition(' ')[2] delR = lambda s: s.rpartition(' ')[0] # Thousands seperator aware int(): intT = lambda s: int(s.replace(',', '')) def parse_infocard(card): card = card['cardRenderer'] ctype = list(card['content'].keys())[0] content = card['content'][ctype] if ctype == "pollRenderer": ctype = "POLL" content = { 'question': content['question']['simpleText'], 'answers': [(a['text']['simpleText'],a['numVotes']) \ for a in content['choices']], } elif ctype == "videoInfoCardContentRenderer": ctype = "VIDEO" # if the card references a live stream, it has no length, but a "LIVE NOW" badge. # TODO: this is ugly; cleanup. is_live = content.get('badge',{}).get('liveBadgeRenderer',{}) length = is_live.get('label',{}).get('simpleText') or content['lengthString']['simpleText'] # '23:03' content = { 'video_id': content['action']['watchEndpoint']['videoId'], 'title': content['videoTitle']['simpleText'], 'author': delL(content['channelName']['simpleText']), 'length': length, 'views': intT(delR(content['viewCountText']['simpleText'])), } elif ctype == "playlistInfoCardContentRenderer": ctype = "PLAYLIST" content = { 'playlist_id': content['action']['watchEndpoint']['playlistId'], 'video_id': content['action']['watchEndpoint']['videoId'], 'title': content['playlistTitle']['simpleText'], 'author': delL(content['channelName']['simpleText']), 'n_videos': intT(content['playlistVideoCount']['simpleText']), } elif ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content['command']: ctype = "WEBSITE" content = { 'url': clean_url(content['command']['urlEndpoint']['url']), 'domain': content['displayDomain']['simpleText'], 'title': content['title']['simpleText'], # XXX: no thumbnails for infocards } elif ctype == "collaboratorInfoCardContentRenderer": ctype = "CHANNEL" content = { 'channel_id': content['endpoint']['browseEndpoint']['browseId'], 'title': content['channelName']['simpleText'], 'icons': mkthumbs(content['channelAvatar']['thumbnails']), 'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers" } else: import pprint content = {'error': f"{ctype} is not implemented;
{pprint.pformat(card)}
"} return {'type': ctype, 'content': content} def mkthumbs(thumbs): return {e['height']: e['url'] for e in thumbs} def parse_endcard(card): card = card.get('endscreenElementRenderer', card) #only sometimes nested ctype = card['style'] if ctype == "CHANNEL": content = { 'channel_id': card['endpoint']['browseEndpoint']['browseId'], 'title': card['title']['simpleText'], 'icons': mkthumbs(card['image']['thumbnails']), } elif ctype == "VIDEO": content = { 'video_id': card['endpoint']['watchEndpoint']['videoId'], # XXX: KeyError 'endpoint' exception (no idea which youtube video this was on) 'title': card['title']['simpleText'], 'length': card['videoDuration']['simpleText'], # '12:21' 'views': delR(card['metadata']['simpleText']), # XXX: no channel name } elif ctype == "PLAYLIST": content = { 'playlist_id': card['endpoint']['watchEndpoint']['playlistId'], 'video_id': card['endpoint']['watchEndpoint']['videoId'], 'title': card['title']['simpleText'], 'author': delL(card['metadata']['simpleText']), 'n_videos': intT(delR(card['playlistLength']['simpleText'])), } elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE": ctype = "WEBSITE" url = clean_url(card['endpoint']['urlEndpoint']['url']) content = { 'url': url, 'domain': urlparse(url).netloc, 'title': card['title']['simpleText'], 'icons': mkthumbs(card['image']['thumbnails']), } else: import pprint content = {'error': f"{ctype} is not implemented;
{pprint.pformat(card)}
"} return {'type': ctype, 'content': content} infocards = [parse_infocard(card) for card in cards] endcards = [parse_endcard(card) for card in endsc] # combine cards to weed out duplicates. for videos and playlists prefer # infocards, for channels and websites prefer endcards, as those have more # information than the other. # if the card type is not in ident, we use the whole card for comparison # (otherwise they'd all replace each other) ident = { # ctype -> ident 'VIDEO': 'video_id', 'PLAYLIST': 'playlist_id', 'CHANNEL': 'channel_id', 'WEBSITE': 'url', 'POLL': 'question', } getident = lambda c: c['content'].get(ident.get(c['type']), c) mkexclude = lambda cards, types: [getident(c) for c in cards if c['type'] in types] exclude = lambda cards, without: [c for c in cards if getident(c) not in without] allcards = exclude(infocards, mkexclude(endcards, ['CHANNEL','WEBSITE'])) + \ exclude(endcards, mkexclude(infocards, ['VIDEO','PLAYLIST'])) all_countries = """AD AE AF AG AI AL AM AO AQ AR AS AT AU AW AX AZ BA BB BD BE BF BG BH BI BJ BL BM BN BO BQ BR BS BT BV BW BY BZ CA CC CD CF CG CH CI CK CL CM CN CO CR CU CV CW CX CY CZ DE DJ DK DM DO DZ EC EE EG EH ER ES ET FI FJ FK FM FO FR GA GB GD GE GF GG GH GI GL GM GN GP GQ GR GS GT GU GW GY HK HM HN HR HT HU ID IE IL IM IN IO IQ IR IS IT JE JM JO JP KE KG KH KI KM KN KP KR KW KY KZ LA LB LC LI LK LR LS LT LU LV LY MA MC MD ME MF MG MH MK ML MM MN MO MP MQ MR MS MT MU MV MW MX MY MZ NA NC NE NF NG NI NL NO NP NR NU NZ OM PA PE PF PG PH PK PL PM PN PR PS PT PW PY QA RE RO RS RU RW SA SB SC SD SE SG SH SI SJ SK SL SM SN SO SR SS ST SV SX SY SZ TC TD TF TG TH TJ TK TL TM TN TO TR TT TV TW TZ UA UG UM US UY UZ VA VC VE VG VI VN VU WF WS YE YT ZA ZM ZW""".split() whitelisted = sorted(meta2.get('availableCountries',[])) blacklisted = sorted(set(all_countries) - set(whitelisted)) published_at = f"{meta2['publishDate']}T00:00:00Z" # yyyy-mm-dd # 'premiere' videos (and livestreams?) have a ISO8601 date available: if 'liveBroadcastDetails' in meta2 and 'startTimestamp' in meta2['liveBroadcastDetails']: # TODO: tighten up published_at = meta2['liveBroadcastDetails']['startTimestamp'] return { 'title': meta1['title'], 'author': meta1['author'], 'channel_id': meta1['channelId'], 'description': meta1['shortDescription'], 'published': published_at, 'views': meta1['viewCount'], 'length': int(meta1['lengthSeconds']), 'rating': meta1['averageRating'], 'category': meta2['category'], 'aspectr': aspect_ratio, 'unlisted': meta2['isUnlisted'], 'whitelisted': whitelisted, 'blacklisted': blacklisted, 'poster': meta2['thumbnail']['thumbnails'][0]['url'], 'infocards': infocards, 'endcards': endcards, 'all_cards': allcards, 'subtitles': subtitles, } def store_video_metadata(video_id): # check if we know about it, and if not, fetch and store video metadata with sqlite3.connect(cf['global']['database']) as conn: c = conn.cursor() c.execute("SELECT 1 from videos where id = ?", (video_id,)) new_video = len(c.fetchall()) < 1 if new_video: _, meta, _, _ = get_video_info(video_id) if meta: meta = prepare_metadata(meta) c.execute(""" INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled) VALUES (?, ?, ?, datetime(?), datetime(?)) """, ( video_id, meta['channel_id'], meta['title'], meta['published'], meta['published'], )) c.execute(""" INSERT OR REPLACE INTO channels (id, name) VALUES (?, ?) """, (meta['channel_id'], meta['author'])) from werkzeug.exceptions import NotFound class NoFallbackException(NotFound): pass def fallback_route(*args, **kwargs): # TODO: worthy as a flask-extension? """ finds the next route that matches the current url rule, and executes it. args, kwargs: pass all arguments of the current route """ from flask import current_app, request, g # build a list of endpoints that match the current request's url rule: matching = [ rule.endpoint for rule in current_app.url_map.iter_rules() if rule.rule == request.url_rule.rule ] current = matching.index(request.endpoint) # since we can't change request.endpoint, we always get the original # endpoint back. so for repeated fall throughs, we use the g object to # increment how often we want to fall through. if not '_fallback_next' in g: g._fallback_next = 0 g._fallback_next += 1 next_ep = current + g._fallback_next if next_ep < len(matching): return current_app.view_functions[matching[next_ep]](*args, **kwargs) else: raise NoFallbackException def websub_url_hmac(key, feed_id, timestamp, nonce): """ generate sha1 hmac, as required by websub/pubsubhubbub """ sig_input = f"{feed_id}:{timestamp}:{nonce}".encode('ascii') return hmac.new(key.encode('ascii'), sig_input, hashlib.sha1).hexdigest() def websub_body_hmac(key, body): return hmac.new(key.encode('ascii'), body, hashlib.sha1).hexdigest() def pp(*args): from pprint import pprint import sys, codecs pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer))