#!/bin/sh
''':'
. "`dirname "$0"`/../../venv/bin/activate"
exec python "$0" "$@"
':'''
# The header above is an sh/python polyglot: sh sources the virtualenv's
# activate script and re-executes this file with python, while python parses
# the same lines as a throwaway string literal.

import sys
import time
import secrets
import sqlite3

import requests

from common import *

# Maps the feed type stored in the subscriptions table to the query parameter
# name used in the feed URL.
feed_param = {
    'channel': 'channel_id',
    'playlist': 'playlist_id',
}

# Pause between two consecutive feeds, in seconds.
FEED_DELAY_SECONDS = 60
# Upper bound for a single request to the hub, so that a stalled connection
# cannot block the whole run.
HUB_TIMEOUT_SECONDS = 30
HUB_SUBSCRIBE_URL = "https://pubsubhubbub.appspot.com/subscribe"


def pull_subscriptions(verbose=1, force_all=False, limit=-1):
    """
    Crawls youtube channels' RSS feeds and stores found videos in the database.
    verbose: 0: completely silent; 1: warn on errors; 2: log all accessed feeds
    force_all: fetch all known channels. otherwise only those not crawled in 24h
    limit: limit number of processed feeds (-1: no limit)
    """
    # NOTE: sqlite3's context manager only commits/rolls back the transaction;
    # it does not close the connection.
    with sqlite3.connect(cf['global']['database']) as conn:
        c = conn.cursor()
        c.execute("""
            SELECT DISTINCT s.channel_id, type
            FROM subscriptions AS s LEFT JOIN crawler AS c
            ON s.channel_id = c.channel_id
            WHERE ? OR IFNULL(crawled_at,0) < datetime('now', '-1 day')
            ORDER BY crawled_at
            LIMIT ?
        """, (force_all, limit))
        results = c.fetchall()

        if verbose >= 2 and not results:
            sys.stderr.write('no feeds to update.\n')

        for i, (feed_id, feed_type) in enumerate(results):
            if i:
                # wait between feeds, but not before the first one
                time.sleep(FEED_DELAY_SECONDS)
            pull_feed(feed_id, feed_type, conn, verbose)


def pull_feed(feed_id, feed_type, conn, verbose):
    """
    Fetches a single feed, stores it and records the crawl in the crawler table.
    feed_id: channel or playlist id
    feed_type: key into feed_param ('channel' or 'playlist')
    conn: open sqlite3 connection
    verbose: 0: completely silent; 1: warn on errors; 2: log all accessed feeds
    Returns True on success, False if fetching or storing failed.
    """
    c = conn.cursor()

    if verbose >= 2:
        sys.stderr.write(f'fetching {feed_id}\n')

    xmlfeed = fetch_xml(feed_param[feed_type], feed_id)
    if not xmlfeed:
        if verbose:
            sys.stderr.write(f'FETCH FAILED: {feed_id}\n')
        return False

    try:
        update_channel(conn, xmlfeed)
    except Exception:
        # Deliberately not a bare except: Ctrl-C and sys.exit() must still be
        # able to stop the crawler.
        if verbose:
            sys.stderr.write(f'STORE FAILED: {feed_id}\n')
        # writing failed, so we store the feed in a file for later analysis.
        with open('/tmp/pull-subscriptions.err', 'ab') as f:
            f.write(b"\n")
            f.write(xmlfeed + b"\n")
        return False

    # update crawled_at timestamp:
    c.execute("""
        INSERT OR REPLACE INTO crawler (channel_id)
        VALUES (?)
    """, (feed_id,))

    conn.commit()
    return True


def update_subscriptions(verbose=1, force_all=False, limit=-1):
    """
    Refreshes the websub (pubsubhubbub) subscription requests for youtube feeds.
    verbose: 0: completely silent; 1: warn on errors; 2: log all accessed feeds
    force_all: refresh all known feeds. otherwise only those whose
               subscription is unknown or ends within the next 12 hours
    limit: limit number of processed feeds (-1: no limit)
    """
    with sqlite3.connect(cf['global']['database']) as conn:
        c = conn.cursor()
        c.execute("""
            SELECT DISTINCT s.channel_id, type
            FROM subscriptions AS s LEFT JOIN websub AS w
            ON s.channel_id = w.channel_id
            WHERE ? OR IFNULL(subscribed_until,0) < datetime('now','+12 hours')
            ORDER BY subscribed_until
            LIMIT ?
        """, (force_all, limit))
        results = c.fetchall()

        if verbose >= 2 and not results:
            sys.stderr.write('no feeds to update.\n')

        for i, (feed_id, feed_type) in enumerate(results):
            if i:
                # wait between feeds, but not before the first one
                time.sleep(FEED_DELAY_SECONDS)
            update_feed(feed_id, feed_type, verbose)


def update_feed(feed_id, feed_type, verbose):
    """
    Sends a websub subscribe request for one feed to the hub.
    feed_id: channel or playlist id
    feed_type: key into feed_param ('channel' or 'playlist')
    verbose: 0: completely silent; 1: warn on errors; 2: log all accessed feeds
    Returns True if the hub accepted the request, False if the request could
    not be delivered or the hub answered with an error status.
    """
    webhook = cf['websub']['public_uri']
    lease = cf['websub']['lease']
    hmackey = cf['websub']['hmac_key']

    if verbose >= 2:
        sys.stderr.write(f'updating {feed_id}\n')

    topic_param = feed_param[feed_type]
    version, timestamp = "v1", int(time.time())
    nonce = secrets.token_urlsafe(16)
    # The callback URL carries timestamp, nonce and a signature computed by
    # websub_url_hmac() over them.
    sig = websub_url_hmac(hmackey, feed_id, timestamp, nonce)

    try:
        r = requests.post(HUB_SUBSCRIBE_URL, {
            "hub.callback": f"{webhook}/websub/{version}/{timestamp}/"
                            f"{nonce}/{feed_id}/{sig}",
            "hub.topic": "https://www.youtube.com/xml/feeds/videos.xml"
                         f"?{topic_param}={feed_id}",
            "hub.verify": "async",
            "hub.mode": "subscribe",
            "hub.lease_seconds": lease,
            "hub.secret": hmackey,
        }, timeout=HUB_TIMEOUT_SECONDS)
    except requests.RequestException as e:
        # A transport error or timeout on one feed must not abort the
        # remaining feeds; report it like any other failed request.
        if verbose:
            sys.stderr.write(f'FAILED {feed_id}: {e}\n')
        return False

    if not r.ok:
        if verbose:
            sys.stderr.write(f'FAILED {feed_id}: {r.text}\n')
        return False

    return True


def main(argv):
    """
    Command line entry point; argv is the full argument vector (sys.argv).
    Prints usage and exits with status 1 unless argv[1] is 'pull' or 'websub'.
    """
    if len(argv) < 2 or argv[1] not in ('pull', 'websub'):
        sys.stderr.write(
            f'Usage: YT_CONFIG=... {argv[0]} pull [-f] [-1] [-v|-vv]\n'
            f' YT_CONFIG=... {argv[0]} websub [-f] [-1] [-v|-vv]\n'
            '-f: force even if still up-to-date-ish\n'
            '-v: report errors\n'
            '-vv: report accessed feeds\n'
            '-1: limit to one feed (for testing it works)\n')
        sys.exit(1)

    verbosity = 2 if '-vv' in argv else 1 if '-v' in argv else 0
    limit = 1 if '-1' in argv else -1
    force = '-f' in argv

    # Dispatch on the same argument that was validated above, so that a stray
    # 'pull' elsewhere on the command line cannot select the wrong mode.
    if argv[1] == 'pull':
        pull_subscriptions(verbosity, force, limit)
    else:
        update_subscriptions(verbosity, force, limit)


if __name__ == '__main__':
    main(sys.argv)