import os
import re
import json
import requests
import requests_cache
import dateutil.parser
from xml.etree import ElementTree
from configparser import ConfigParser
from datetime import datetime, timezone
from threading import Timer
from urllib.parse import parse_qs, urlparse

cf = ConfigParser()
config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini')
cf.read(config_filename)

# Note: currently expiring after 10 minutes. googlevideo-urls are valid for
# 5h59m, but this makes reddit very stale and premiere videos won't start.
# TODO: expire when video is livestream/premiere/etc
requests_cache.install_cache(backend='memory', expire_after=10*60, allowable_codes=(200,))

# ISO 3166-1 alpha-2 codes; used by prepare_metadata() to derive the list of
# countries a video is *not* available in from its list of available ones.
_ALL_COUNTRIES = frozenset("""
    AD AE AF AG AI AL AM AO AQ AR AS AT AU AW AX AZ BA BB BD BE BF BG BH BI
    BJ BL BM BN BO BQ BR BS BT BV BW BY BZ CA CC CD CF CG CH CI CK CL CM CN
    CO CR CU CV CW CX CY CZ DE DJ DK DM DO DZ EC EE EG EH ER ES ET FI FJ FK
    FM FO FR GA GB GD GE GF GG GH GI GL GM GN GP GQ GR GS GT GU GW GY HK HM
    HN HR HT HU ID IE IL IM IN IO IQ IR IS IT JE JM JO JP KE KG KH KI KM KN
    KP KR KW KY KZ LA LB LC LI LK LR LS LT LU LV LY MA MC MD ME MF MG MH MK
    ML MM MN MO MP MQ MR MS MT MU MV MW MX MY MZ NA NC NE NF NG NI NL NO NP
    NR NU NZ OM PA PE PF PG PH PK PL PM PN PR PS PT PW PY QA RE RO RS RU RW
    SA SB SC SD SE SG SH SI SJ SK SL SM SN SO SR SS ST SV SX SY SZ TC TD TF
    TG TH TJ TK TL TM TN TO TR TT TV TW TZ UA UG UM US UY UZ VA VC VE VG VI
    VN VU WF WS YE YT ZA ZM ZW
""".split())


# Note: this should only be required for the 'memory' backend cache.
# TODO: only run for long-running processes, i.e. the frontend
def purge_cache(sec):
    """
    Drops expired responses from the requests cache and re-arms a timer
    that calls this function again after `sec` seconds.
    """
    requests_cache.remove_expired_responses()
    t = Timer(sec, purge_cache, args=(sec,))
    # daemon thread, so a pending timer never keeps the process alive.
    # (Thread.setDaemon() is deprecated in favour of the attribute.)
    t.daemon = True
    t.start()

purge_cache(10*60)


def fetch_xml(feed_type, feed_id):
    """
    Downloads the videos.xml feed selected by `feed_type`=`feed_id`.
    returns: the response body as text, or None on a non-ok HTTP status.
    """
    # TODO: handle requests.exceptions.ConnectionError
    r = requests.get(f"https://www.youtube.com/feeds/videos.xml?{feed_type}={feed_id}")
    if not r.ok:
        return None

    return r.text


def parse_xml(xmldata):
    """
    Parses an atom feed as returned by fetch_xml().
    returns: a tuple of (feed title, feed author or None, [{video}]).
    For a tombstone feed (one containing an at:deleted-entry element),
    (None, None, []) is returned.
    """
    ns = {
        'atom': "http://www.w3.org/2005/Atom",
        'yt': "http://www.youtube.com/xml/schemas/2015",
        'media': "http://search.yahoo.com/mrss/",
        'at': "http://purl.org/atompub/tombstones/1.0",
    }

    feed = ElementTree.fromstring(xmldata)

    # Note: compare against None explicitly; the truth value of an Element
    # tells whether it has children, not whether it was found.
    if feed.find('at:deleted-entry', ns) is not None:
        # nothing from the tombstone is used yet (see TODO in update_channel)
        return None, None, []

    title = feed.find('atom:title', ns).text
    author_name = feed.find('atom:author/atom:name', ns)
    author = author_name.text if author_name is not None else None
    videos = []
    for entry in feed.findall('atom:entry', ns):
        videos.append({
            'video_id': entry.find('yt:videoId', ns).text,
            'title': entry.find('atom:title', ns).text,
            'published': entry.find('atom:published', ns).text,
            'channel_id': entry.find('yt:channelId', ns).text,
            'author': entry.find('atom:author', ns).find('atom:name', ns).text,
            # extra fields for pull_subs/webhook:
            'updated': entry.find('atom:updated', ns).text,
        })

    return title, author, videos


def update_channel(db, xmldata):
    """
    Stores the videos (and the channel name) found in `xmldata` in `db`.
    db: needs to provide cursor() and commit(); the statements use SQLite
        syntax (INSERT OR IGNORE / INSERT OR REPLACE).
    returns: False if xmldata is empty, True otherwise.
    """
    if not xmldata:
        return False

    # Note: websub does not return global author, hence taking from first video
    title, _, videos = parse_xml(xmldata)

    # TODO: if not title: delete from videos (this should only be implemented after webhook hmac validation!)

    c = db.cursor()
    for i, video in enumerate(videos):
        now = datetime.now(timezone.utc)
        updated = dateutil.parser.parse(video['updated'])
        published = dateutil.parser.parse(video['published'])
        # if update and published time are near-identical, we assume it's new.
        # Note: total_seconds(), not .seconds -- the latter ignores whole
        # days, so an edit N days and a few seconds later looked "new".
        if abs((updated - published).total_seconds()) < 60 and (now - published).days < 7:
            timestamp = now
        else:  # it's just an update to an older video.
            timestamp = published

        c.execute("""
            INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled)
            VALUES (?, ?, ?, datetime(?), datetime(?))
        """, (
            video['video_id'],
            video['channel_id'],
            video['title'],
            video['published'],
            timestamp
        ))

        if i == 0:  # only required once per feed
            c.execute("""
                INSERT OR REPLACE INTO channels (id, name)
                VALUES (?, ?)
            """, (video['channel_id'], video['author']))
    db.commit()

    return True


def get_video_info(video_id, sts=0, algo=""):
    """
    returns: a 4-tuple of (best-quality muxed video stream url,
             player_response, error-type, error-message)
    error types: player, malformed, livestream, geolocked, exhausted
    On success, both error fields are None; on error, the url is None.
    sts is passed through to get_video_info; algo is the list of operations
    handed to unscramble() for ciphered stream urls.
    """
    player_error = None  # for 'exhausted'
    for el in ['embedded', 'detailpage']:  # sometimes, only one or the other works
        r = requests.get("https://www.youtube.com/get_video_info" +
                         f"?video_id={video_id}" +
                         f"&eurl=https://youtube.googleapis.com/v/{video_id}" +
                         f"&el={el}" +
                         f"&sts={sts}" +
                         "&hl=en_US")  # "&hl=en&gl=US"
        params = parse_qs(r.text)
        if 'errorcode' in params:  # status=fail
            return None, None, 'malformed', params['reason'][0]

        metadata = json.loads(params.get('player_response')[0])
        playabilityStatus = metadata['playabilityStatus']['status']
        if playabilityStatus != "OK":
            playabilityReason = metadata['playabilityStatus']['reason']
            player_error = f"{playabilityStatus}: {playabilityReason}"
            if playabilityStatus == "UNPLAYABLE":
                continue  # try again with next el value (or fail as exhausted)
            # without videoDetails, there's only the error message
            maybe_metadata = metadata if 'videoDetails' in metadata else None
            return None, maybe_metadata, 'player', player_error

        details = metadata['videoDetails']
        if details['isLiveContent'] and \
                (details.get('isLive', False) or
                 details.get('isPostLiveDvr', False)):
            return None, metadata, 'livestream', None

        # missing streamingData or an empty format list is treated like
        # missing formats: try the next el value.
        formats = metadata.get('streamingData', {}).get('formats')
        if not formats:
            continue  # no urls

        for fmt in formats:
            scrambled = fmt.get('cipher') or fmt.get('signatureCipher')
            if not scrambled:
                continue  # url is usable as-is
            fmt['url'] = unscramble(parse_qs(scrambled), algo)

        # todo: check if we have urls or try again
        # max() picks the first of equally tall formats, same as a stable
        # descending sort would.
        url = max(formats, key=lambda k: k['height'])['url']

        if 'gcr' in parse_qs(url):
            return None, metadata, 'geolocked', None

        return url, metadata, None, None

    # neither el value yielded a playable stream
    return None, metadata, 'exhausted', player_error


def unscramble(cipher, algo):  # test video id: UxxajLWwzqY
    """
    Applies the operations in `algo` to the scrambled signature of `cipher`
    and returns the stream url with the signature appended.
    cipher: parse_qs()-style dict with 's' and 'url' (optionally 'sp', 'sig').
    algo: whitespace separated operations; r = reverse, sN = drop the first
          N characters, wN = swap first character with the one at index N.
          Unrecognized operations are skipped.
    """
    signature = list(cipher['s'][0])
    for c in algo.split():
        m = re.match(r"([rsw])(\d+)?", c)
        if not m:
            continue  # not an operation we know
        op, ix = m.groups()
        ix = int(ix) % len(signature) if ix else 0
        if op == 'r':
            signature = list(reversed(signature))
        if op == 's':
            signature = signature[ix:]
        if op == 'w':
            signature[0], signature[ix] = signature[ix], signature[0]
    sp = cipher.get('sp', ['signature'])[0]
    sig = cipher.get('sig', [''.join(signature)])[0]
    return f"{cipher['url'][0]}&{sp}={sig}"


def prepare_metadata(metadata):
    """
    Condenses a player_response (as returned by get_video_info()) into a
    flat dict: title, author, description, aspect ratio, country lists,
    subtitles and the parsed info-/endcards (plus a de-duplicated
    combination of both in 'all_cards').
    """
    meta1 = metadata['videoDetails']
    meta2 = metadata['microformat']['playerMicroformatRenderer']
    cards = metadata['cards']['cardCollectionRenderer']['cards'] \
        if 'cards' in metadata else []
    endsc = metadata['endscreen']['endscreenRenderer']['elements'] \
        if 'endscreen' in metadata else []

    # the actual video streams have exact information:
    try:
        sd = metadata['streamingData']
        some_stream = (sd.get('adaptiveFormats', []) + sd.get('formats', []))[0]
        aspect_ratio = some_stream['width'] / some_stream['height']
    # if that's unavailable (e.g. on livestreams), fall back to
    # thumbnails (only either 4:3 or 16:9).
    except (LookupError, TypeError, ZeroDivisionError):
        some_img = meta2['thumbnail']['thumbnails'][0]
        aspect_ratio = some_img['width'] / some_img['height']

    # sorted by 'autogenerated', i.e. manually created tracks come first
    subtitles = sorted([
        {'url': cc['baseUrl'],
         'code': cc['languageCode'],
         'autogenerated': cc.get('kind') == "asr",
         'name': cc['name']['simpleText']}
        for cc in metadata.get('captions', {})
            .get('playerCaptionsTracklistRenderer', {})
            .get('captionTracks', [])
    ], key=lambda cc: cc['autogenerated'])

    def clean_url(url):
        # external URLs are redirected through youtube.com/redirect, but we
        # may encounter internal URLs, too
        return parse_qs(urlparse(url).query).get('q', [url])[0]

    # Remove left-/rightmost word from string:
    def delL(s):
        return s.partition(' ')[2]

    def delR(s):
        return s.rpartition(' ')[0]

    # Thousands separator aware int():
    def intT(s):
        return int(s.replace(',', ''))

    def parse_infocard(card):
        card = card['cardRenderer']
        ctype = list(card['content'].keys())[0]
        content = card['content'][ctype]
        if ctype == "pollRenderer":
            ctype = "POLL"
            content = {
                'question': content['question']['simpleText'],
                'answers': [(a['text']['simpleText'], a['numVotes'])
                            for a in content['choices']],
            }
        elif ctype == "videoInfoCardContentRenderer":
            ctype = "VIDEO"
            # if the card references a live stream, it has no length, but a "LIVE NOW" badge.
            # TODO: this is ugly; cleanup.
            is_live = content.get('badge', {}).get('liveBadgeRenderer', {})
            length = is_live.get('label', {}).get('simpleText') or content['lengthString']['simpleText']  # '23:03'
            content = {
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['videoTitle']['simpleText'],
                'author': delL(content['channelName']['simpleText']),
                'length': length,
                'views': intT(delR(content['viewCountText']['simpleText'])),
            }
        elif ctype == "playlistInfoCardContentRenderer":
            ctype = "PLAYLIST"
            content = {
                'playlist_id': content['action']['watchEndpoint']['playlistId'],
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['playlistTitle']['simpleText'],
                'author': delL(content['channelName']['simpleText']),
                'n_videos': intT(content['playlistVideoCount']['simpleText']),
            }
        elif ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content['command']:
            ctype = "WEBSITE"
            content = {
                'url': clean_url(content['command']['urlEndpoint']['url']),
                'domain': content['displayDomain']['simpleText'],
                'title': content['title']['simpleText'],
                # XXX: no thumbnails for infocards
            }
        elif ctype == "collaboratorInfoCardContentRenderer":
            ctype = "CHANNEL"
            content = {
                'channel_id': content['endpoint']['browseEndpoint']['browseId'],
                'title': content['channelName']['simpleText'],
                'icons': mkthumbs(content['channelAvatar']['thumbnails']),
                'subscribers': content['subscriberCountText']['simpleText'],  # "545K subscribers"
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented;\n{pprint.pformat(card)}\n"}

        return {'type': ctype, 'content': content}

    def mkthumbs(thumbs):
        # maps thumbnail height -> url
        return {e['height']: e['url'] for e in thumbs}

    def parse_endcard(card):
        card = card.get('endscreenElementRenderer', card)  # only sometimes nested
        ctype = card['style']
        if ctype == "CHANNEL":
            content = {
                'channel_id': card['endpoint']['browseEndpoint']['browseId'],
                'title': card['title']['simpleText'],
                'icons': mkthumbs(card['image']['thumbnails']),
            }
        elif ctype == "VIDEO":
            content = {
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'length': card['videoDuration']['simpleText'],  # '12:21'
                'views': delR(card['metadata']['simpleText']),
                # XXX: no channel name
            }
        elif ctype == "PLAYLIST":
            content = {
                'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'author': delL(card['metadata']['simpleText']),
                'n_videos': intT(delR(card['playlistLength']['simpleText'])),
            }
        elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE":
            ctype = "WEBSITE"
            url = clean_url(card['endpoint']['urlEndpoint']['url'])
            content = {
                'url': url,
                'domain': urlparse(url).netloc,
                'title': card['title']['simpleText'],
                'icons': mkthumbs(card['image']['thumbnails']),
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented;\n{pprint.pformat(card)}\n"}

        return {'type': ctype, 'content': content}

    infocards = [parse_infocard(card) for card in cards]
    endcards = [parse_endcard(card) for card in endsc]
    # combine cards to weed out duplicates. for videos and playlists prefer
    # infocards, for channels and websites prefer endcards, as those have more
    # information than the other.
    # if the card type is not in ident, we use the whole card for comparison
    # (otherwise they'd all replace each other)
    ident = {  # ctype -> ident
        'VIDEO': 'video_id',
        'PLAYLIST': 'playlist_id',
        'CHANNEL': 'channel_id',
        'WEBSITE': 'url',
        'POLL': 'question',
    }

    def getident(c):
        return c['content'].get(ident.get(c['type']), c)

    def mkexclude(cards, types):
        # Note: has to stay a list; identities may be (unhashable) dicts.
        return [getident(c) for c in cards if c['type'] in types]

    def exclude(cards, without):
        return [c for c in cards if getident(c) not in without]

    allcards = exclude(infocards, mkexclude(endcards, ['CHANNEL', 'WEBSITE'])) + \
               exclude(endcards, mkexclude(infocards, ['VIDEO', 'PLAYLIST']))

    whitelisted = sorted(meta2.get('availableCountries', []))
    blacklisted = sorted(_ALL_COUNTRIES - set(whitelisted))

    return {
        'title': meta1['title'],
        'author': meta1['author'],
        'channel_id': meta1['channelId'],
        'description': meta1['shortDescription'],
        'published': meta2['publishDate'],
        'views': meta1['viewCount'],
        'length': int(meta1['lengthSeconds']),
        'rating': meta1['averageRating'],
        'category': meta2['category'],
        'aspectr': aspect_ratio,
        'unlisted': meta2['isUnlisted'],
        'countries': whitelisted,
        'blacklisted': blacklisted,
        'poster': meta2['thumbnail']['thumbnails'][0]['url'],
        'infocards': infocards,
        'endcards': endcards,
        'all_cards': allcards,
        'subtitles': subtitles,
    }


class RedditException(Exception):
    """Raised by fetch_reddit() when reddit's response is unusable."""
    pass


def fetch_reddit(subreddits, sorted_by="hot", time=None, *, limit=36,
                 count=None, before=None, after=None):
    """
    fetches data from a subreddit (or a multireddit like gif+gifs) and
    filters/sorts results.
    sorted_by values: hot, new, rising, controversial, top
    time values: hour, week, month, year, all (for top and controversial)
    returns a tuple of ([{video}],before,after)
    raises RedditException if the request fails or the response is not a
    json object containing 'data'.
    """
    # TODO: /api/info.json?id=t3_h7mjes == /by_id/t3_h7mjes.json

    if not subreddits:
        return [], None, None

    query = '&'.join([f"{k}={v}" for k, v in {
        'count': count,
        'before': before,
        'after': after,
        'limit': limit,  # 1..100 (default 25)
        't': time,  # hour,week,month,year,all
    }.items() if v])
    multireddit = '+'.join(subreddits)
    r = requests.get(f"https://old.reddit.com/r/{multireddit}/{sorted_by}.json?{query}",
                     headers={'User-Agent': 'Mozilla/5.0'})
    if not r.ok:
        raise RedditException(r.text)
    # decode the body only once, and report undecodable bodies the same way
    # as any other unusable response.
    try:
        listing = r.json()
    except ValueError as err:
        raise RedditException(r.text) from err
    if 'data' not in listing:
        raise RedditException(r.text)

    videos = []
    # the key is a boolean: posts with a score above 1 are moved to the
    # front; within both groups reddit's order is kept (stable sort).
    entries = sorted(listing['data']['children'],
                     key=lambda e: e['data']['score'] > 1, reverse=True)
    for entry in entries:
        e = entry['data']
        if e['domain'] not in ['youtube.com', 'youtu.be', 'invidio.us']:
            continue
        # Note: youtube.com/<video_id> is not valid (404s), but seen in the wild.
        try:
            found = re.match(r'^https?://(?:www.|m.)?(?:youtube.com/watch\?(?:.*&)?v=|youtu.be/|youtube.com/embed/|youtube.com/)([-_0-9A-Za-z]+)', e['url'])
        except (KeyError, TypeError):
            continue  # no (usable) url in this post
        if not found:
            continue  # XXX: should we log that?
        videos.append({
            'video_id': found.group(1),
            'title': e['title'],
            'url': e['permalink'],
            'n_comments': e['num_comments'],
            'n_karma': e['score'],
            'subreddit': e['subreddit'],
            'post_id': e['id'],
        })
    before = listing['data']['before']
    after = listing['data']['after']

    return videos, before, after


def pp(*args):
    """Debugging aid: pretty-prints all arguments to stderr, as utf-8."""
    from pprint import pprint
    import sys, codecs
    pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer))