import os import re import json import base64 import sqlite3 import requests import hmac, hashlib import requests_cache import dateutil.parser from xml.etree import ElementTree from configparser import ConfigParser from datetime import datetime, timezone from urllib.parse import parse_qs, urlparse cf = ConfigParser() config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini') cf.read(config_filename) if not 'global' in cf: # todo: full config check raise Exception("Configuration file not found or empty") # Note: currently expiring after 10 minutes. googlevideo-urls are valid for 5h59m, but this makes reddit very stale and premiere videos won't start. TODO: exipre when video is livestream/premiere/etc requests_cache.install_cache(backend='memory', expire_after=10*60, allowable_codes=(200,)) # Note: requests-cache doesn't use redis expiry, so we need this in all backends: # https://github.com/reclosedev/requests-cache/issues/58#issuecomment-164537971 # TODO: only run for long-running processes, i.e. the frontend from threading import Timer def purge_cache(sec): requests_cache.remove_expired_responses() t = Timer(sec, purge_cache, args=(sec,)) t.setDaemon(True) t.start() purge_cache(10*60) # for debugging purposes, monkey patch requests session to store each requests-request in a flask-request's g object (url and response). we can then use a flask error_handler to include the request data in the error log. # since we also call config from outside the flask appcontext, it is wrapped in a try-catch block. from flask import g import requests from requests import Session as OriginalSession class _NSASession(OriginalSession): def request(self, method, url, params=None, data=None, **kwargs): response = super(_NSASession, self).request( method, url, params, data, **kwargs ) try: if 'api_requests' not in g: g.api_requests = [] g.api_requests.append((url, params, response.text)) except RuntimeError: pass # not within flask (e.g. utils.py) return response requests.Session = requests.sessions.Session = _NSASession def fetch_xml(feed_type, feed_id): # TODO: handle requests.exceptions.ConnectionError r = requests.get("https://www.youtube.com/feeds/videos.xml", { feed_type: feed_id, }) if not r.ok: return None return r.content def parse_xml(xmldata): ns = { 'atom':"http://www.w3.org/2005/Atom", 'yt': "http://www.youtube.com/xml/schemas/2015", 'media':"http://search.yahoo.com/mrss/", 'at': "http://purl.org/atompub/tombstones/1.0", } feed = ElementTree.fromstring(xmldata) if feed.find('at:deleted-entry',ns): (_,_,vid) = feed.find('at:deleted-entry',ns).get('ref').rpartition(':') return None, None, [{'deleted': True, 'video_id': vid}], None, None title = feed.find('atom:title',ns).text author = feed.find('atom:author/atom:name',ns).text \ if feed.find('atom:author',ns) else None # for /user/<> endpoint: find out UC-id: # for playlists: this is who created the playlist: try: channel_id = feed.find('yt:channelId',ns).text except:channel_id=None # XXX: why does ternary not work!? # for pullsub: if this exists, we're looking at a playlist: try: playlist_id = feed.find('yt:playlistId',ns).text except:playlist_id=None # XXX: why does ternary not work!? videos = [] for entry in feed.findall('atom:entry',ns): videos.append({ 'video_id': entry.find('yt:videoId',ns).text, 'title': entry.find('atom:title',ns).text, 'published': entry.find('atom:published',ns).text, 'channel_id': entry.find('yt:channelId',ns).text, 'author': entry.find('atom:author',ns).find('atom:name',ns).text, # extra fields for pull_subs/webhook: 'updated': entry.find('atom:updated',ns).text, }) return title, author, videos, channel_id, playlist_id def update_channel(db, xmldata, from_webhook=False): if not xmldata: return False # Note: websub does not return global author, hence taking from first video title, author, videos, channel, playlist = parse_xml(xmldata) c = db.cursor() for i, video in enumerate(videos): if video.get('deleted'): # Note: Deletion events are not just fired for actual deletions, # but also for unlisting videos and livestreams that just ended # (even postLiveDVR ones). Hence, we don't follow it. flask_logger(f"ignoring deleted/unlisted/ended video/stream {video['video_id']}") break c.execute("SELECT 1 FROM videos WHERE id=?",(video['video_id'],)) new_video = len(c.fetchall()) < 1 if new_video: flask_logger(f"new video {video['video_id']}") _, _, meta, _, _ = get_video_info(video['video_id']) # The 'published' timestamp sent in websub POSTs are often wrong (e.g.: # video gets uploaded as unlisted on day A and set to public on day B; # the webhook is sent on day B, but 'published' says A. The video # therefore looks like it's just an update to an older video). # g_v_i gives is the date the video was published to viewers, so we # prefer that. But since g_v_i only returns the date without time, # we still use xmlfeed's date if it's the same date. published = dateutil.parser.parse(video['published']) length = None livestream = None if meta: meta = video_metadata(meta) published2 = dateutil.parser.parse(meta['published']) flask_logger(f"published {published} / {published2}") if published < published2: # g_v_i date is more accurate: published = published2 length = meta['length'] livestream = meta['livestream'] now = datetime.now(timezone.utc) # we pretend that all videos uploaded this week were uploaded just # now, so the user sees it at the top of the feed, and it doesn't # get inserted somewhere further down. if (now - published).days < 7: timestamp = now else:#, it's just an update to an older video. timestamp = published c.execute(""" INSERT OR IGNORE INTO videos (id, channel_id, title, length, livestream, published, crawled) VALUES (?, ?, ?, ?, ?, datetime(?), datetime(?)) """, ( video['video_id'], video['channel_id'], video['title'], length, livestream, video['published'], timestamp )) else: # update video title (everything else can't change) c.execute(""" UPDATE OR IGNORE videos SET title = ? WHERE id = ? """, ( video['title'], video['video_id'], )) # for channels, this is obviously always the same, but playlists can # consist of videos from different channels: if i == 0 or playlist: c.execute(""" INSERT OR REPLACE INTO channels (id, name) VALUES (?, ?) """, (video['channel_id'], video['author'])) # keep track of which videos are in a playlist, so we can show the user # why a video is in their feed: if playlist: c.execute(""" INSERT OR IGNORE INTO playlist_videos (video_id, playlist_id) VALUES (?, ?) """, (video['video_id'], playlist)) if playlist and not from_webhook: # Note: playlists can't get updated via websub c.execute(""" INSERT OR REPLACE INTO playlists (id, name, author) VALUES (?, ?, ?) """, (playlist, title, channel)) c.execute(""" INSERT OR REPLACE INTO channels (id, name) VALUES (?, ?) """, (channel, author)) db.commit() return True def get_video_info(video_id, sts=0, algo=""): """ returns: best-quality muxed video stream, stream map, player_response, error-type/mesage error types: player, malformed, livestream, geolocked, exhausted """ player_error, metadata = None, None # for 'exhausted' with sqlite3.connect(cf['global']['database']) as conn: c = conn.cursor() c.execute("SELECT * FROM captcha_cookies") cookies = dict(c.fetchall()) for el in ['WEB_EMBEDDED_PLAYER', 'WEB']: # sometimes, only one or the other works today = datetime.now(timezone.utc).strftime("%Y%m%d") # XXX: anticaptcha hasn't been adapted # XXX: this is not cached any more! r = requests.post("https://www.youtube-nocookie.com/youtubei/v1/player?key=AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8", json={ 'videoId': video_id, 'context': { 'client': { 'gl': 'US', 'hl': 'en', 'clientName': el, 'clientVersion': f'2.{today}.01.01', } }, 'playbackContext': {'contentPlaybackContext': {'signatureTimestamp': sts}} }, cookies=cookies) if r.status_code == 429: return None, None, None, 'banned', 'possible IP ban' metadata = r.json() playabilityStatus = metadata['playabilityStatus']['status'] if playabilityStatus != "OK": playabilityReason = metadata['playabilityStatus'].get('reason', '//'.join(metadata['playabilityStatus'].get('messages',[]))) player_error = f"{playabilityStatus}: {playabilityReason}" if playabilityStatus == "UNPLAYABLE": continue # try again with next el value (or fail as exhausted) # without videoDetails, there's only the error message maybe_metadata = metadata if 'videoDetails' in metadata else None return None, None, maybe_metadata, 'player', player_error # livestreams have no adaptive/muxed formats: is_live = metadata['videoDetails'].get('isLive', False) if not 'formats' in metadata['streamingData'] and not is_live: continue # no urls formats = metadata['streamingData'].get('formats',[]) for (i,v) in enumerate(formats): if not ('cipher' in v or 'signatureCipher' in v): continue cipher = parse_qs(v.get('cipher') or v.get('signatureCipher')) formats[i]['url'] = unscramble(cipher, algo) adaptive = metadata['streamingData'].get('adaptiveFormats',[]) for (i,v) in enumerate(adaptive): if not ('cipher' in v or 'signatureCipher' in v): continue cipher = parse_qs(v.get('cipher') or v.get('signatureCipher')) adaptive[i]['url'] = unscramble(cipher, algo) stream_map = { 'adaptive': adaptive, 'muxed': formats, 'hlsManifestUrl': metadata['streamingData'].get('hlsManifestUrl'), } url = sorted(formats, key=lambda k: k['height'], reverse=True)[0]['url'] \ if not is_live else None # ip-locked videos can be recovered if the proxy module is loaded: is_geolocked = 'gcr' in parse_qs(urlparse(url).query) nonfatal = 'livestream' if is_live \ else 'geolocked' if is_geolocked \ else None return url, stream_map, metadata, nonfatal, None else: return None, None, metadata, 'exhausted', player_error def unscramble(cipher, algo): signature = list(cipher['s'][0]) for c in algo.split(): op, ix = re.match(r"([rsw])(\d+)?", c).groups() ix = int(ix) % len(signature) if ix else 0 if op == 'r': signature = list(reversed(signature)) if op == 's': signature = signature[ix:] if op == 'w': signature[0], signature[ix] = signature[ix], signature[0] sp = cipher.get('sp', ['signature'])[0] sig = cipher.get('sig', [''.join(signature)])[0] return f"{cipher['url'][0]}&{sp}={sig}" def video_metadata(metadata): if not metadata: return {} meta1 = metadata['videoDetails'] meta2 = metadata['microformat']['playerMicroformatRenderer'] # sometimes, we receive the notification so early that the length is not # yet populated. Nothing we can do about it. length = int(meta2['lengthSeconds']) or int(meta1['lengthSeconds']) or None published_at = meta2.get('liveBroadcastDetails',{}) \ .get('startTimestamp', f"{meta2['publishDate']}T00:00:00Z") # Note: 'premiere' videos have livestream=False and published= will be the # start of the premiere. return { 'title': meta1['title'], 'author': meta1['author'], 'channel_id': meta1['channelId'], 'published': published_at, 'views': int(meta1['viewCount']), 'length': length, 'livestream': meta1['isLiveContent'], } def store_video_metadata(video_id): # check if we know about it, and if not, fetch and store video metadata with sqlite3.connect(cf['global']['database']) as conn: c = conn.cursor() c.execute("SELECT 1 from videos where id = ?", (video_id,)) new_video = len(c.fetchall()) < 1 if new_video: _, _, meta, _, _ = get_video_info(video_id) if meta: meta = video_metadata(meta) c.execute(""" INSERT OR IGNORE INTO videos (id, channel_id, title, length, published, crawled) VALUES (?, ?, ?, ?, datetime(?), datetime(?)) """, ( video_id, meta['channel_id'], meta['title'], meta['length'], meta['published'], meta['published'], )) c.execute(""" INSERT OR REPLACE INTO channels (id, name) VALUES (?, ?) """, (meta['channel_id'], meta['author'])) def fetch_video_flags(token, video_ids): with sqlite3.connect(cf['global']['database']) as conn: c = conn.cursor() c.execute(""" SELECT video_id,display FROM flags WHERE user = ? AND display IS NOT NULL AND video_id IN ({}) -- AND display = 'pinned' """.format(",".join(["?"]*len(video_ids))), (token,*video_ids)) flags = c.fetchall() pinned = [video for video,disp in flags if disp == 'pinned'] hidden = [video for video,disp in flags if disp == 'hidden'] return pinned, hidden from werkzeug.exceptions import NotFound class NoFallbackException(NotFound): pass def fallback_route(*args, **kwargs): # TODO: worthy as a flask-extension? """ finds the next route that matches the current url rule, and executes it. args, kwargs: pass all arguments of the current route """ from flask import current_app, request, g # build a list of endpoints that match the current request's url rule: matching = [ rule.endpoint for rule in current_app.url_map.iter_rules() if rule.rule == request.url_rule.rule ] current = matching.index(request.endpoint) # since we can't change request.endpoint, we always get the original # endpoint back. so for repeated fall throughs, we use the g object to # increment how often we want to fall through. if not '_fallback_next' in g: g._fallback_next = 0 g._fallback_next += 1 next_ep = current + g._fallback_next if next_ep < len(matching): return current_app.view_functions[matching[next_ep]](*args, **kwargs) else: raise NoFallbackException def websub_url_hmac(key, feed_id, timestamp, nonce): """ generate sha1 hmac, as required by websub/pubsubhubbub """ sig_input = f"{feed_id}:{timestamp}:{nonce}".encode('ascii') return hmac.new(key.encode('ascii'), sig_input, hashlib.sha1).hexdigest() def websub_body_hmac(key, body): return hmac.new(key.encode('ascii'), body, hashlib.sha1).hexdigest() def flask_logger(msg, level="warning"): level = dict( CRITICAL=50, ERROR=40, WARNING=30, INFO=20, DEBUG=10, NOTSET=0, ).get(level.upper(), 0) try: from flask import current_app current_app.logger.log(level, msg) except: pass def pp(*args): from pprint import pprint import sys, codecs pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer))