import os import re import json import base64 import sqlite3 import requests import hmac, hashlib import requests_cache import dateutil.parser from xml.etree import ElementTree from configparser import ConfigParser from datetime import datetime, timezone from urllib.parse import parse_qs, urlparse cf = ConfigParser() config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini') cf.read(config_filename) if not 'global' in cf: # todo: full config check raise Exception("Configuration file not found or empty") # Note: currently expiring after 10 minutes. googlevideo-urls are valid for 5h59m, but this makes reddit very stale and premiere videos won't start. TODO: exipre when video is livestream/premiere/etc requests_cache.install_cache(backend='memory', expire_after=10*60, allowable_codes=(200,)) # Note: this should only be required for the 'memory' backed cache. # TODO: only run for long-running processes, i.e. the frontend from threading import Timer def purge_cache(sec): requests_cache.remove_expired_responses() t = Timer(sec, purge_cache, args=(sec,)) t.setDaemon(True) t.start() purge_cache(10*60) # for debugging purposes, monkey patch requests session to store each requests-request in a flask-request's g object (url and response). we can then use a flask error_handler to include the request data in the error log. # since we also call config from outside the flask appcontext, it is wrapped in a try-catch block. from flask import g import requests from requests import Session as OriginalSession class _NSASession(OriginalSession): def request(self, method, url, params=None, data=None, **kwargs): response = super(_NSASession, self).request( method, url, params, data, **kwargs ) try: if 'api_requests' not in g: g.api_requests = [] g.api_requests.append((url, params, response.text)) except RuntimeError: pass # not within flask (e.g. utils.py) return response requests.Session = requests.sessions.Session = _NSASession def fetch_xml(feed_type, feed_id): # TODO: handle requests.exceptions.ConnectionError r = requests.get("https://www.youtube.com/feeds/videos.xml", { feed_type: feed_id, }) if not r.ok: return None return r.content def parse_xml(xmldata): ns = { 'atom':"http://www.w3.org/2005/Atom", 'yt': "http://www.youtube.com/xml/schemas/2015", 'media':"http://search.yahoo.com/mrss/", 'at': "http://purl.org/atompub/tombstones/1.0", } feed = ElementTree.fromstring(xmldata) if feed.find('at:deleted-entry',ns): (_,_,vid) = feed.find('at:deleted-entry',ns).get('ref').rpartition(':') return None, None, [{'deleted': True, 'video_id': vid}], None, None title = feed.find('atom:title',ns).text author = feed.find('atom:author/atom:name',ns).text \ if feed.find('atom:author',ns) else None # for /user/<> endpoint: find out UC-id: # for playlists: this is who created the playlist: try: channel_id = feed.find('yt:channelId',ns).text except:channel_id=None # XXX: why does ternary not work!? # for pullsub: if this exists, we're looking at a playlist: try: playlist_id = feed.find('yt:playlistId',ns).text except:playlist_id=None # XXX: why does ternary not work!? videos = [] for entry in feed.findall('atom:entry',ns): videos.append({ 'video_id': entry.find('yt:videoId',ns).text, 'title': entry.find('atom:title',ns).text, 'published': entry.find('atom:published',ns).text, 'channel_id': entry.find('yt:channelId',ns).text, 'author': entry.find('atom:author',ns).find('atom:name',ns).text, # extra fields for pull_subs/webhook: 'updated': entry.find('atom:updated',ns).text, }) return title, author, videos, channel_id, playlist_id def update_channel(db, xmldata, from_webhook=False): if not xmldata: return False # Note: websub does not return global author, hence taking from first video title, author, videos, channel, playlist = parse_xml(xmldata) c = db.cursor() from flask import current_app # XXX: remove for i, video in enumerate(videos): if video.get('deleted'): if from_webhook: current_app.logger.warning(f"ignoring deleted video {video['video_id']}") # XXX: remove # TODO: enable once we enforce hmac validation: #c.execute("DELETE FROM videos WHERE id = ?", (video['video_id'],)) break now = datetime.now(timezone.utc) updated = dateutil.parser.parse(video['updated']) published = dateutil.parser.parse(video['published']) # if update and published time are near-identical, we assume it's new. # checking if it was posted this week is necessary during xmlfeed pulling. if (updated - published).seconds < 60 and (now - published).days < 7: timestamp = now if from_webhook: current_app.logger.warning(f"fresh video {video['video_id']}") # XXX: remove else:#, it might just an update to an older video, or a previously unlisted one. # first, assume it's an older video (correct when pulling xmlfeeds) timestamp = published # then, check if we don't know about it and if so, look up the real date. # The 'published' timestamp sent in websub POSTs are often wrong (e.g.: # video gets uploaded as unlisted on day A and set to public on day B; # the webhook is sent on day B, but 'published' says A. The video # therefore looks like it's just an update to an older video). If # that's the case, we fetch get_video_info and double-check. # We only need to do this to not-yet-in-the-database videos. c.execute("SELECT 1 from videos where id = ?", (video['video_id'],)) new_video = len(c.fetchall()) < 1 if from_webhook: current_app.logger.warning(f"video {video['video_id']}") # XXX: remove if from_webhook and new_video: if from_webhook: current_app.logger.warning(f" is webhook and new") # XXX: remove _, _, meta, _, _ = get_video_info(video['video_id']) if meta: meta = prepare_metadata(meta) published = dateutil.parser.parse(meta['published']) if from_webhook: current_app.logger.warning(f" uploaded {published}") # XXX: remove if (now - published).days < 7: timestamp = now else:#, it's just an update to an older video. timestamp = published c.execute(""" INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled) VALUES (?, ?, ?, datetime(?), datetime(?)) """, ( video['video_id'], video['channel_id'], video['title'], video['published'], timestamp )) # for channels, this is obviously always the same, but playlists can # consist of videos from different channels: if i == 0 or playlist: c.execute(""" INSERT OR REPLACE INTO channels (id, name) VALUES (?, ?) """, (video['channel_id'], video['author'])) # keep track of which videos are in a playlist, so we can show the user # why a video is in their feed: if playlist: c.execute(""" INSERT OR IGNORE INTO playlist_videos (video_id, playlist_id) VALUES (?, ?) """, (video['video_id'], playlist)) if playlist and not from_webhook: # Note: playlists can't get updated via websub c.execute(""" INSERT OR REPLACE INTO playlists (id, name, author) VALUES (?, ?, ?) """, (playlist, title, channel)) c.execute(""" INSERT OR REPLACE INTO channels (id, name) VALUES (?, ?) """, (channel, author)) db.commit() return True def get_video_info(video_id, sts=0, algo=""): """ returns: best-quality muxed video stream, stream map, player_response, error-type/mesage error types: player, malformed, livestream, geolocked, exhausted """ player_error = None # for 'exhausted' for el in ['embedded', 'detailpage']:#sometimes, only one or the other works r = requests.get("https://www.youtube.com/get_video_info", { "video_id": video_id, "eurl": f"https://youtube.googleapis.com/v/{video_id}", "el": el, "sts": sts, "hl": "en_US", }) params = parse_qs(r.text) if 'errorcode' in params: # status=fail return None, None, None, 'malformed', params['reason'][0] metadata = json.loads(params.get('player_response')[0]) playabilityStatus = metadata['playabilityStatus']['status'] if playabilityStatus != "OK": playabilityReason = metadata['playabilityStatus'].get('reason', '//'.join(metadata['playabilityStatus'].get('messages',[]))) player_error = f"{playabilityStatus}: {playabilityReason}" if playabilityStatus == "UNPLAYABLE": continue # try again with next el value (or fail as exhausted) # without videoDetails, there's only the error message maybe_metadata = metadata if 'videoDetails' in metadata else None return None, None, maybe_metadata, 'player', player_error if metadata['videoDetails'].get('isLive', False): return None, None, metadata, 'livestream', None if not 'formats' in metadata['streamingData']: continue # no urls formats = metadata['streamingData']['formats'] for (i,v) in enumerate(formats): if not ('cipher' in v or 'signatureCipher' in v): continue cipher = parse_qs(v.get('cipher') or v.get('signatureCipher')) formats[i]['url'] = unscramble(cipher, algo) adaptive = metadata['streamingData']['adaptiveFormats'] for (i,v) in enumerate(adaptive): if not ('cipher' in v or 'signatureCipher' in v): continue cipher = parse_qs(v.get('cipher') or v.get('signatureCipher')) adaptive[i]['url'] = unscramble(cipher, algo) stream_map = {'adaptive': adaptive, 'muxed': formats} # todo: check if we have urls or try again url = sorted(formats, key=lambda k: k['height'], reverse=True)[0]['url'] # ip-locked videos can be recovered if the proxy module is loaded: is_geolocked = 'geolocked' if 'gcr' in parse_qs(urlparse(url).query) else None return url, stream_map, metadata, is_geolocked, None else: return None, None, metadata, 'exhausted', player_error def unscramble(cipher, algo): # test video id: UxxajLWwzqY signature = list(cipher['s'][0]) for c in algo.split(): op, ix = re.match(r"([rsw])(\d+)?", c).groups() ix = int(ix) % len(signature) if ix else 0 if not op: continue if op == 'r': signature = list(reversed(signature)) if op == 's': signature = signature[ix:] if op == 'w': signature[0], signature[ix] = signature[ix], signature[0] sp = cipher.get('sp', ['signature'])[0] sig = cipher.get('sig', [''.join(signature)])[0] return f"{cipher['url'][0]}&{sp}={sig}" def prepare_metadata(metadata): meta1 = metadata['videoDetails'] meta2 = metadata['microformat']['playerMicroformatRenderer'] cards = metadata['cards']['cardCollectionRenderer']['cards'] \ if 'cards' in metadata else [] endsc = metadata['endscreen']['endscreenRenderer']['elements'] \ if 'endscreen' in metadata else [] # the actual video streams have exact information: try: sd = metadata['streamingData'] some_stream = (sd.get('adaptiveFormats',[]) + sd.get('formats',[]))[0] aspect_ratio = some_stream['width'] / some_stream['height'] # if that's unavailable (e.g. on livestreams), fall back to # thumbnails (only either 4:3 or 16:9). except: some_img = meta2['thumbnail']['thumbnails'][0] aspect_ratio = some_img['width'] / some_img['height'] # Note: we could get subtitles in multiple formats directly by querying # https://video.google.com/timedtext?hl=en&type=list&v= followed by # https://www.youtube.com/api/timedtext?lang=&v=&fmt={srv1|srv2|srv3|ttml|vtt}, # but that won't give us autogenerated subtitles (and is an extra request). # we can still add &fmt= to the extracted URLs below (first one takes precedence). try: # find the native language captions (assuming there is only 1 audioTrack) (any level might not exist): default_track = metadata.get('captions',{}).get('playerCaptionsTracklistRenderer',{}).get('defaultAudioTrackIndex', 0) main_subtitle = metadata['captions']['playerCaptionsTracklistRenderer']['audioTracks'][default_track]['defaultCaptionTrackIndex'] except: main_subtitle = -1 subtitles = sorted([ {'url':cc['baseUrl'], 'code':cc['languageCode'], 'autogenerated':cc.get('kind')=="asr", 'name':cc['name']['simpleText'], 'default':i==main_subtitle, 'query':"fmt=vtt&"+urlparse(cc['baseUrl']).query} # for our internal proxy for i,cc in enumerate(metadata.get('captions',{}) .get('playerCaptionsTracklistRenderer',{}) .get('captionTracks',[])) # sort order: default lang gets weight 0 (first), other manually translated weight 1, autogenerated weight 2: ], key=lambda cc: (not cc['default']) + cc['autogenerated']) def clean_url(url): # externals URLs are redirected through youtube.com/redirect, but we # may encounter internal URLs, too return parse_qs(urlparse(url).query).get('q',[url])[0] # Remove left-/rightmost word from string: delL = lambda s: s.partition(' ')[2] delR = lambda s: s.rpartition(' ')[0] # Thousands seperator aware int(): intT = lambda s: int(s.replace(',', '')) def parse_infocard(card): card = card['cardRenderer'] ctype = list(card['content'].keys())[0] content = card['content'][ctype] if ctype == "pollRenderer": ctype = "POLL" content = { 'question': content['question']['simpleText'], 'answers': [(a['text']['simpleText'],a['numVotes']) \ for a in content['choices']], } elif ctype == "videoInfoCardContentRenderer": ctype = "VIDEO" # if the card references a live stream, it has no length, but a "LIVE NOW" badge. # TODO: this is ugly; cleanup. is_live = content.get('badge',{}).get('liveBadgeRenderer',{}) length = is_live.get('label',{}).get('simpleText') or content['lengthString']['simpleText'] # '23:03' content = { 'video_id': content['action']['watchEndpoint']['videoId'], 'title': content['videoTitle']['simpleText'], 'author': delL(content['channelName']['simpleText']), 'length': length, 'views': intT(delR(content['viewCountText']['simpleText'])), } elif ctype == "playlistInfoCardContentRenderer": ctype = "PLAYLIST" content = { 'playlist_id': content['action']['watchEndpoint']['playlistId'], 'video_id': content['action']['watchEndpoint']['videoId'], 'title': content['playlistTitle']['simpleText'], 'author': delL(content['channelName']['simpleText']), 'n_videos': intT(content['playlistVideoCount']['simpleText']), } elif ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content['command']: ctype = "WEBSITE" content = { 'url': clean_url(content['command']['urlEndpoint']['url']), 'domain': content['displayDomain']['simpleText'], 'title': content['title']['simpleText'], # XXX: no thumbnails for infocards } elif ctype == "collaboratorInfoCardContentRenderer": ctype = "CHANNEL" content = { 'channel_id': content['endpoint']['browseEndpoint']['browseId'], 'title': content['channelName']['simpleText'], 'icons': mkthumbs(content['channelAvatar']['thumbnails']), 'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers" } else: import pprint content = {'error': f"{ctype} is not implemented;
{pprint.pformat(card)}
"} return {'type': ctype, 'content': content} def mkthumbs(thumbs): return {e['height']: e['url'] for e in thumbs} def parse_endcard(card): card = card.get('endscreenElementRenderer', card) #only sometimes nested ctype = card['style'] if ctype == "CHANNEL": content = { 'channel_id': card['endpoint']['browseEndpoint']['browseId'], 'title': card['title']['simpleText'], 'icons': mkthumbs(card['image']['thumbnails']), } elif ctype == "VIDEO": content = { 'video_id': card['endpoint']['watchEndpoint']['videoId'], # XXX: KeyError 'endpoint' exception (no idea which youtube video this was on) 'title': card['title']['simpleText'], 'length': card['videoDuration']['simpleText'], # '12:21' 'views': delR(card['metadata']['simpleText']), # XXX: no channel name } elif ctype == "PLAYLIST": content = { 'playlist_id': card['endpoint']['watchEndpoint']['playlistId'], 'video_id': card['endpoint']['watchEndpoint']['videoId'], 'title': card['title']['simpleText'], 'author': delL(card['metadata']['simpleText']), 'n_videos': intT(delR(card['playlistLength']['simpleText'])), } elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE": ctype = "WEBSITE" url = clean_url(card['endpoint']['urlEndpoint']['url']) content = { 'url': url, 'domain': urlparse(url).netloc, 'title': card['title']['simpleText'], 'icons': mkthumbs(card['image']['thumbnails']), } else: import pprint content = {'error': f"{ctype} is not implemented;
{pprint.pformat(card)}
"} return {'type': ctype, 'content': content} infocards = [parse_infocard(card) for card in cards] endcards = [parse_endcard(card) for card in endsc] # combine cards to weed out duplicates. for videos and playlists prefer # infocards, for channels and websites prefer endcards, as those have more # information than the other. # if the card type is not in ident, we use the whole card for comparison # (otherwise they'd all replace each other) ident = { # ctype -> ident 'VIDEO': 'video_id', 'PLAYLIST': 'playlist_id', 'CHANNEL': 'channel_id', 'WEBSITE': 'url', 'POLL': 'question', } getident = lambda c: c['content'].get(ident.get(c['type']), c) mkexclude = lambda cards, types: [getident(c) for c in cards if c['type'] in types] exclude = lambda cards, without: [c for c in cards if getident(c) not in without] allcards = exclude(infocards, mkexclude(endcards, ['CHANNEL','WEBSITE'])) + \ exclude(endcards, mkexclude(infocards, ['VIDEO','PLAYLIST'])) all_countries = """AD AE AF AG AI AL AM AO AQ AR AS AT AU AW AX AZ BA BB BD BE BF BG BH BI BJ BL BM BN BO BQ BR BS BT BV BW BY BZ CA CC CD CF CG CH CI CK CL CM CN CO CR CU CV CW CX CY CZ DE DJ DK DM DO DZ EC EE EG EH ER ES ET FI FJ FK FM FO FR GA GB GD GE GF GG GH GI GL GM GN GP GQ GR GS GT GU GW GY HK HM HN HR HT HU ID IE IL IM IN IO IQ IR IS IT JE JM JO JP KE KG KH KI KM KN KP KR KW KY KZ LA LB LC LI LK LR LS LT LU LV LY MA MC MD ME MF MG MH MK ML MM MN MO MP MQ MR MS MT MU MV MW MX MY MZ NA NC NE NF NG NI NL NO NP NR NU NZ OM PA PE PF PG PH PK PL PM PN PR PS PT PW PY QA RE RO RS RU RW SA SB SC SD SE SG SH SI SJ SK SL SM SN SO SR SS ST SV SX SY SZ TC TD TF TG TH TJ TK TL TM TN TO TR TT TV TW TZ UA UG UM US UY UZ VA VC VE VG VI VN VU WF WS YE YT ZA ZM ZW""".split() whitelisted = sorted(meta2.get('availableCountries',[])) blacklisted = sorted(set(all_countries) - set(whitelisted)) published_at = f"{meta2['publishDate']}T00:00:00Z" # yyyy-mm-dd # 'premiere' videos (and livestreams?) have a ISO8601 date available: if 'liveBroadcastDetails' in meta2 and 'startTimestamp' in meta2['liveBroadcastDetails']: # TODO: tighten up published_at = meta2['liveBroadcastDetails']['startTimestamp'] return { 'title': meta1['title'], 'author': meta1['author'], 'channel_id': meta1['channelId'], 'description': meta1['shortDescription'], 'published': published_at, 'views': meta1['viewCount'], 'length': int(meta1['lengthSeconds']), 'rating': meta1['averageRating'], 'category': meta2['category'], 'aspectr': aspect_ratio, 'unlisted': meta2['isUnlisted'], 'whitelisted': whitelisted, 'blacklisted': blacklisted, 'poster': meta2['thumbnail']['thumbnails'][0]['url'], 'infocards': infocards, 'endcards': endcards, 'all_cards': allcards, 'subtitles': subtitles, } def store_video_metadata(video_id): # check if we know about it, and if not, fetch and store video metadata with sqlite3.connect(cf['global']['database']) as conn: c = conn.cursor() c.execute("SELECT 1 from videos where id = ?", (video_id,)) new_video = len(c.fetchall()) < 1 if new_video: _, _, meta, _, _ = get_video_info(video_id) if meta: meta = prepare_metadata(meta) c.execute(""" INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled) VALUES (?, ?, ?, datetime(?), datetime(?)) """, ( video_id, meta['channel_id'], meta['title'], meta['published'], meta['published'], )) c.execute(""" INSERT OR REPLACE INTO channels (id, name) VALUES (?, ?) """, (meta['channel_id'], meta['author'])) from werkzeug.exceptions import NotFound class NoFallbackException(NotFound): pass def fallback_route(*args, **kwargs): # TODO: worthy as a flask-extension? """ finds the next route that matches the current url rule, and executes it. args, kwargs: pass all arguments of the current route """ from flask import current_app, request, g # build a list of endpoints that match the current request's url rule: matching = [ rule.endpoint for rule in current_app.url_map.iter_rules() if rule.rule == request.url_rule.rule ] current = matching.index(request.endpoint) # since we can't change request.endpoint, we always get the original # endpoint back. so for repeated fall throughs, we use the g object to # increment how often we want to fall through. if not '_fallback_next' in g: g._fallback_next = 0 g._fallback_next += 1 next_ep = current + g._fallback_next if next_ep < len(matching): return current_app.view_functions[matching[next_ep]](*args, **kwargs) else: raise NoFallbackException def websub_url_hmac(key, feed_id, timestamp, nonce): """ generate sha1 hmac, as required by websub/pubsubhubbub """ sig_input = f"{feed_id}:{timestamp}:{nonce}".encode('ascii') return hmac.new(key.encode('ascii'), sig_input, hashlib.sha1).hexdigest() def websub_body_hmac(key, body): return hmac.new(key.encode('ascii'), body, hashlib.sha1).hexdigest() def pp(*args): from pprint import pprint import sys, codecs pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer))