# functions that deal with parsing data from youtube's internal API ("innertube")

import json
import re
from urllib.parse import parse_qs, urlparse

# Search-result renderers that are recognised but deliberately dropped.
_IGNORED_RESULT_RENDERERS = frozenset((
    # movies to buy/rent; gMR.{videoId,title.runs[].text,lengthText.simpleText}
    'movieRenderer', 'gridMovieRenderer',
    # haha, no.
    'carouselAdRenderer', 'searchPyvRenderer', 'promotedSparklesTextSearchRenderer',
    # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
    'horizontalCardListRenderer',
    # suicide prevention hotline
    'emergencyOneboxRenderer',
    # COVID-19/conspiracy theory infos
    'clarificationRenderer', 'infoPanelContainerRenderer',
    # "Result from the web"
    'webAnswerRenderer',
))

# Channel-page renderers that merely wrap a list of further items, mapped to
# the name of the field holding that list.
_CHANNEL_CONTAINER_FIELDS = {
    "itemSectionRenderer": 'contents',
    "gridRenderer": 'items',
    "horizontalCardListRenderer": 'cards',
}

# Dots are escaped so that only the literal host i.ytimg.com matches.
_THUMBNAIL_VIDEO_ID = re.compile(r"https?://i\.ytimg\.com/vi/([-_0-9a-zA-Z]{11})")


def findall(obj, key):
    """
    given a list of dicts, where one (or more) dict contains a given key,
    return a list of the values stored under said key.

    `None` yields an empty list; entries that are not dicts are skipped.
    """
    if obj is None:
        return []
    return [item[key] for item in obj if isinstance(item, dict) and key in item]


def listget(obj, index, fallback=None):
    """
    return obj[index], or `fallback` if obj is None or too short.
    """
    if obj is None:
        return fallback
    # slicing never raises, so an out-of-range index gives an empty slice
    return next(iter(obj[index:]), fallback)


def flatten(l):
    """ flattens a list of lists by one level. """
    # https://stackoverflow.com/a/952952
    return [item for sublist in l for item in sublist]


def first(l):
    """ returns the first element of an iterable, or {} if it is empty. """
    return next(iter(l), {})


def listfind(obj, key):
    """ like findall(), but returns only the first hit (or {} if none). """
    return first(findall(obj, key))


def prepare_searchresults(yt_results):
    """
    digs the result items out of a search response and parses them.
    returns the (results, extras) tuple produced by parse_result_items().
    """
    contents = listfind(yt_results, 'response') \
        .get('contents', {}) \
        .get('twoColumnSearchResultsRenderer', {}) \
        .get('primaryContents', {}) \
        .get('sectionListRenderer', {}) \
        .get('contents', [])
    contents = flatten([
        section.get('contents', [])
        for section in findall(contents, 'itemSectionRenderer')
    ])

    return parse_result_items(contents)


def prepare_infocards(metadata):
    """ returns the list of parsed infocards; unknown card types are dropped. """
    cards = metadata.get('cards', {}).get('cardCollectionRenderer', {}).get('cards', [])
    return list(filter(None, map(parse_infocard, cards)))


def prepare_endcards(metadata):
    """ returns the list of parsed endcards; unknown card types are dropped. """
    endsc = metadata.get('endscreen', {}).get('endscreenRenderer', {}).get('elements', [])
    return list(filter(None, map(parse_endcard, endsc)))


def prepare_channel(result, channel_id):
    """
    parses a channel (continuation) response.
    returns a tuple (title, description, thumbnails, items, has_more); when
    the response carries alerts, these are logged through flask's
    current_app logger and (None, None, [], [], False) is returned.
    """
    response = listfind(result, 'response')

    if 'alerts' in response:  # possibly got an error back
        from flask import current_app
        current_app.logger.error([
            (alert['alertRenderer']['type'], alert['alertRenderer']['text']['simpleText'])
            for alert in response['alerts']
        ])
        return None, None, [], [], False

    meta1 = response.get('metadata', {}).get('channelMetadataRenderer', {})
    meta2 = response.get('microformat', {}).get('microformatDataRenderer', {})
    title = meta1.get('title', meta2.get('title'))
    descr = meta1.get('description', meta2.get('description'))  # meta2.description is capped at 160chars
    thumb = mkthumbs(meta2.get('thumbnail', meta1.get('avatar', {})).get('thumbnails', {}))  # .avatar ~ 900px

    contents = response.get('continuationContents')
    if not contents:  # overran end of list
        return title, descr, thumb, [], False

    unparsed = contents.get('gridContinuation', {}).get('items') or \
        contents.get('sectionListContinuation', {}).get('contents') or []
    items, _extra = parse_channel_items(unparsed, channel_id, title)

    has_more = 'continuations' in (
        contents.get('gridContinuation')
        or contents.get('sectionListContinuation')
        or {}
    )

    return title, descr, thumb, items, has_more


def prepare_playlist(result):
    """
    parses a playlist continuation response.
    returns a tuple (items, has_more); unplayable videos are dropped.
    raises KeyError if the response has no playlistVideoListContinuation.
    """
    contents = listfind(result, 'response')['continuationContents']
    continuation = contents['playlistVideoListContinuation']
    unparsed = continuation.get('contents', [])
    has_more = 'continuations' in continuation

    return list(filter(None, map(parse_playlist, unparsed))), has_more


def mkthumbs(thumbs):
    """
    turns a list of {'height': ..., 'url': ...} dicts into a dict mapping
    str(height) to url. the extra key 'largest' holds the biggest height
    (as string; not its url), or None if there are no thumbnails.
    """
    output = {str(e['height']): e['url'] for e in thumbs}
    largest = max(output, key=int, default=None)
    return {**output, 'largest': largest}


def clean_url(url):
    """ unwraps a youtube.com/redirect URL; other URLs are returned as-is. """
    # externals URLs are redirected through youtube.com/redirect, but we
    # may encounter internal URLs, too
    return parse_qs(urlparse(url).query).get('q', [url])[0]


def toInt(s, fallback=0):
    """
    extracts all digits from a string (e.g. "123,456 views") and returns them
    as one integer. returns `fallback` for None or if there are no digits.
    """
    if s is None:
        return fallback
    try:
        return int(''.join(filter(str.isdigit, s)))
    except ValueError:
        return fallback


def delL(s):
    """ removes the leftmost word from a string. """
    return s.partition(' ')[2]


def age(s):
    """
    abbreviates a relative date: "1 year ago" => "1y", "2 months ago" => "2mn".
    returns None for None. raises ValueError if, after dropping a "Streamed "
    prefix, the string does not consist of exactly three space separated words.
    """
    if s is None:  # missing from autogen'd music, some livestreams
        return None
    # Some livestreams have "Streamed 7 hours ago"
    s = s.replace("Streamed ", "")
    # Now, everything should be in the form "1 year ago"
    value, unit, _ = s.split(" ")
    suffix = dict(
        month='mn',
        months='mn',
    ).get(unit, unit[0])  # first letter otherwise (e.g. year(s) => y)

    return f"{value}{suffix}"


def log_unknown_card(data):
    """
    appends an unrecognised renderer (as JSON) to /tmp/innertube.err, headed
    by the URL of the current flask request, or "unknown" if unavailable.
    """
    try:
        from flask import request
        source = request.url
    except Exception:  # e.g. flask not importable, or no request being handled
        source = "unknown"

    with open("/tmp/innertube.err", "a", encoding="utf-8") as f:
        f.write(f"\n/***** {source} *****/\n")
        json.dump(data, f, indent=2)


def parse_result_items(items):
    # TODO: use .get() for most non-essential attributes
    """
    parses youtube search response into an easier to use format.
    returns a tuple (results, extras): results are VIDEO/PLAYLIST/CHANNEL
    entries, extras are spelling suggestions and messages. unknown renderers
    are passed to log_unknown_card().
    """
    results = []
    extras = []
    for item in items:
        key = next(iter(item), None)
        content = item[key]
        if key == 'videoRenderer':
            is_live = listfind(content.get('badges', []), 'metadataBadgeRenderer') \
                .get('style') == 'BADGE_STYLE_TYPE_LIVE_NOW'
            results.append({'type': 'VIDEO', 'content': {
                'video_id': content['videoId'],
                'title': content['title']['runs'][0]['text'],
                'author': content['longBylineText']['runs'][0]['text'] or
                    content['shortBylineText']['runs'][0]['text'],
                'channel_id': content['ownerText']['runs'][0]
                    ['navigationEndpoint']['browseEndpoint']['browseId'],
                'length': content.get('lengthText', {}).get('simpleText')
                    if not is_live else 'LIVE',  # "44:07", "1:41:50"
                'views': toInt(
                    content.get('viewCountText', {}).get('simpleText') or  # "123,456 views", ...
                    listget(content.get('viewCountText', {}).get('runs', []), 0, {}).get('text')
                ) or 0,  # ... "1,234 watching", absent on 0 views
                'published': age(content.get('publishedTimeText', {}).get('simpleText')),
            }})
        elif key == 'playlistRenderer':
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['navigationEndpoint'].get('watchEndpoint', {}).get('playlistId') or
                    content.get('playlistId'),  # COURSE/"learning playlist"
                'video_id': content['navigationEndpoint'].get('watchEndpoint', {}).get('videoId') or
                    videoid_from_thumbnail(content),  # learning playlist
                'title': content['title']['simpleText'],
                # Note: learning playlists have no author/channel_id
                'author': listget(content.get('longBylineText', {}).get('runs', []), 0, {}).get('text') or
                    listget(content.get('shortBylineText', {}).get('runs', []), 0, {}).get('text'),
                'channel_id': listget(content.get('longBylineText', {}).get('runs', []), 0, {})
                    .get('navigationEndpoint', {}).get('browseEndpoint', {}).get('browseId'),  # OR .shortBylineText
                'n_videos': toInt(content['videoCount']),
            }})
        elif key == 'radioRenderer':  # "Mix" playlists
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['playlistId'],
                'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
                'title': content['title']['simpleText'],
                'author': content['longBylineText']['simpleText'] or
                    content['shortBylineText']['simpleText'],  # always "YouTube"
                'channel_id': None,
                'n_videos': content['videoCountShortText']['runs'][0]['text'] or
                    content['videoCountText']['runs'][0]['text'],
                    # videoCountShortText: "50+"; videoCountText: "50+ videos"
            }})
        elif key == 'showRenderer':  # normal playlist, specially displayed
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
                'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
                'title': content['title']['simpleText'],
                'author': content['longBylineText']['runs'][0]['text'] or
                    content['shortBylineText']['runs'][0]['text'],
                'channel_id': None,
                'n_videos': None,
            }})
        elif key == 'channelRenderer':
            results.append({'type': 'CHANNEL', 'content': {
                'channel_id': content['channelId'],
                'title': content['title']['simpleText'],
                'icons': mkthumbs(content['thumbnail']['thumbnails']),
                'subscribers': content.get('subscriberCountText', {}).get('simpleText'),  # "2.47K subscribers"
            }})
        elif key == 'shelfRenderer':
            shelf = content['content']
            subkey = next(iter(shelf), None)  # verticalListRenderer/horizontalMovieListRenderer
            if subkey is None:  # empty shelf: nothing to parse
                continue
            r, e = parse_result_items(shelf[subkey]['items'])
            results.extend(r)
            extras.extend(e)
        elif key in _IGNORED_RESULT_RENDERERS:
            pass
        elif key == 'didYouMeanRenderer' or key == 'showingResultsForRenderer':
            extras.append({
                'type': 'spelling',
                'query': content['correctedQueryEndpoint']['searchEndpoint']['query'],  # non-misspelled query
                'autocorrected': key == 'showingResultsForRenderer',
            })
        elif key == 'messageRenderer':  # "No more results"
            extras.append({
                'type': 'message',
                'message': content.get('title', {}).get('runs', [{}])[0].get('text') or
                    content.get('text', {}).get('runs', [{}])[0].get('text'),
            })
        elif key == 'backgroundPromoRenderer':  # e.g. "no results"
            extras.append({
                'type': content['icon']['iconType'],
                'message': content['title']['runs'][0]['text'],
            })
        else:
            log_unknown_card(item)
    return results, extras


def parse_infocard(card):
    """
    parses a single infocard into a format that's easier to handle.
    returns a {'type': ..., 'content': {...}} dict, or None (after logging
    the card through log_unknown_card()) for unknown card types.
    """
    card = card['cardRenderer']
    ctype = list(card['content'].keys())[0]
    content = card['content'][ctype]
    if ctype == "pollRenderer":
        return {'type': "POLL", 'content': {
            'question': content['question']['simpleText'],
            'answers': [(a['text']['simpleText'], a['numVotes'])
                        for a in content['choices']],
        }}
    elif ctype == "videoInfoCardContentRenderer":
        is_live = content.get('badge', {}).get('liveBadgeRenderer') is not None
        return {'type': "VIDEO", 'content': {
            'video_id': content['action']['watchEndpoint']['videoId'],
            'title': content['videoTitle']['simpleText'],
            'author': delL(content['channelName']['simpleText']),
            'length': content.get('lengthString', {}).get('simpleText')
                if not is_live else "LIVE",  # "23:03"
            'views': toInt(content.get('viewCountText', {}).get('simpleText')),
            # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
        }}
    elif ctype == "playlistInfoCardContentRenderer":
        return {'type': "PLAYLIST", 'content': {
            'playlist_id': content['action']['watchEndpoint']['playlistId'],
            'video_id': content['action']['watchEndpoint']['videoId'],
            'title': content['playlistTitle']['simpleText'],
            'author': delL(content['channelName']['simpleText']),
            'n_videos': toInt(content['playlistVideoCount']['simpleText']),
        }}
    elif ctype == "simpleCardContentRenderer" and \
            'urlEndpoint' in content['command']:
        return {'type': "WEBSITE", 'content': {
            'url': clean_url(content['command']['urlEndpoint']['url']),
            'domain': content['displayDomain']['simpleText'],
            'title': content['title']['simpleText'],
            # XXX: no thumbnails for infocards
        }}
    elif ctype == "collaboratorInfoCardContentRenderer":
        return {'type': "CHANNEL", 'content': {
            'channel_id': content['endpoint']['browseEndpoint']['browseId'],
            'title': content['channelName']['simpleText'],
            'icons': mkthumbs(content['channelAvatar']['thumbnails']),
            'subscribers': content.get('subscriberCountText', {}).get('simpleText', ''),  # "545K subscribers"
        }}
    else:
        log_unknown_card(card)
        return None


def parse_endcard(card):
    """
    parses a single endcard into a format that's easier to handle.
    returns a {'type': ..., 'content': {...}} dict, or None (after logging
    the card through log_unknown_card()) for unknown card styles.
    """
    card = card.get('endscreenElementRenderer', card)  # only sometimes nested
    ctype = card['style']
    if ctype == "CHANNEL":
        return {'type': ctype, 'content': {
            'channel_id': card['endpoint']['browseEndpoint']['browseId'],
            'title': card['title']['simpleText'],
            'icons': mkthumbs(card['image']['thumbnails']),
        }}
    elif ctype == "VIDEO":
        return {'type': ctype, 'content': {
            'video_id': card['endpoint']['watchEndpoint']['videoId'],
            'title': card['title']['simpleText'],
            'length': card['videoDuration']['simpleText'],  # '12:21'
            'views': toInt(card['metadata']['simpleText']),
            # XXX: no channel name
        }}
    elif ctype == "PLAYLIST":
        return {'type': ctype, 'content': {
            'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
            'video_id': card['endpoint']['watchEndpoint']['videoId'],
            'title': card['title']['simpleText'],
            'author': delL(card['metadata']['simpleText']),
            'n_videos': toInt(card['playlistLength']['simpleText']),
        }}
    elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE":
        url = clean_url(card['endpoint']['urlEndpoint']['url'])
        return {'type': "WEBSITE", 'content': {
            'url': url,
            'domain': urlparse(url).netloc,
            'title': card['title']['simpleText'],
            'icons': mkthumbs(card['image']['thumbnails']),
        }}
    else:
        log_unknown_card(card)
        return None


def videoid_from_thumbnail(content):
    """
    extracts the 11 character video id from the URL of the first thumbnail
    (.thumbnails[0].thumbnails[0].url). returns None if there is no such
    thumbnail or its URL isn't an i.ytimg.com/vi/ one.
    """
    # learning playlist; example: PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5 (/user/enyay/playlists)
    outer = listget(content.get('thumbnails', []), 0, {})
    url = listget(outer.get('thumbnails', []), 0, {}).get('url', '')
    match = _THUMBNAIL_VIDEO_ID.match(url)
    return match.group(1) if match else None


def parse_channel_items(items, channel_id, author):
    """
    parses the items of a channel page (videos, playlists), recursing into
    container renderers. `channel_id` and `author` are used where an item
    doesn't carry its own. returns a tuple (result, extra); unknown renderers
    are passed to log_unknown_card().
    """
    result = []
    extra = []
    for item in items:
        key = next(iter(item), None)
        content = item[key]
        if key in ["gridVideoRenderer", "videoRenderer", "videoCardRenderer"]:
            # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
            byline = content.get('bylineText', {}).get('runs', [{}])[0]
            result.append({'type': 'VIDEO', 'content': {
                'video_id': content['videoId'],
                'title': content['title'].get('simpleText') or
                    content['title'].get('runs', [{}])[0].get('text'),
                'author': byline.get('text') or author,
                'channel_id': byline.get('navigationEndpoint', {})
                    .get('browseEndpoint', {}).get('browseId') or channel_id,
                'length': (content.get('lengthText', {}).get('simpleText') or  # topic channel
                    listfind(content.get('thumbnailOverlays', []), 'thumbnailOverlayTimeStatusRenderer')
                    .get('text', {}).get('simpleText')),
                # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
                'views': toInt(content.get('viewCountText', {}).get('simpleText')),
                'published': age(content.get('publishedTimeText', {}).get('simpleText')),
            }})
        elif key == "gridPlaylistRenderer" or key == "playlistRenderer":
            result.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['navigationEndpoint'].get('watchEndpoint', {}).get('playlistId') or
                    content.get('playlistId'),
                'video_id': content['navigationEndpoint'].get('watchEndpoint', {}).get('videoId', {}) or
                    videoid_from_thumbnail(content),
                'title': (content['title'].get('simpleText') or  # playlistRenderer
                    content['title']['runs'][0]['text']),  # gridPlaylistRenderer
                'author': author,
                'channel_id': channel_id,
                'n_videos': toInt(
                    content.get('videoCount') or  # playlistRenderer
                    content.get('videoCountShortText', {}).get('simpleText') or  # grid(1)
                    content.get('videoCountText', {}).get('runs', [{}])[0].get('text')  # grid(2)
                ),
            }})
        elif key == "showRenderer":
            result.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
                'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
                'title': content['title']['simpleText'],
                'author': author,
                'channel_id': channel_id,
                'n_videos': None,
            }})
        elif key in _CHANNEL_CONTAINER_FIELDS:
            newkey = _CHANNEL_CONTAINER_FIELDS[key]
            r, e = parse_channel_items(content[newkey], channel_id, author)
            result.extend(r)
            extra.extend(e)
        elif key == "shelfRenderer":
            r, e = parse_channel_items([content['content']], channel_id, author)
            result.extend(r)
            extra.extend(e)
        elif key == "messageRenderer":
            # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
            pass
        elif key == "gameCardRenderer":
            pass
        else:
            log_unknown_card(item)

    return result, extra


def parse_playlist(item):
    """
    parses a single playlist entry into a VIDEO dict.
    returns None for entries that aren't playable (private or deleted video);
    raises Exception(item) for anything but a playlistVideoRenderer.
    """
    key = next(iter(item), None)
    content = item[key]
    if key == "playlistVideoRenderer":
        if not content.get('isPlayable', False):
            return None  # private or deleted video

        watch = content['navigationEndpoint']['watchEndpoint']
        byline = content.get('shortBylineText', {}).get('runs', [{}])[0]
        return {'type': 'VIDEO', 'content': {
            'video_id': content['videoId'],
            'title': (content['title'].get('simpleText') or  # playable videos
                content['title'].get('runs', [{}])[0].get('text')),  # "[Private video]"
            'playlist_id': watch['playlistId'],
            # or int(content['index']['simpleText']) (absent on course intros; e.g. PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5)
            'index': watch.get('index', 0),
            # rest is missing from unplayable videos:
            'author': byline.get('text'),
            'channel_id': byline.get('navigationEndpoint', {}).get('browseEndpoint', {}).get('browseId'),
            'length': (content.get("lengthText", {}).get("simpleText") or  # "8:51"
                int(content.get("lengthSeconds", 0))),  # "531"
            'starttime': watch.get('startTimeSeconds'),
        }}
    else:
        raise Exception(item)  # XXX TODO