import os
import re
import json
import html
import base64
import sqlite3
import requests
import hmac
import hashlib
import requests_cache
import dateutil.parser
from xml.etree import ElementTree
from configparser import ConfigParser
from datetime import datetime, timezone
from urllib.parse import parse_qs, urlparse
cf = ConfigParser()
config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini')
cf.read(config_filename)
if 'global' not in cf: # todo: full config check
raise Exception("Configuration file not found or empty")
# Note: currently expiring after 10 minutes. googlevideo-urls are valid for 5h59m, but caching that long makes reddit very stale and premiere videos won't start. TODO: expire depending on whether the video is a livestream/premiere/etc.
requests_cache.install_cache(backend='memory', expire_after=10*60, allowable_codes=(200,))
# Note: this should only be required for the 'memory' backed cache.
# TODO: only run for long-running processes, i.e. the frontend
from threading import Timer
def purge_cache(sec):
requests_cache.remove_expired_responses()
t = Timer(sec, purge_cache, args=(sec,))
    t.daemon = True # Note: Thread.setDaemon() is deprecated
t.start()
purge_cache(10*60)
# for debugging purposes, monkey patch the requests session to store each
# requests-request in a flask-request's g object (url, params and response).
# we can then use a flask error_handler to include the request data in the
# error log. since requests are also made from outside the flask appcontext,
# the g access is wrapped in a try-except block.
from flask import g
from requests import Session as OriginalSession
class _NSASession(OriginalSession):
def request(self, method, url, params=None, data=None, **kwargs):
response = super(_NSASession, self).request(
method, url, params, data, **kwargs
)
try:
if 'api_requests' not in g:
g.api_requests = []
g.api_requests.append((url, params, response.text))
except RuntimeError: pass # not within flask (e.g. utils.py)
return response
requests.Session = requests.sessions.Session = _NSASession
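# Example: how a frontend could consume g.api_requests in an error handler,
# as described above. This is only an illustrative sketch (nothing in this
# module calls it); an app would opt in via register_request_logger(app).
def register_request_logger(app):
    @app.errorhandler(Exception)
    def _log_api_requests(e):
        # dump every captured api request of the failing flask request:
        for url, params, body in g.get('api_requests', []):
            app.logger.error("while handling %s %s: %s", url, params, body[:200])
        return "Internal Server Error", 500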
def fetch_xml(feed_type, feed_id):
# TODO: handle requests.exceptions.ConnectionError
r = requests.get("https://www.youtube.com/feeds/videos.xml", {
feed_type: feed_id,
})
if not r.ok:
return None
return r.content
def parse_xml(xmldata):
ns = {
'atom':"http://www.w3.org/2005/Atom",
'yt': "http://www.youtube.com/xml/schemas/2015",
'media':"http://search.yahoo.com/mrss/",
'at': "http://purl.org/atompub/tombstones/1.0",
}
feed = ElementTree.fromstring(xmldata)
    deleted_entry = feed.find('at:deleted-entry', ns)
    if deleted_entry is not None: # Note: childless Elements are falsy, so compare against None
        (_,_,vid) = deleted_entry.get('ref').rpartition(':')
        return None, None, [{'deleted': True, 'video_id': vid}]
title = feed.find('atom:title',ns).text
    author = feed.find('atom:author/atom:name',ns).text \
        if feed.find('atom:author',ns) is not None else None
videos = []
for entry in feed.findall('atom:entry',ns):
videos.append({
'video_id': entry.find('yt:videoId',ns).text,
'title': entry.find('atom:title',ns).text,
'published': entry.find('atom:published',ns).text,
'channel_id': entry.find('yt:channelId',ns).text,
'author': entry.find('atom:author',ns).find('atom:name',ns).text,
# extra fields for pull_subs/webhook:
'updated': entry.find('atom:updated',ns).text,
})
return title, author, videos
def update_channel(db, xmldata, from_webhook=False):
if not xmldata: return False
# Note: websub does not return global author, hence taking from first video
_, _, videos = parse_xml(xmldata)
c = db.cursor()
from flask import current_app # XXX: remove
for i, video in enumerate(videos):
if video.get('deleted'):
if from_webhook: current_app.logger.warning(f"ignoring deleted video {video['video_id']}") # XXX: remove
# TODO: enable once we enforce hmac validation:
#c.execute("DELETE FROM videos WHERE id = ?", (video['video_id'],))
break
now = datetime.now(timezone.utc)
updated = dateutil.parser.parse(video['updated'])
published = dateutil.parser.parse(video['published'])
        # if updated and published times are near-identical, we assume the video is new.
        # checking that it was posted within the last week is necessary during xmlfeed pulling.
        # Note: total_seconds(), not .seconds, which ignores the days component:
        if (updated - published).total_seconds() < 60 and (now - published).days < 7:
timestamp = now
if from_webhook: current_app.logger.warning(f"fresh video {video['video_id']}") # XXX: remove
        else: # it might just be an update to an older video, or a previously unlisted one.
# first, assume it's an older video (correct when pulling xmlfeeds)
timestamp = published
            # then, check if we don't know about it yet and, if so, look up the
            # real date. The 'published' timestamp sent in websub POSTs is often
            # wrong (e.g.: a video gets uploaded as unlisted on day A and set to
            # public on day B; the webhook is sent on day B, but 'published' says
            # A, so the video looks like a mere update to an older one). If
            # that's the case, we call get_video_info and double-check.
            # We only need to do this for videos not yet in the database.
c.execute("SELECT 1 from videos where id = ?", (video['video_id'],))
new_video = len(c.fetchall()) < 1
if from_webhook: current_app.logger.warning(f"video {video['video_id']}") # XXX: remove
if from_webhook and new_video:
if from_webhook: current_app.logger.warning(f" is webhook and new") # XXX: remove
_, meta, _, _ = get_video_info(video['video_id'])
if meta:
meta = prepare_metadata(meta)
published = dateutil.parser.parse(meta['published'])
if from_webhook: current_app.logger.warning(f" uploaded {published}") # XXX: remove
if (now - published).days < 7:
timestamp = now
                    else: # it's just an update to an older video.
timestamp = published
c.execute("""
INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled)
VALUES (?, ?, ?, datetime(?), datetime(?))
""", (
video['video_id'],
video['channel_id'],
video['title'],
video['published'],
timestamp
))
if i == 0: # only required once per feed
c.execute("""
INSERT OR REPLACE INTO channels (id, name)
VALUES (?, ?)
""", (video['channel_id'], video['author']))
db.commit()
return True
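# Example: a one-shot feed pull (e.g. from a cron job) could combine fetch_xml()
# and update_channel() like this. Illustrative sketch only, not used elsewhere
# in this module; the feed_type value mirrors youtube's feed query parameter.
def pull_channel(channel_id):
    with sqlite3.connect(cf['global']['database']) as db:
        return update_channel(db, fetch_xml('channel_id', channel_id))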
def get_video_info(video_id, sts=0, algo=""):
"""
    returns: (best-quality muxed video stream url, player_response, error type, error message)
error types: player, malformed, livestream, geolocked, exhausted
"""
player_error = None # for 'exhausted'
for el in ['embedded', 'detailpage']:#sometimes, only one or the other works
r = requests.get("https://www.youtube.com/get_video_info", {
"video_id": video_id,
"eurl": f"https://youtube.googleapis.com/v/{video_id}",
"el": el,
"sts": sts,
"hl": "en_US",
})
params = parse_qs(r.text)
if 'errorcode' in params: # status=fail
return None, None, 'malformed', params['reason'][0]
metadata = json.loads(params.get('player_response')[0])
playabilityStatus = metadata['playabilityStatus']['status']
if playabilityStatus != "OK":
playabilityReason = metadata['playabilityStatus'].get('reason',
'//'.join(metadata['playabilityStatus'].get('messages',[])))
player_error = f"{playabilityStatus}: {playabilityReason}"
if playabilityStatus == "UNPLAYABLE":
continue # try again with next el value (or fail as exhausted)
# without videoDetails, there's only the error message
maybe_metadata = metadata if 'videoDetails' in metadata else None
return None, maybe_metadata, 'player', player_error
if metadata['videoDetails']['isLiveContent'] and \
(metadata['videoDetails'].get('isLive', False) or \
metadata['videoDetails'].get('isPostLiveDvr', False)):
return None, metadata, 'livestream', None
        if 'formats' not in metadata['streamingData']:
continue # no urls
formats = metadata['streamingData']['formats']
for (i,v) in enumerate(formats):
if not ('cipher' in v or 'signatureCipher' in v): continue
cipher = parse_qs(v.get('cipher') or v.get('signatureCipher'))
formats[i]['url'] = unscramble(cipher, algo)
# todo: check if we have urls or try again
url = sorted(formats, key=lambda k: k['height'], reverse=True)[0]['url']
if 'gcr' in parse_qs(url):
return None, metadata, 'geolocked', None
return url, metadata, None, None
else:
return None, metadata, 'exhausted', player_error
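# Example: dispatching on get_video_info()'s documented error types. This is
# an illustrative sketch of the expected caller-side handling, not used here.
def describe_video(video_id):
    url, metadata, error, errdetail = get_video_info(video_id)
    if error is None:
        return f"best muxed stream: {url}"
    elif error == 'livestream':
        return "livestream or premiere; no muxed stream available"
    elif error == 'geolocked':
        return "not available in this country"
    else: # 'malformed', 'player' or 'exhausted'
        return f"{error}: {errdetail}"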
def unscramble(cipher, algo): # test video id: UxxajLWwzqY
signature = list(cipher['s'][0])
    for c in algo.split():
        match = re.match(r"([rsw])(\d+)?", c)
        if not match: continue # skip tokens that aren't valid operations
        op, ix = match.groups()
        ix = int(ix) % len(signature) if ix else 0
if op == 'r': signature = list(reversed(signature))
if op == 's': signature = signature[ix:]
if op == 'w': signature[0], signature[ix] = signature[ix], signature[0]
sp = cipher.get('sp', ['signature'])[0]
sig = cipher.get('sig', [''.join(signature)])[0]
return f"{cipher['url'][0]}&{sp}={sig}"
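# Worked example of the descrambling mini-language above (the algo string is
# made up here; real ones are extracted from the player javascript):
def _unscramble_example():
    cipher = {'s': ['ABCDEF'], 'url': ['https://example.invalid/video'], 'sp': ['sig']}
    # 'ABCDEF' -r-> 'FEDCBA' -s2-> 'DCBA' -w1-> 'CDBA'
    assert unscramble(cipher, "r s2 w1") == "https://example.invalid/video&sig=CDBA"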
def prepare_metadata(metadata):
meta1 = metadata['videoDetails']
meta2 = metadata['microformat']['playerMicroformatRenderer']
cards = metadata['cards']['cardCollectionRenderer']['cards'] \
if 'cards' in metadata else []
endsc = metadata['endscreen']['endscreenRenderer']['elements'] \
if 'endscreen' in metadata else []
# the actual video streams have exact information:
try:
sd = metadata['streamingData']
some_stream = (sd.get('adaptiveFormats',[]) + sd.get('formats',[]))[0]
aspect_ratio = some_stream['width'] / some_stream['height']
# if that's unavailable (e.g. on livestreams), fall back to
# thumbnails (only either 4:3 or 16:9).
    except (KeyError, IndexError): # streamingData missing or empty
some_img = meta2['thumbnail']['thumbnails'][0]
aspect_ratio = some_img['width'] / some_img['height']
# Note: we could get subtitles in multiple formats directly by querying
# https://video.google.com/timedtext?hl=en&type=list&v= followed by
# https://www.youtube.com/api/timedtext?lang=&v=&fmt={srv1|srv2|srv3|ttml|vtt},
# but that won't give us autogenerated subtitles (and is an extra request).
# we can still add &fmt= to the extracted URLs below (first one takes precedence).
try: # find the native language captions (assuming there is only 1 audioTrack) (any level might not exist):
default_track = metadata.get('captions',{}).get('playerCaptionsTracklistRenderer',{}).get('defaultAudioTrackIndex', 0)
main_subtitle = metadata['captions']['playerCaptionsTracklistRenderer']['audioTracks'][default_track]['defaultCaptionTrackIndex']
    except (KeyError, IndexError):
main_subtitle = -1
subtitles = sorted([
{'url':cc['baseUrl'],
'code':cc['languageCode'],
'autogenerated':cc.get('kind')=="asr",
'name':cc['name']['simpleText'],
'default':i==main_subtitle,
'query':"fmt=vtt&"+urlparse(cc['baseUrl']).query} # for our internal proxy
for i,cc in enumerate(metadata.get('captions',{})
.get('playerCaptionsTracklistRenderer',{})
.get('captionTracks',[]))
# sort order: default lang gets weight 0 (first), other manually translated weight 1, autogenerated weight 2:
], key=lambda cc: (not cc['default']) + cc['autogenerated'])
def clean_url(url):
        # external URLs are redirected through youtube.com/redirect, but we
        # may encounter internal URLs, too
return parse_qs(urlparse(url).query).get('q',[url])[0]
# Remove left-/rightmost word from string:
delL = lambda s: s.partition(' ')[2]
delR = lambda s: s.rpartition(' ')[0]
    # Thousands separator aware int():
intT = lambda s: int(s.replace(',', ''))
def parse_infocard(card):
card = card['cardRenderer']
ctype = list(card['content'].keys())[0]
content = card['content'][ctype]
if ctype == "pollRenderer":
ctype = "POLL"
content = {
'question': content['question']['simpleText'],
'answers': [(a['text']['simpleText'],a['numVotes']) \
for a in content['choices']],
}
elif ctype == "videoInfoCardContentRenderer":
ctype = "VIDEO"
# if the card references a live stream, it has no length, but a "LIVE NOW" badge.
# TODO: this is ugly; cleanup.
is_live = content.get('badge',{}).get('liveBadgeRenderer',{})
length = is_live.get('label',{}).get('simpleText') or content['lengthString']['simpleText'] # '23:03'
content = {
'video_id': content['action']['watchEndpoint']['videoId'],
'title': content['videoTitle']['simpleText'],
'author': delL(content['channelName']['simpleText']),
'length': length,
'views': intT(delR(content['viewCountText']['simpleText'])),
}
elif ctype == "playlistInfoCardContentRenderer":
ctype = "PLAYLIST"
content = {
'playlist_id': content['action']['watchEndpoint']['playlistId'],
'video_id': content['action']['watchEndpoint']['videoId'],
'title': content['playlistTitle']['simpleText'],
'author': delL(content['channelName']['simpleText']),
'n_videos': intT(content['playlistVideoCount']['simpleText']),
}
elif ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content['command']:
ctype = "WEBSITE"
content = {
'url': clean_url(content['command']['urlEndpoint']['url']),
'domain': content['displayDomain']['simpleText'],
'title': content['title']['simpleText'],
# XXX: no thumbnails for infocards
}
elif ctype == "collaboratorInfoCardContentRenderer":
ctype = "CHANNEL"
content = {
'channel_id': content['endpoint']['browseEndpoint']['browseId'],
'title': content['channelName']['simpleText'],
'icons': mkthumbs(content['channelAvatar']['thumbnails']),
'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers"
}
else:
import pprint
            content = {'error': f"{ctype} is not implemented; {pprint.pformat(card)}"}
return {'type': ctype, 'content': content}
def mkthumbs(thumbs):
return {e['height']: e['url'] for e in thumbs}
def parse_endcard(card):
card = card.get('endscreenElementRenderer', card) #only sometimes nested
ctype = card['style']
if ctype == "CHANNEL":
content = {
'channel_id': card['endpoint']['browseEndpoint']['browseId'],
'title': card['title']['simpleText'],
'icons': mkthumbs(card['image']['thumbnails']),
}
elif ctype == "VIDEO":
content = {
'video_id': card['endpoint']['watchEndpoint']['videoId'], # XXX: KeyError 'endpoint' exception (no idea which youtube video this was on)
'title': card['title']['simpleText'],
'length': card['videoDuration']['simpleText'], # '12:21'
'views': delR(card['metadata']['simpleText']),
# XXX: no channel name
}
elif ctype == "PLAYLIST":
content = {
'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
'video_id': card['endpoint']['watchEndpoint']['videoId'],
'title': card['title']['simpleText'],
'author': delL(card['metadata']['simpleText']),
'n_videos': intT(delR(card['playlistLength']['simpleText'])),
}
elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE":
ctype = "WEBSITE"
url = clean_url(card['endpoint']['urlEndpoint']['url'])
content = {
'url': url,
'domain': urlparse(url).netloc,
'title': card['title']['simpleText'],
'icons': mkthumbs(card['image']['thumbnails']),
}
else:
import pprint
            content = {'error': f"{ctype} is not implemented; {pprint.pformat(card)}"}
return {'type': ctype, 'content': content}
infocards = [parse_infocard(card) for card in cards]
endcards = [parse_endcard(card) for card in endsc]
    # combine cards to weed out duplicates. for videos and playlists prefer
    # infocards, for channels and websites prefer endcards, as those carry
    # more information than their counterparts.
    # if the card type is not in ident, we use the whole card for comparison
    # (otherwise they'd all replace each other)
ident = { # ctype -> ident
'VIDEO': 'video_id',
'PLAYLIST': 'playlist_id',
'CHANNEL': 'channel_id',
'WEBSITE': 'url',
'POLL': 'question',
}
getident = lambda c: c['content'].get(ident.get(c['type']), c)
mkexclude = lambda cards, types: [getident(c) for c in cards if c['type'] in types]
exclude = lambda cards, without: [c for c in cards if getident(c) not in without]
allcards = exclude(infocards, mkexclude(endcards, ['CHANNEL','WEBSITE'])) + \
exclude(endcards, mkexclude(infocards, ['VIDEO','PLAYLIST']))
all_countries = """AD AE AF AG AI AL AM AO AQ AR AS AT AU AW AX AZ BA BB BD
BE BF BG BH BI BJ BL BM BN BO BQ BR BS BT BV BW BY BZ CA CC CD CF CG CH
CI CK CL CM CN CO CR CU CV CW CX CY CZ DE DJ DK DM DO DZ EC EE EG EH ER
ES ET FI FJ FK FM FO FR GA GB GD GE GF GG GH GI GL GM GN GP GQ GR GS GT
GU GW GY HK HM HN HR HT HU ID IE IL IM IN IO IQ IR IS IT JE JM JO JP KE
KG KH KI KM KN KP KR KW KY KZ LA LB LC LI LK LR LS LT LU LV LY MA MC MD
ME MF MG MH MK ML MM MN MO MP MQ MR MS MT MU MV MW MX MY MZ NA NC NE NF
NG NI NL NO NP NR NU NZ OM PA PE PF PG PH PK PL PM PN PR PS PT PW PY QA
RE RO RS RU RW SA SB SC SD SE SG SH SI SJ SK SL SM SN SO SR SS ST SV SX
SY SZ TC TD TF TG TH TJ TK TL TM TN TO TR TT TV TW TZ UA UG UM US UY UZ
VA VC VE VG VI VN VU WF WS YE YT ZA ZM ZW""".split()
whitelisted = sorted(meta2.get('availableCountries',[]))
blacklisted = sorted(set(all_countries) - set(whitelisted))
published_at = f"{meta2['publishDate']}T00:00:00Z" # yyyy-mm-dd
    # 'premiere' videos (and livestreams?) have an ISO8601 date available:
if 'liveBroadcastDetails' in meta2 and 'startTimestamp' in meta2['liveBroadcastDetails']: # TODO: tighten up
published_at = meta2['liveBroadcastDetails']['startTimestamp']
return {
'title': meta1['title'],
'author': meta1['author'],
'channel_id': meta1['channelId'],
'description': meta1['shortDescription'],
'published': published_at,
'views': meta1['viewCount'],
'length': int(meta1['lengthSeconds']),
'rating': meta1['averageRating'],
'category': meta2['category'],
'aspectr': aspect_ratio,
'unlisted': meta2['isUnlisted'],
'whitelisted': whitelisted,
'blacklisted': blacklisted,
'poster': meta2['thumbnail']['thumbnails'][0]['url'],
'infocards': infocards,
'endcards': endcards,
'all_cards': allcards,
'subtitles': subtitles,
}
def store_video_metadata(video_id):
# check if we know about it, and if not, fetch and store video metadata
with sqlite3.connect(cf['global']['database']) as conn:
c = conn.cursor()
c.execute("SELECT 1 from videos where id = ?", (video_id,))
new_video = len(c.fetchall()) < 1
if new_video:
_, meta, _, _ = get_video_info(video_id)
if meta:
meta = prepare_metadata(meta)
c.execute("""
INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled)
VALUES (?, ?, ?, datetime(?), datetime(?))
""", (
video_id,
meta['channel_id'],
meta['title'],
meta['published'],
meta['published'],
))
c.execute("""
INSERT OR REPLACE INTO channels (id, name)
VALUES (?, ?)
""", (meta['channel_id'], meta['author']))
class RedditException(Exception): pass
def fetch_reddit(subreddits, sorted_by="hot", time=None, *, limit=36,
count=None, before=None, after=None):
"""
fetches data from a subreddit (or a multireddit like gif+gifs) and
filters/sorts results.
sorted_by values: hot, new, rising, controversial, top
time values: hour, day, week, month, year, all (for top and controversial)
"""
if not subreddits:
return None
query = {k:v for k,v in {
'count':count,
'before':before,
'after':after,
'limit':limit, # 1..100 (default 25)
        't': time, # hour,day,week,month,year,all
}.items() if v}
multireddit = '+'.join(subreddits)
r = requests.get(f"https://old.reddit.com/r/{multireddit}/{sorted_by}.json",
query, headers={'User-Agent':'Mozilla/5.0'})
    if not r.ok or 'data' not in r.json():
raise RedditException(r.text)
return r.json()
def fetch_reddit_post(post_id):
# Note: /api/info.json?id=t3_h7mjes == /by_id/t3_h7mjes.json
r = requests.get(f"https://old.reddit.com/by_id/t3_{post_id}.json",
headers={'User-Agent':'Mozilla/5.0'})
    if not r.ok or 'data' not in r.json():
raise RedditException(r.text)
return r.json()
def parse_reddit_videos(data):
videos = []
entries = sorted(data['data']['children'],
key=lambda e: e['data']['score'] > 1,
reverse=True)
for entry in entries:
e = entry['data']
if e['domain'] not in ['youtube.com', 'youtu.be', 'invidio.us']:
continue
        try:
            # Note: youtube.com/<video_id> is not a valid url (404s), but is seen in the wild.
            video_id = re.match(r'^https?://(?:www\.|m\.)?(?:youtube\.com/watch\?(?:.*&)?v=|youtu\.be/|youtube\.com/embed/|youtube\.com/)([-_0-9A-Za-z]+)', e['url']).group(1)
        except AttributeError: # re.match returned None, i.e. not a video url
            continue # XXX: should we log that?
if not video_id: continue
videos.append({
'video_id': video_id,
'title': html.unescape(e['title']), # Note: we unescape and re-escape in the template
'url': e['permalink'],
'n_comments': e['num_comments'],
'n_karma': e['score'],
'subreddit': e['subreddit'],
'post_id': e['id'],
})
return videos
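# Example: tying fetch_reddit() and parse_reddit_videos() together; subreddit
# names are placeholders. Illustrative only, not used elsewhere in this module.
def top_reddit_videos_of_the_week(subreddits=('videos',)):
    data = fetch_reddit(subreddits, sorted_by="top", time="week")
    return parse_reddit_videos(data) if data else []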
from werkzeug.exceptions import NotFound
class NoFallbackException(NotFound): pass
def fallback_route(*args, **kwargs): # TODO: worthy as a flask-extension?
"""
finds the next route that matches the current url rule, and executes it.
args, kwargs: pass all arguments of the current route
"""
from flask import current_app, request, g
# build a list of endpoints that match the current request's url rule:
matching = [
rule.endpoint
for rule in current_app.url_map.iter_rules()
if rule.rule == request.url_rule.rule
]
current = matching.index(request.endpoint)
# since we can't change request.endpoint, we always get the original
# endpoint back. so for repeated fall throughs, we use the g object to
# increment how often we want to fall through.
    if '_fallback_next' not in g:
g._fallback_next = 0
g._fallback_next += 1
next_ep = current + g._fallback_next
if next_ep < len(matching):
return current_app.view_functions[matching[next_ep]](*args, **kwargs)
else:
raise NoFallbackException
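# Example: two views sharing one url rule, chained via fallback_route().
# Sketch only; 'app' and in_cache() are hypothetical:
#   @app.route('/watch')
#   def watch_from_cache():
#       if not in_cache(request.args.get('v')):
#           return fallback_route() # falls through to watch_from_network()
#       ...
#   @app.route('/watch', endpoint='watch_from_network')
#   def watch_from_network():
#       ...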
def websub_url_hmac(key, feed_id, timestamp, nonce):
""" generate sha1 hmac, as required by websub/pubsubhubbub """
sig_input = f"{feed_id}:{timestamp}:{nonce}".encode('ascii')
return hmac.new(key.encode('ascii'), sig_input, hashlib.sha1).hexdigest()
def websub_body_hmac(key, body):
return hmac.new(key.encode('ascii'), body, hashlib.sha1).hexdigest()
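# Example: verifying an incoming websub POST with the helper above. Per the
# pubsubhubbub spec, the X-Hub-Signature header carries "sha1=<hexdigest>"
# computed over the raw request body. Illustrative sketch only:
def websub_body_is_authentic(key, body, x_hub_signature):
    expected = "sha1=" + websub_body_hmac(key, body)
    return hmac.compare_digest(expected, x_hub_signature)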
def pp(*args):
from pprint import pprint
import sys, codecs
pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer))