import os
import re
import json
import requests
import dateutil.parser
from datetime import datetime, timezone
from xml.etree import ElementTree
from urllib.parse import parse_qs
from configparser import ConfigParser

cf = ConfigParser()
config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini')
cf.read(config_filename)

def fetch_xml(feed_type, feed_id):
    r = requests.get(f"https://www.youtube.com/feeds/videos.xml?{feed_type}={feed_id}")
    if not r.ok:
        return None

    return r.text

def parse_xml(xmldata):
    ns = {
        'atom': "http://www.w3.org/2005/Atom",
        'yt': "http://www.youtube.com/xml/schemas/2015",
        'media': "http://search.yahoo.com/mrss/",
    }

    feed = ElementTree.fromstring(xmldata)
    # Note: "is not None" is required here; an Element's truth value is based
    # on its number of children, so childless elements are falsy.
    author_el = feed.find('atom:author', ns)
    author = author_el.find('atom:name', ns).text if author_el is not None else None
    if feed.find('yt:channelId', ns) is not None:
        channel_id = feed.find('yt:channelId', ns).text
    else:  # TODO: clean this up (websub has no yt:channelId; this should be adapted for playlists)
        self_url = feed.find('atom:link[@rel="self"]', ns).get('href')
        channel_id = parse_qs(self_url.split('?')[1]).get('channel_id')[0]
    title = feed.find('atom:title', ns).text
    videos = []
    for entry in feed.findall('atom:entry', ns):
        videos.append({
            'video_id': entry.find('yt:videoId', ns).text,
            'title': entry.find('atom:title', ns).text,
            'published': entry.find('atom:published', ns).text,
            'channel_id': entry.find('yt:channelId', ns).text,
            'author': entry.find('atom:author', ns).find('atom:name', ns).text,
            # extra fields for pull_subs/webhook:
            'updated': entry.find('atom:updated', ns).text,
            #'description': entry.find('media:group', ns).find('media:description', ns).text  # XXX: missing for websub
        })

    return title, author, channel_id, videos

def update_channel(db, xmldata):
    """
    Returns True on success, False on failure. Rigorous error checking is
    required, otherwise data will be lost! On failure, the caller MUST (as
    per RFC 2119) write (append) the xmlfeed into a file.
    """
    if not xmldata:
        return False

    # Note: websub does not return the global author.
    title, author, channel_id, videos = parse_xml(xmldata)  # XXX: the Perl code had this eval'd, so it could die

    c = db.cursor()
    for video in videos:
        now = datetime.now(timezone.utc)
        updated = dateutil.parser.parse(video['updated'])
        published = dateutil.parser.parse(video['published'])
        # If the updated and published times are near-identical, the video is
        # new: use the crawl time if it was published within the last week.
        # Otherwise it is just an update to an older video (from before we
        # subscribed), so use the original upload time.
        if (updated - published).total_seconds() < 60 and (now - published).days < 7:
            timestamp = now
        else:
            timestamp = published
        c.execute("""
            INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled)
            VALUES (?, ?, ?, datetime(?), datetime(?))
        """, (video['video_id'], video['channel_id'], video['title'], video['published'], timestamp))  # XXX: errorcheck

        # Update the channel name (we don't fetch it on subscribing).
        author = video['author']  # XXX: doing this once per channel is enough (for pull-subs.pl)
        c.execute("""
            INSERT OR REPLACE INTO channels (id, name)
            VALUES (?, ?)
        """, (channel_id, author))  # XXX: errorcheck

    return True

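# For reference, a minimal schema sketch matching the INSERT statements in
# update_channel() above. This is an illustration only: the real schema is
# created elsewhere in the project and may differ.
#
#   CREATE TABLE IF NOT EXISTS videos (
#       id         TEXT PRIMARY KEY,  -- youtube video id
#       channel_id TEXT,
#       title      TEXT,
#       published  DATETIME,          -- original upload time
#       crawled    DATETIME           -- first-seen time (see heuristic above)
#   );
#   CREATE TABLE IF NOT EXISTS channels (
#       id   TEXT PRIMARY KEY,        -- UC... channel id
#       name TEXT
#   );
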
def get_video_info(video_id, sts=0, algo=""):
    """
    Returns the best-quality muxed video stream, the player_response, and an
    error type/message.

    Error types:
    - 'initial': the request to get_video_info was malformed
    - 'player': playabilityStatus != OK
    - 'internal': [livestream, geolocked, exhausted]
    """
    # TODO: caching, e.g. beaker? we must not cache soon-premiering videos,
    #       livestreams, etc., though. responses are apparently valid for 6h;
    #       maybe cache for (video_length - 2h).
    # TODO: error types? ["invalid parameters", playabilitystatus, own]
    # TODO: a bit messy; should return all unscrambled video urls in
    #       best-to-worst quality order.

    # We try to fetch the video multiple times using different origins.
    for el in ['embedded', 'detailpage']:  # also: ['el-completely-absent', info, leanback, editpage, adunit, previewpage, profilepage]
        r = requests.get("https://www.youtube.com/get_video_info" +
                         f"?video_id={video_id}" +
                         f"&eurl=https://youtube.googleapis.com/v/{video_id}" +
                         f"&el={el}" +
                         f"&sts={sts}" +
                         f"&hl=en_US")  # or: "&hl=en&gl=US"
        params = parse_qs(r.text)
        if 'errorcode' in params:  # status=fail
            return None, None, 'initial', f"MALFORMED: {params['reason'][0]}"

        metadata = json.loads(params.get('player_response')[0])
        if metadata['playabilityStatus']['status'] != "OK":
            if metadata['playabilityStatus']['status'] == "UNPLAYABLE":
                # Try again with a different 'el' value. If none succeeds, we
                # fall into the "exhausted" path below, which returns the last
                # tried metadata, from which playabilityStatus.reason can be
                # extracted. According to jwz/youtubedown, the worst error
                # message comes from 'embedded' (tried first), so it should be
                # overwritten by a better one.
                continue
            return None, None, 'player', f"{metadata['playabilityStatus']['status']}: {metadata['playabilityStatus']['reason']}"
        if 'liveStreamability' in metadata['playabilityStatus']:
            # can also check .microformat.liveBroadcastDetails.isLiveNow
            return None, metadata, 'internal', "livestream"

        formats = metadata['streamingData']['formats']
        for i, v in enumerate(formats):
            if not ('cipher' in v or 'signatureCipher' in v):
                continue
            cipher = parse_qs(v.get('cipher') or v.get('signatureCipher'))
            formats[i]['url'] = unscramble(cipher, algo)

        # TODO: check whether we actually have urls, or try again
        url = sorted(formats, key=lambda k: k['height'], reverse=True)[0]['url']

        if 'gcr' in parse_qs(url):
            return None, metadata, 'internal', "geolocked"

        return url, metadata, None, None
    else:  # all origins exhausted without a playable result
        return None, metadata, 'internal', "exhausted"

def unscramble(cipher, algo):  # test video id: UxxajLWwzqY
    signature = list(cipher['s'][0])
    for c in algo.split():
        op, ix = re.match(r"([rsw])(\d+)?", c).groups()
        if not op:
            continue
        if op == 'r':  # reverse the signature
            signature = list(reversed(signature))
        if op == 's':  # slice off the first ix characters
            signature = signature[int(ix):]
        if op == 'w':  # swap the first character with the one at index ix
            signature[0], signature[int(ix) % len(signature)] = signature[int(ix) % len(signature)], signature[0]
    sp = cipher.get('sp', ['signature'])[0]
    sig = cipher['sig'][0] if 'sig' in cipher else ''.join(signature)
    return f"{cipher['url'][0]}&{sp}={sig}"

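# A quick illustration of the descrambling ops above, with hypothetical
# inputs (real 'algo' strings are derived from the player JS, and real
# ciphers come from streamingData, not hard-coded values):
#
#   cipher = parse_qs("s=abcdef&url=https://example.invalid/video")
#   unscramble(cipher, "r s2 w3")
#   # 'abcdef' -r-> 'fedcba' -s2-> 'dcba' -w3-> 'acbd'
#   # => 'https://example.invalid/video&signature=acbd'
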
def prepare_metadata(metadata):
    meta1 = metadata['videoDetails']
    meta2 = metadata['microformat']['playerMicroformatRenderer']
    cards = metadata['cards']['cardCollectionRenderer']['cards'] \
        if 'cards' in metadata else []
    endsc = metadata['endscreen']['endscreenRenderer']['elements'] \
        if 'endscreen' in metadata else []

    #aspect_ratio = meta2['embed']['width'] / meta2['embed']['height']  # sometimes absent
    aspect_ratio = meta2['thumbnail']['thumbnails'][0]['width'] / meta2['thumbnail']['thumbnails'][0]['height']

    subtitles = sorted([
        {'url': cc['baseUrl'],
         'code': cc['languageCode'],
         'autogenerated': cc.get('kind') == "asr",
         'name': cc['name']['simpleText']}
        for cc in metadata['captions']['playerCaptionsTracklistRenderer']['captionTracks']
    ], key=lambda cc: cc['autogenerated']) \
        if 'captionTracks' in metadata['captions']['playerCaptionsTracklistRenderer'] else []

    def parse_infocard(card):
        card = card['cardRenderer']
        teaser = card['teaser']['simpleCardTeaserRenderer']['message']['simpleText']  # not used
        ctype = list(card['content'].keys())[0]
        content = card['content'][ctype]
        if ctype == "pollRenderer":
            ctype = "POLL"
            content = {
                'question': content['question']['simpleText'],
                'answers': [(a['text']['simpleText'], a['numVotes']) for a in content['choices']],
            }
        elif ctype == "videoInfoCardContentRenderer":
            ctype = "VIDEO"
            content = {
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['videoTitle']['simpleText'],
                'author': content['channelName']['simpleText'],  # 'by xXxXx'
                'length': content['lengthString']['simpleText'],  # '23:03'
                'views': content['viewCountText']['simpleText'],  # '421,248 views'
            }
        elif ctype == "playlistInfoCardContentRenderer":
            ctype = "PLAYLIST"
            content = {
                'playlist_id': content['action']['watchEndpoint']['playlistId'],
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['playlistTitle']['simpleText'],
                'author': content['channelName']['simpleText'],
                'n_videos': content['playlistVideoCount']['simpleText'],  # '21'
            }
        elif ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content.get('command', {}):
            ctype = "WEBSITE"
            content = {
                'url': parse_qs(content['command']['urlEndpoint']['url'].split('?')[1])['q'][0],
                'domain': content['displayDomain']['simpleText'],
                'title': content['title']['simpleText'],
                'text': content['actionButton']['simpleCardButtonRenderer']['text']['simpleText'],
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented;\n{pprint.pformat(card)}"}

        return {'teaser': teaser, 'type': ctype, 'content': content}

    def parse_endcard(card):
        card = card.get('endscreenElementRenderer', card)
        ctype = card['style']
        if ctype == "CHANNEL":
            content = {
                'channel_id': card['endpoint']['browseEndpoint']['browseId'],
                'title': card['title']['simpleText'],
                'icons': {e['height']: e['url'] for e in card['image']['thumbnails']},
            }
        elif ctype == "VIDEO":
            content = {
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'length': card['videoDuration']['simpleText'],  # '12:21'
                'views': card['metadata']['simpleText'],  # '51,649 views'
            }
        elif ctype == "PLAYLIST":
            content = {
                'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'author': card['metadata']['simpleText'],
                'n_videos': card['playlistLength']['simpleText'].replace(" videos", ""),
            }
        elif ctype == "WEBSITE":
            content = {
                'url': parse_qs(card['endpoint']['urlEndpoint']['url'].split('?')[1])['q'][0],
                'domain': card['metadata']['simpleText'],
                'title': card['title']['simpleText'],
                'icons': {e['height']: e['url'] for e in card['image']['thumbnails']},
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented;\n{pprint.pformat(card)}"}

        return {'type': ctype, 'content': content}

    return {
        'title': meta1['title'],
        'author': meta1['author'],
        'channel_id': meta1['channelId'],
        'description': meta1['shortDescription'],
        'published': meta2['publishDate'],
        'views': meta1['viewCount'],
        'length': int(meta1['lengthSeconds']),
        'rating': meta1['averageRating'],
        'category': meta2['category'],
        'aspectr': aspect_ratio,
        'unlisted': meta2['isUnlisted'],
        'countries': meta2['availableCountries'],
        'poster': meta2['thumbnail']['thumbnails'][0]['url'],
        'infocards': [parse_infocard(card) for card in cards],
        'endcards': [parse_endcard(card) for card in endsc],
        'subtitles': subtitles,
    }

def pp(*args):
    from pprint import pprint
    import sys, codecs
    pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer))