# functions that deal with parsing data from youtube's internal API ("innertube")

from ..common.common import mkthumbs, log_unknown_card, G

# NOTE(review): `G` is defined in ..common.common and not visible here. From
# its use below it presumably is a None-tolerant getter (`x|G('key')`,
# `x|G(0)`, `x|G('a','b')` for alternatives, `x|G.text` for text runs) that
# yields a falsy value when the lookup fails -- confirm against its definition.


class Select:
    """
    |Select('foo') returns the first foo in list,
    |Select(all='foo') returns all foos.

    Used as the right-hand operand of `|`. Only those elements of the
    left-hand operand that have the key are considered.
    """
    def __init__(self, key=None, *, all=None):
        self.key = key or all
        self.all = all

    def __ror__(self, other):
        try:
            items = [
                entry[self.key]
                for entry in other
                if self.key in entry.keys()
            ]
        except (TypeError, AttributeError):
            # TypeError: left operand is not iterable (e.g. None);
            # AttributeError: an element has no .keys() (not dict-like).
            # Either way this is treated as "nothing found".
            items = []
        return items if self.all else items|G(0)


class A:
    """
    apply: x|A(f, *args) evaluates to f(x, *args).
    x|A.int extracts an integer from a string (see _Int).
    """
    def __init__(self, f, *args):
        self.f = f
        self.args = args

    def __ror__(self, other):
        return self.f(other, *self.args)

    class _Int:
        """ x|A.int: all digit characters of x, joined and converted to int; None on failure. """
        def __ror__(self, other):
            try:
                # methods do not see the class namespace, so this `int` is the builtin.
                return int(''.join(filter(str.isdigit, other)))
            except (TypeError, ValueError):
                # TypeError: not an iterable of strings (e.g. None);
                # ValueError: no digit characters to convert.
                return None
    # Must live inside A: call sites use `A.int`. Here it only shadows the
    # builtin within the class namespace, not for the rest of the module.
    int = _Int()


def _unwrap(item):
    """
    Splits a single-key renderer dict into (key, payload); only the first key is used.
    Raises KeyError on an empty dict.
    """
    key = next(iter(item.keys()), None)
    return key, item[key]


def prepare_searchresults(yt_results):
    """
    Extracts the result list from a search response.

    Returns (items, extra, more): items and extra as produced by
    parse_result_items(), more being the token found under
    continuationItemRenderer (falsy if there is none).
    """
    contents = ( # from continuation token
        yt_results
        |G('onResponseReceivedCommands')
        |Select('appendContinuationItemsAction')
        |G('continuationItems')
    ) or ( # from page 1
        yt_results
        |G('contents')
        |G('twoColumnSearchResultsRenderer')
        |G('primaryContents')
        |G('sectionListRenderer')
        |G('contents')
    )

    # `or []`: without an itemSectionRenderer there is nothing to iterate over.
    section = contents|Select('itemSectionRenderer')|G('contents') or []
    items, extra = parse_result_items(section)

    more = (
        contents
        |Select("continuationItemRenderer")
        |G("continuationEndpoint")
        |G("continuationCommand")
        |G("token")
    )

    return items, extra, more


def prepare_channel(response, channel_id, channel_name):
    """
    Extracts metadata and the item list from a channel response.

    channel_id and channel_name are used as fallbacks where the response
    does not carry them. Returns (title, descr, thumb, items, more); items
    is empty and more is False when the response has no content section.
    The extras produced by parse_channel_items() are not returned.
    """
    meta1 = response|G('metadata')|G('channelMetadataRenderer')
    meta2 = response|G('microformat')|G('microformatDataRenderer')
    title = meta1|G('title') or meta2|G('title') or channel_name
    descr = meta1|G('description') or meta2|G('description') # meta2.description is capped at 160chars
    thumb = mkthumbs((meta2|G('thumbnail') or meta1|G('avatar'))|G('thumbnails') or {}) # .avatar ~ 900px

    contents = (
        response|G('continuationContents') or
        response|G('onResponseReceivedActions')
    )
    if not contents: # overran end of list
        return title, descr, thumb, [], False

    # the item list lives in a different place depending on the response kind
    unparsed = (
        contents|G('gridContinuation')|G('items') or
        contents|G('sectionListContinuation')|G('contents') or
        contents|G('richGridContinuation')|G('contents') or
        contents|Select('appendContinuationItemsAction')|G('continuationItems') or
        contents|G(-1)|G('reloadContinuationItemsCommand')|G('continuationItems') or
        []
    )
    items, extra = parse_channel_items(unparsed, channel_id, title)

    more = ( # videos, livestreams
        unparsed
        |Select('continuationItemRenderer')
        |G('continuationEndpoint')
        |G('continuationCommand')
        |G('token')
    ) or ( # playlists, search
        contents
        |G('gridContinuation', 'sectionListContinuation')
        |G('continuations')
        |Select('nextContinuationData')
        |G('continuation')
    )

    return title, descr, thumb, items, more


def prepare_playlist(result):
    """
    Extracts metadata and videos from a playlist response.

    Returns (title, author, channel_id, videos, more). Entries for which
    parse_playlist() returns None are left out of videos.
    Raises KeyError if 'continuationContents' or its
    'playlistVideoListContinuation' member is missing.
    """
    contents = result['continuationContents']
    unparsed = contents['playlistVideoListContinuation'].get('contents',[])
    more = (
        contents
        |G('playlistVideoListContinuation')
        |G('continuations')
        |Select('nextContinuationData')
        |G('continuation')
    )

    meta = result|G('sidebar')|G('playlistSidebarRenderer')|G('items')
    meta1 = meta|Select('playlistSidebarPrimaryInfoRenderer')
    meta2 = (
        meta
        |Select('playlistSidebarSecondaryInfoRenderer')
        |G('videoOwner')
        |G('videoOwnerRenderer')
    )
    title = meta1|G('title')|G.text
    author = meta2|G('title')|G.text
    channel_id = meta2|G('navigationEndpoint')|G('browseEndpoint')|G('browseId')

    videos = [video for video in map(parse_playlist, unparsed) if video]

    return title, author, channel_id, videos, more


def age(s):
    """
    Shortens a relative date like "1 year ago" to "1y" ("5 minutes ago" => "5min").

    Returns None if s is None. Raises ValueError if, after dropping a
    "Streamed " prefix, s does not consist of exactly three space-separated
    parts.
    """
    if s is None: # missing from autogen'd music, some livestreams
        return None
    # Some livestreams have "Streamed 7 hours ago"
    s = s.replace("Streamed ","")
    # Now, everything should be in the form "1 year ago"
    value, unit, _ = s.split(" ")
    if unit in ('minute', 'minutes'):
        suffix = 'min'
    else:
        suffix = unit[0] # first letter otherwise (e.g. year(s) => y)
    return f"{value}{suffix}"


# search result cards that are recognized, but deliberately dropped:
_IGNORED_RESULT_RENDERERS = frozenset((
    # movies to buy/rent; gMR.{videoId,title.runs[].text,lengthText.simpleText}
    'movieRenderer', 'gridMovieRenderer',
    # haha, no.
    'carouselAdRenderer', 'searchPyvRenderer', 'promotedSparklesTextSearchRenderer',
    'promotedSparklesWebRenderer', 'compactPromotedItemRenderer', 'adSlotRenderer',
    # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
    'horizontalCardListRenderer',
    # suicide prevention hotline
    'emergencyOneboxRenderer',
    # COVID-19/conspiracy theory infos
    'clarificationRenderer', 'infoPanelContainerRenderer',
    # "Result from the web"
    'webAnswerRenderer',
    # "These results may be new or changing quickly"
    'infoPanelContentRenderer',
    # link to '/hashtag/'
    'hashtagTileRenderer',
))


def parse_result_items(items):
    """
    parses youtube search response into an easier to use format.

    Returns (results, extras): results is a list of
    {'type': 'VIDEO'|'PLAYLIST'|'CHANNEL', 'content': {...}} dicts, extras
    holds spelling corrections and messages. Shelves are flattened into
    results; unrecognized cards are passed to log_unknown_card().
    """
    # TODO: use .get() for most non-essential attributes
    results = []
    extras = []
    for item in items:
        key, content = _unwrap(item)
        if key in ('videoRenderer', 'reelItemRenderer', 'gridVideoRenderer'):
            results.append({'type': 'VIDEO', 'content': {
                'video_id': content['videoId'],
                'title': content|G('title')|G.text or content|G('headline')|G.text,
                'author': content|G('longBylineText','shortBylineText')|G.text,
                'channel_id': (
                    content|G('ownerText')|G('runs')|G(0)
                        |G('navigationEndpoint')|G('browseEndpoint')|G('browseId')
                    or content|G("channelThumbnailSupportedRenderers")
                        |G("channelThumbnailWithLinkRenderer")|G("navigationEndpoint")
                        |G("browseEndpoint")|G("browseId")
                ),
                'length': content|G('lengthText')|G.text, # "44:07", "1:41:50"
                'views': content|G('viewCountText')|G.text|A.int or 0, # "1,234 {views|watching}", absent on 0 views
                'published': content|G('publishedTimeText')|G('simpleText')|A(age),
                'live': content|G('badges')|Select('metadataBadgeRenderer')|G('style')=='BADGE_STYLE_TYPE_LIVE_NOW',
            }})
        elif key in ('playlistRenderer', 'radioRenderer', 'showRenderer'):
            # radio == "Mix" playlist, show == normal playlist, specially displayed
            endpoint = content['navigationEndpoint']
            byline = content|G('longBylineText','shortBylineText')
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': endpoint|G('watchEndpoint')|G('playlistId'),
                'video_id': endpoint|G('watchEndpoint')|G('videoId'),
                'title': content['title']|G.text,
                'author': byline|G.text,
                'channel_id': byline|G('runs')|G(0)
                    |G('navigationEndpoint')|G('browseEndpoint')|G('browseId'),
                'n_videos': (
                    content|G('videoCount')|A.int or
                    content|G('videoCountShortText','videoCountText')|G.text # "Mix" playlists
                ),
            }})
        elif key == 'channelRenderer':
            results.append({'type': 'CHANNEL', 'content': {
                'channel_id': content['channelId'],
                'title': content['title']|G.text,
                'icons': content['thumbnail']['thumbnails']|A(mkthumbs),
                'subscribers': content|G('subscriberCountText')|G('simpleText'), # "2.47K subscribers"
            }})
        elif key == 'shelfRenderer':
            _, shelf = _unwrap(content['content']) #verticalListRenderer/horizontalMovieListRenderer
            nested_results, nested_extras = parse_result_items(shelf['items'])
            results.extend(nested_results)
            extras.extend(nested_extras)
        elif key == "reelShelfRenderer":
            nested_results, nested_extras = parse_result_items(content['items'])
            results.extend(nested_results)
            extras.extend(nested_extras)
        elif key in ('didYouMeanRenderer', 'showingResultsForRenderer', 'includingResultsForRenderer'):
            extras.append({
                'type': 'spelling',
                'query': content['correctedQueryEndpoint']['searchEndpoint']['query'], # non-misspelled query
                'autocorrected': key in ('showingResultsForRenderer', 'includingResultsForRenderer'),
            })
        elif key == 'messageRenderer': # "No more results"
            extras.append({
                'type': 'message',
                'message': content|G('title','text')|G.text,
            })
        elif key == 'backgroundPromoRenderer': # e.g. "no results"
            extras.append({
                'type': content['icon']['iconType'],
                'message': content['title']|G.text,
            })
        elif key in _IGNORED_RESULT_RENDERERS:
            pass
        else:
            log_unknown_card(item)
    return results, extras


# channel page renderers that merely wrap a list of further items, and the
# key under which that list is found:
_CHANNEL_CONTAINER_FIELDS = {
    "itemSectionRenderer": 'contents',
    "gridRenderer": 'items',
    "horizontalCardListRenderer": 'cards',
    "horizontalListRenderer": 'items',
}

# channel page renderers that are recognized, but deliberately dropped:
_IGNORED_CHANNEL_RENDERERS = frozenset((
    # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
    "messageRenderer",
    "gameCardRenderer",
    # don't care; related channels, e.g. on UCMsgXPD3wzzt8RxHJmXH7hQ
    "gridChannelRenderer",
    # handled in parent function
    'continuationItemRenderer',
))


def parse_channel_items(items, channel_id, author):
    """
    parses the items of a channel page (videos, playlists, shows).

    channel_id and author are filled in wherever an item does not carry its
    own. Returns (result, extra): result is a list of
    {'type': 'VIDEO'|'PLAYLIST', 'content': {...}} dicts with containers
    flattened; nothing in this function adds to extra directly, it only
    collects what recursive calls return. Unrecognized items are passed to
    log_unknown_card().
    """
    result = []
    extra = []

    def recurse(children):
        nested_result, nested_extra = parse_channel_items(children, channel_id, author)
        result.extend(nested_result)
        extra.extend(nested_extra)

    for item in items:
        key, content = _unwrap(item)
        if key in ("gridVideoRenderer", "videoRenderer", "videoCardRenderer", 'reelItemRenderer'): # reel==youtube-shorts
            # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
            result.append({'type': 'VIDEO', 'content': {
                'video_id': content['videoId'],
                'title': content|G('title')|G.text or content|G('headline')|G.text,
                'author': content|G('bylineText')|G.text or author,
                'channel_id': (content|G('bylineText')|G('runs')
                    |Select('navigationEndpoint')
                    |G('browseEndpoint')|G('browseId') or channel_id),
                'length': (content|G('lengthText')|G.text or # topic channel
                    content|G('thumbnailOverlays')
                    |Select('thumbnailOverlayTimeStatusRenderer')
                    |G('text')|G.text),
                # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
                'views': content|G('viewCountText')|G.text|A.int,
                'published': content|G('publishedTimeText')|G.text|A(age),
            }})
        elif key in ("gridPlaylistRenderer", "playlistRenderer", "gridRadioRenderer"):
            result.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content|G('navigationEndpoint')|G('watchEndpoint')|G('playlistId'),
                'video_id': content|G('navigationEndpoint')|G('watchEndpoint')|G('videoId'),
                'title': content|G('title')|G.text,
                'author': author, # Note: gridRadioRenderer is by 'Youtube' without channel_id, ignoring that.
                'channel_id': channel_id,
                'n_videos': (content|G('videoCount')|A.int or # playlistRenderer
                    content|G('videoCountShortText','videoCountText')|G.text|A.int), # grid
            }})
        elif key == "showRenderer":
            watch = content['navigationEndpoint']['watchEndpoint']
            result.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': watch['playlistId'],
                'video_id': watch['videoId'],
                'title': content['title']['simpleText'],
                'author': author,
                'channel_id': channel_id,
                'n_videos': None,
            }})
        elif key == "gridShowRenderer":
            browse_id = content|G('navigationEndpoint')|G('browseEndpoint')|G('browseId')
            result.append({'type': 'PLAYLIST', 'content': {
                # playlistId prefixed with 'VL', which must be removed
                'playlist_id': browse_id[2:],
                'video_id': None,
                'title': content|G('title')|G.text,
                'author': author,
                'channel_id': channel_id,
                'n_videos': content|G('thumbnailOverlays')|G(0)
                    |G('thumbnailOverlayBottomPanelRenderer')|G('text')|G.text,
            }})
        elif key in _CHANNEL_CONTAINER_FIELDS:
            recurse(content[_CHANNEL_CONTAINER_FIELDS[key]])
        elif key in ("shelfRenderer", "richItemRenderer"):
            # these wrap a single renderer instead of a list
            recurse([content['content']])
        elif key == "reelShelfRenderer":
            recurse(content['items'])
        elif key in _IGNORED_CHANNEL_RENDERERS:
            pass
        else:
            log_unknown_card(item)

    return result, extra


def parse_playlist(item):
    """
    parses a single entry of a playlist.

    Returns a {'type': 'VIDEO', 'content': {...}} dict, or None if the entry
    is not marked as playable. Raises Exception(item) for anything but a
    playlistVideoRenderer.
    """
    key, content = _unwrap(item)
    if key != "playlistVideoRenderer":
        raise Exception(item) # XXX TODO

    if not content.get('isPlayable', False):
        return None # private or deleted video

    watch = content['navigationEndpoint']['watchEndpoint']
    byline = content.get('shortBylineText',{}).get('runs',[{}])[0]
    return {'type': 'VIDEO', 'content': {
        'video_id': content['videoId'],
        'title': (content['title'].get('simpleText') or # playable videos
                  content['title'].get('runs',[{}])[0].get('text')), # "[Private video]"
        'playlist_id': watch['playlistId'],
        'index': watch.get('index',0), #or int(content['index']['simpleText']) (absent on course intros; e.g. PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5)
        # rest is missing from unplayable videos:
        'author': byline.get('text'),
        'channel_id': byline.get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId'),
        'length': (content.get("lengthText",{}).get("simpleText") or # "8:51"
                   int(content.get("lengthSeconds", 0))), # "531"
        'starttime': watch.get('startTimeSeconds'),
    }}