#!/bin/sh
''':'
. "`dirname "$0"`/../../venv/bin/activate"
exec python "$0" "$@"
':'''
# The header above makes this file both a shell script and a Python module:
# sh runs ':' (a no-op), sources the virtualenv's activate script and then
# re-executes this very file with python. Python only sees a string literal.

import os
import re
import sys
import time
import secrets
import sqlite3
import requests
import subprocess
import html.parser

from common import *

# Maps the feed type stored in the subscriptions table to the query parameter
# name used when building the feed URL.
feed_param = {
    'channel': 'channel_id',
    'playlist': 'playlist_id',
}


def pull_subscriptions(verbose=1, force_all=False, limit=-1):
    """
    Crawls youtube channels' RSS feeds and stores found videos in the database.
    verbose: 0: completely silent; 1: warn on errors; 2: log all accessed feeds
    force_all: fetch all known channels. otherwise only those not crawled in 24h
    limit: limit number of processed feeds
    """
    with sqlite3.connect(cf['global']['database']) as conn:
        c = conn.cursor()
        c.execute("""
            SELECT DISTINCT s.channel_id, type
            FROM subscriptions AS s LEFT JOIN crawler AS c
            ON s.channel_id = c.channel_id
            WHERE ? OR IFNULL(crawled_at,0) < datetime('now', '-1 day')
            ORDER BY crawled_at
            LIMIT ?
        """, (force_all, limit))
        results = c.fetchall()

        if verbose >= 2 and not results:
            sys.stderr.write('no feeds to update.\n')

        for i, (feed_id, feed_type) in enumerate(results):
            # wait a minute between consecutive requests (not before the first)
            if i:
                time.sleep(60)
            pull_feed(feed_id, feed_type, conn, verbose)


def pull_feed(feed_id, feed_type, conn, verbose):
    """
    Fetches a single feed, stores its contents and records the crawl time.
    feed_id: channel or playlist id
    feed_type: key into feed_param ('channel' or 'playlist')
    conn: open sqlite3 connection
    verbose: see pull_subscriptions
    Returns True on success, False if fetching or storing failed.
    """
    c = conn.cursor()

    if verbose >= 2:
        sys.stderr.write(f'fetching {feed_id}\n')

    xmlfeed = fetch_xml(feed_param[feed_type], feed_id)
    if not xmlfeed:
        if verbose:
            sys.stderr.write(f'FETCH FAILED: {feed_id}\n')
        return False

    try:
        update_channel(conn, xmlfeed)
    except Exception:
        # Deliberately broad (best effort), but no longer a bare except, so
        # that KeyboardInterrupt/SystemExit still terminate the crawler.
        if verbose:
            sys.stderr.write(f'STORE FAILED: {feed_id}\n')
        # writing failed, so we store the feed in a file for later analysis.
        with open('/tmp/pull-subscriptions.err', 'ab') as f:
            f.write(b"\n")
            f.write(xmlfeed + b"\n")
        return False

    # update crawled_at timestamp:
    c.execute("""
        INSERT OR REPLACE INTO crawler (channel_id)
        VALUES (?)
    """, (feed_id,))

    conn.commit()
    return True


def update_subscriptions(verbose=1, force_all=False, limit=-1):
    """
    Refreshes the websub (pubsubhubhub) subscription requests for youtube feeds.
    verbose: 0: completely silent; 1: warn on errors; 2: log all accessed feeds
    force_all: refresh all channels, not only those expiring within 12 hours
    limit: limit number of processed feeds
    """
    with sqlite3.connect(cf['global']['database']) as conn:
        c = conn.cursor()
        # The parentheses matter: AND binds tighter than OR, so without them
        # force_all would also select playlists.
        c.execute("""
            SELECT DISTINCT s.channel_id, type
            FROM subscriptions AS s LEFT JOIN websub AS w
            ON s.channel_id = w.channel_id
            WHERE (? OR IFNULL(subscribed_until,0) < datetime('now','+12 hours'))
            AND type = 'channel' -- playlists don't support websub
            ORDER BY subscribed_until
            LIMIT ?
        """, (force_all, limit))
        results = c.fetchall()

        if verbose >= 2 and not results:
            sys.stderr.write('no feeds to update.\n')

        for i, (feed_id, feed_type) in enumerate(results):
            # wait a minute between consecutive requests (not before the first)
            if i:
                time.sleep(60)
            update_feed(feed_id, feed_type, verbose)


def update_feed(feed_id, feed_type, verbose):
    """
    Posts a websub subscribe request for a single feed to the hub.
    feed_id: channel or playlist id
    feed_type: key into feed_param ('channel' or 'playlist')
    verbose: see update_subscriptions
    Returns True if the hub answered with a success status, False otherwise.
    """
    webhook = cf['websub']['public_uri']
    lease = cf['websub']['lease']
    hmackey = cf['websub']['hmac_key']

    if verbose >= 2:
        sys.stderr.write(f'updating {feed_id}\n')

    # The callback URL carries version, timestamp, nonce and feed id together
    # with a signature over them computed by websub_url_hmac.
    version, timestamp = "v1", int(time.time())
    nonce = secrets.token_urlsafe(16)
    sig = websub_url_hmac(hmackey, feed_id, timestamp, nonce)
    r = requests.post("https://pubsubhubbub.appspot.com/subscribe", {
        "hub.callback": f"{webhook}/websub/{version}/{timestamp}/" + \
                        f"{nonce}/{feed_id}/{sig}",
        "hub.topic": "https://www.youtube.com/xml/feeds/videos.xml" + \
                     f"?{feed_param[feed_type]}={feed_id}",
        "hub.verify": "async",
        "hub.mode": "subscribe",
        "hub.lease_seconds": lease,
        "hub.secret": hmackey,
    })
    if not r.ok:
        if verbose:
            sys.stderr.write(f'FAILED {feed_id}: {r.text}\n')
        return False

    return True


def refresh_cipher(verbose=1, force=False):
    """
    Looks up the current player.js URL and, if it changed (or force is set),
    stores the URL together with the guessed sts/algorithm in the cipher table.
    verbose: 0: completely silent; 1: warn on errors; 2: log progress
    force: re-guess the cipher even if the player URL is unchanged
    Returns True if the stored cipher is up to date afterwards, False on error.
    """
    with sqlite3.connect(cf['global']['database']) as conn:
        c = conn.cursor()
        c.execute("SELECT url FROM cipher")
        # the table may still be empty, in which case fetchone() yields None
        row = c.fetchone()
        player_url = row[0] if row else None

        new_url = find_player_url(verbose)
        if not new_url:
            if verbose:
                sys.stderr.write('FAILED to get player url!\n')
            return False

        if player_url == new_url:
            if verbose >= 2:
                sys.stderr.write('player url unchanged.\n')
            if not force:
                return True

        match = re.match(r"/s/player/(.*?)\.js", new_url)
        if not match:
            if verbose:
                sys.stderr.write(f'FAILED to parse player url: {new_url}\n')
            return False
        (cipher_id,) = match.groups()

        sts, algo = ytdown_guess(cipher_id, verbose, force)
        if sts is None or algo is None:
            # Don't overwrite the stored row with NULLs: that would also
            # record the new url and prevent a retry on the next run.
            if verbose:
                sys.stderr.write('FAILED to guess cipher!\n')
            return False

        c.execute("""
            INSERT OR REPLACE INTO cipher (rowid, url, sts, algorithm)
            VALUES (0,?,?,?)
        """, (new_url, sts, algo))

        return True


def find_player_url(verbose, id="jNQXAC9IVRw"):
    """
    Extract the player.js URL, which can be passed to youtubedown.
    Requests a random video (defaults to the first ever uploaded one, "Me at
    the zoo". It shouldn't go away any time soon) and parses the returned Tag
    Soup.
    Note that the URL we're looking for contains our locale, so specify it to
    avoid it changing.
    Returns the src attribute of the player script tag, or None if the request
    failed or no such tag was found.
    """
    class FindPlayer(html.parser.HTMLParser):
        """Remembers the src of the <script name="player_ias/base"> tag."""
        def __init__(self, feed):
            super().__init__()
            self.player_js = None
            super().feed(feed)

        def handle_starttag(self, tag, attrs):
            attrs = dict(attrs)
            if tag == "script" and attrs.get("name") == "player_ias/base":
                self.player_js = attrs.get("src")

    if verbose >= 2:
        sys.stderr.write(f'fetching embed page {id}\n')
    r = requests.get(f"https://www.youtube-nocookie.com/embed/{id}?hl=en&gl=US")
    if not r.ok:
        if verbose:
            sys.stderr.write(f'FAILED {r.status_code}: {r.text[:128]}\n')
        return None

    player_js = FindPlayer(r.text).player_js
    if verbose >= 2:
        sys.stderr.write(f'player.js is {player_js}\n')
    return player_js


def ytdown_guess(cipher_id, verbose, force):
    """
    Runs youtubedown's guess_cipher() through perl for the given cipher id.
    A copy of youtubedown is kept in /tmp and re-downloaded once a week, when
    it is missing, or when force is set.
    Returns a (sts, algorithm) tuple of strings, or (None, None) if the
    download failed or perl's output could not be parsed.
    """
    # NOTE(review): this is a fixed, predictable path in /tmp whose contents
    # get executed by perl below - consider a private cache directory.
    ytdown = "/tmp/youtubedown.pm"

    # update youtubedown once a week:
    if force or not os.path.isfile(ytdown) or \
       os.stat(ytdown).st_mtime < time.time()-7*24*60*60:
        if verbose >= 2:
            sys.stderr.write('downloading youtubedown\n')
        r = requests.get("https://www.jwz.org/hacks/youtubedown")  # UA sniffing!
        if not r.ok:
            if verbose:
                sys.stderr.write(f'FAILED {r.status_code}: {r.text[:128]}\n')
            return None, None
        elif verbose >= 2:
            sys.stderr.write(f'done (code {r.status_code})\n')

        # youtubedown unconditionally calls "main(); exit 0;", which breaks
        # using it as a module:
        contents = "\n".join(r.text.splitlines()[:-2] + ["1;"])

        with open(ytdown, 'w') as f:
            f.write(contents)

    perl = subprocess.run(
        ["perl", "-wE", """
            require $ARGV[0];
            say guess_cipher($ARGV[1], 0, 0);
        """, "--", ytdown, cipher_id],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL
    )

    try:
        # Expected output is "<sts> <algorithm>". Empty output (perl failed)
        # or non-ascii output both surface as ValueError here.
        sts, algo = perl.stdout.decode('ascii').strip().split(" ", 1)
    except ValueError:
        if verbose:
            sys.stderr.write(
                f'FAILED to guess cipher (exit:{perl.returncode})\n')
        return None, None

    if verbose >= 2:
        sys.stderr.write(f'sts, algo = {sts}, {algo} (exit:{perl.returncode})\n')
    return sts, algo


if __name__ == '__main__':
    verbosity = 2 if '-vv' in sys.argv else 1 if '-v' in sys.argv else 0
    limit = 1 if '-1' in sys.argv else -1
    force = '-f' in sys.argv

    if 'pull' in sys.argv:
        pull_subscriptions(verbosity, force, limit)
    elif 'websub' in sys.argv:
        update_subscriptions(verbosity, force, limit)
    elif 'cipher' in sys.argv:
        refresh_cipher(verbosity, force)
    else:
        sys.stderr.write(
            f'Usage: YT_CONFIG=... {sys.argv[0]} pull [-f] [-1] [-v|-vv]\n'
            f' YT_CONFIG=... {sys.argv[0]} websub [-f] [-1] [-v|-vv]\n'
            f' YT_CONFIG=... {sys.argv[0]} cipher [-f] [-v|-vv]\n'
            '-f: force even if still up-to-date-ish\n'
            '-v: report errors\n'
            '-vv: report accessed feeds\n'
            '-1: limit to one feed (for testing it works)\n')
        sys.exit(1)