]> git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
new searchresult types, 'no more results' variant
[subscriptionfeed.git] / app / common / innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
2
3 from urllib.parse import parse_qs, urlparse
4 import re
5
def findall(obj, key):
    """
    given a list of dicts, collect the value stored under `key` from every
    dict that carries it, in list order. a missing (None) list yields [].
    """
    if obj is None:
        return []
    matches = []
    for entry in obj:
        if key in entry.keys():
            matches.append(entry[key])
    return matches
def listget(obj, index, fallback=None):
    """
    return the element of `obj` at `index`, or `fallback` when `obj` is None
    or the slice starting at `index` is empty (i.e. index past the end).
    """
    if obj is None:
        return fallback
    tail = obj[index:]
    for head in tail:
        # first element of the slice is the one we were asked for
        return head
    return fallback
def flatten(l):
    """
    concatenate a list of lists into one flat list (one level deep).
    cf. https://stackoverflow.com/a/952952
    """
    merged = []
    for sublist in l:
        merged.extend(sublist)
    return merged

def first(l):
    """
    return the first element of an iterable, or {} if it is empty.
    """
    for head in l:
        return head
    return {}

def listfind(obj, key):
    """
    like findall(), but return only the first match ({} if there is none).
    """
    return first(findall(obj, key))
18
def prepare_searchresults(yt_results):
    """
    dig the result entries out of a search response and hand them to
    parse_result_items(); returns whatever that function returns.
    every level of the path is optional; missing levels yield no entries.
    """
    node = listfind(yt_results, 'response')
    for step in ('contents', 'twoColumnSearchResultsRenderer',
                 'primaryContents', 'sectionListRenderer'):
        node = node.get(step, {})
    sections = findall(node.get('contents', []), 'itemSectionRenderer')

    # merge the entries of all item sections into one list
    entries = []
    for section_contents in [s.get('contents', []) for s in sections]:
        entries.extend(section_contents)

    return parse_result_items(entries)
29
def prepare_infocards(metadata):
    """
    parse all infocards found under metadata.cards.cardCollectionRenderer.cards,
    dropping those parse_infocard() returned a falsy value (e.g. None) for.
    """
    collection = metadata.get('cards', {}).get('cardCollectionRenderer', {})
    parsed = (parse_infocard(raw) for raw in collection.get('cards', []))
    return [card for card in parsed if card]
33
def prepare_endcards(metadata):
    """
    parse all endscreen elements found under
    metadata.endscreen.endscreenRenderer.elements, dropping those
    parse_endcard() returned a falsy value (e.g. None) for.
    """
    renderer = metadata.get('endscreen', {}).get('endscreenRenderer', {})
    parsed = (parse_endcard(raw) for raw in renderer.get('elements', []))
    return [card for card in parsed if card]
37
def _alert_summary(alert):
    """
    reduce one entry of response.alerts to a (type, text) tuple for logging.
    tolerates missing keys and text given as 'runs' instead of 'simpleText',
    so that logging an error response can never raise by itself.
    """
    renderer = alert.get('alertRenderer', {})
    text = renderer.get('text', {})
    message = text.get('simpleText') or \
        ''.join(run.get('text', '') for run in text.get('runs', []))
    return renderer.get('type'), message

def prepare_channel(result, channel_id):
    """
    extract channel metadata and one page of channel items from a response.

    returns a 5-tuple (title, descr, thumb, items, has_more):
    - on an error response (one containing 'alerts'), the alerts are logged
      through flask's current_app.logger and (None, None, [], [], False) is
      returned.
    - when the response has no 'continuationContents' (overran end of list),
      items is [] and has_more is False.
    - otherwise items is the first list returned by parse_channel_items() and
      has_more tells whether the continuation carries a 'continuations' key.
    """
    response = listfind(result,'response')

    if 'alerts' in response: # possibly got an error back
        from flask import current_app
        current_app.logger.error([_alert_summary(alert) for alert in response['alerts']])
        return None,None,[],[],False

    meta1 = response.get('metadata',{}).get('channelMetadataRenderer',{})
    meta2 = response.get('microformat',{}).get('microformatDataRenderer',{})
    title = meta1.get('title', meta2.get('title'))
    descr = meta1.get('description', meta2.get('description')) # meta2.description is capped at 160chars
    thumb = mkthumbs(meta2.get('thumbnail',meta1.get('avatar',{})).get('thumbnails',{})) # .avatar ~ 900px

    contents = response.get('continuationContents')
    if not contents: # overran end of list
        return title, descr, thumb, [], False

    # items live either in a grid or in a section list continuation
    unparsed = contents.get('gridContinuation',{}).get('items') or \
        contents.get('sectionListContinuation',{}).get('contents') or []
    items, _ = parse_channel_items(unparsed, channel_id, title)
    has_more = 'continuations' in (contents.get('gridContinuation') or
        contents.get('sectionListContinuation') or {})

    return title, descr, thumb, items, has_more
63
def prepare_playlist(result):
    """
    extract one page of playlist videos from a continuation response.

    returns (videos, has_more): videos are the truthy results of
    parse_playlist() (unplayable entries are dropped), has_more tells whether
    the continuation carries a 'continuations' key.
    a response without continuationContents.playlistVideoListContinuation
    yields ([], False) instead of raising, matching how prepare_channel()
    treats an overrun of the end of the list.
    """
    response = listfind(result,'response')
    listing = response.get('continuationContents',{}).get('playlistVideoListContinuation')
    if not listing: # nothing (more) to show
        return [], False

    unparsed = listing.get('contents',[])
    has_more = 'continuations' in listing

    return list(filter(None, map(parse_playlist, unparsed))), has_more
70
def mkthumbs(thumbs):
    """
    turn a list of {'height': .., 'url': ..} dicts into a dict mapping the
    height (as string) to the url. the extra key 'largest' holds the biggest
    height (again as string, i.e. a key into the same dict), or None if
    `thumbs` was empty.
    """
    by_height = {}
    for entry in thumbs:
        height = str(entry['height'])
        by_height[height] = entry['url']

    # compare numerically, not lexically ("90" < "360")
    largest = max(by_height, key=int, default=None)

    result = dict(by_height)
    result['largest'] = largest
    return result
75
def clean_url(url):
    """
    return the first value of the url's `q` query parameter if it has one,
    the url itself otherwise.
    """
    # externals URLs are redirected through youtube.com/redirect, but we
    # may encounter internal URLs, too
    query = parse_qs(urlparse(url).query)
    if 'q' in query:
        return query['q'][0]
    return url
80
def toInt(s, fallback=0):
    """
    build an integer from all digit characters of `s`, ignoring everything
    else (e.g. "123,456 views" => 123456). returns `fallback` if `s` is None
    or the digits cannot be converted (e.g. there are none).
    """
    if s is None:
        return fallback
    digits = ''.join([ch for ch in s if str.isdigit(ch)])
    try:
        number = int(digits)
    except ValueError:
        return fallback
    return number
88
def delL(s):
    """
    remove the leftmost word from a string, i.e. return everything after the
    first space ('' if there is no space).
    """
    _, _, remainder = s.partition(' ')
    return remainder
91
def age(s):
    """
    abbreviate a relative publication time: "1 year ago" => "1y",
    "3 months ago" => "3mn", "Streamed 7 hours ago" => "7h".

    returns None if `s` is None. text that does not consist of exactly three
    space-separated words (after dropping "Streamed ") is returned unchanged
    instead of raising, so one oddly worded timestamp cannot break a listing.
    """
    if s is None: # missing from autogen'd music, some livestreams
        return None
    # Some livestreams have "Streamed 7 hours ago"
    parts = s.replace("Streamed ","").split(" ")
    # Now, everything should be in the form "1 year ago"
    if len(parts) != 3 or not parts[1]:
        return s # unexpected wording: pass it through as-is
    value, unit, _ = parts
    suffix = dict(
        month='mn',
        months='mn',
    ).get(unit, unit[0]) # first letter otherwise (e.g. year(s) => y)

    return f"{value}{suffix}"
105
def log_unknown_card(data):
    """
    append `data` as indented JSON to /tmp/innertube.err, preceded by a header
    line naming the URL of the current flask request ("unknown" if that URL
    cannot be determined).
    """
    import json
    try:
        from flask import request
        source = request.url
    except Exception: # best effort only; but don't swallow KeyboardInterrupt/SystemExit
        source = "unknown"
    with open("/tmp/innertube.err", "a") as f:
        f.write(f"\n/***** {source} *****/\n")
        json.dump(data, f, indent=2)
115
def _sr_first_run(node):
    """
    return the first element of node['runs'], or {} if node is None or has no
    (or an empty) 'runs' list.
    """
    return listget((node or {}).get('runs'), 0, {})

def _sr_text(node):
    """
    return the text of a text node: its 'simpleText' if set, else the text of
    its first run, else None.
    """
    return (node or {}).get('simpleText') or _sr_first_run(node).get('text')

def _sr_byline(content):
    """
    return the author name from longBylineText, falling back to
    shortBylineText when the former is absent or empty.
    """
    return _sr_text(content.get('longBylineText')) or \
        _sr_text(content.get('shortBylineText'))

def parse_result_items(items):
    # TODO: use .get() for most non-essential attributes
    """
    parses youtube search response into an easier to use format.

    `items` is a list of single-key dicts ({'<type>Renderer': {...}}).
    returns (results, extras):
    - results: dicts {'type': 'VIDEO'|'PLAYLIST'|'CHANNEL', 'content': {...}}
    - extras: out-of-band entries: spelling corrections ('spelling'),
      messageRenderer texts ('message') and backgroundPromoRenderer entries
      (typed by their iconType).
    shelves are parsed recursively and merged into both lists; ads and other
    known-but-unwanted types are skipped, unknown ones are logged through
    log_unknown_card().
    """
    results = []
    extras = []
    for item in items:
        key = next(iter(item.keys()), None)
        if key is None: # empty dict; there is nothing to look at
            continue
        content = item[key]
        if key == 'videoRenderer':
            is_live = listfind(content.get('badges',[]), 'metadataBadgeRenderer').get('style') == 'BADGE_STYLE_TYPE_LIVE_NOW'
            results.append({'type': 'VIDEO', 'content': {
                'video_id': content['videoId'],
                'title': content['title']['runs'][0]['text'],
                'author': _sr_byline(content),
                'channel_id': content['ownerText']['runs'][0] \
                    ['navigationEndpoint']['browseEndpoint']['browseId'],
                'length': content.get('lengthText',{}).get('simpleText') \
                    if not is_live else 'LIVE', # "44:07", "1:41:50"
                # "123,456 views", "1,234 watching"; absent on 0 views (toInt gives 0 then)
                'views': toInt(_sr_text(content.get('viewCountText'))),
                'published': age(content.get('publishedTimeText',{}).get('simpleText')),
            }})
        elif key == 'playlistRenderer':
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('playlistId') or \
                    content.get('playlistId'), # COURSE/"learning playlist"
                'video_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('videoId') or \
                    videoid_from_thumbnail(content), # learning playlist
                'title': content['title']['simpleText'],
                # Note: learning playlists have no author/channel_id
                'author': _sr_byline(content),
                'channel_id': _sr_first_run(content.get('longBylineText')) \
                    .get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId'), # OR .shortBylineText
                'n_videos': toInt(content['videoCount']),
            }})
        elif key == 'radioRenderer': # "Mix" playlists
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['playlistId'],
                'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
                'title': content['title']['simpleText'],
                'author': _sr_byline(content), # always "YouTube"
                'channel_id': None,
                # videoCountShortText: "50+"; videoCountText: "50+ videos"
                'n_videos': _sr_text(content.get('videoCountShortText')) or \
                    _sr_text(content.get('videoCountText')),
            }})
        elif key == 'showRenderer': # normal playlist, specially displayed
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
                'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
                'title': content['title']['simpleText'],
                'author': _sr_byline(content),
                'channel_id': None,
                'n_videos': None,
            }})
        elif key == 'channelRenderer':
            results.append({'type': 'CHANNEL', 'content': {
                'channel_id': content['channelId'],
                'title': content['title']['simpleText'],
                'icons': mkthumbs(content['thumbnail']['thumbnails']),
                'subscribers': content.get('subscriberCountText',{}).get('simpleText'), # "2.47K subscribers"
            }})
        elif key == 'shelfRenderer':
            shelf = content.get('content') or {}
            subkey = next(iter(shelf.keys()), None) #verticalListRenderer/horizontalMovieListRenderer
            if subkey is not None: # an empty shelf has nothing to recurse into
                r, e = parse_result_items(shelf[subkey].get('items',[]))
                results.extend(r)
                extras.extend(e)
        elif key in ['movieRenderer', 'gridMovieRenderer']: # movies to buy/rent
            pass # gMR.{videoId,title.runs[].text,lengthText.simpleText}
        elif key in ['carouselAdRenderer','searchPyvRenderer','promotedSparklesTextSearchRenderer']: # haha, no.
            pass
        elif key == 'horizontalCardListRenderer':
            # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
            pass
        elif key == 'emergencyOneboxRenderer': # suicide prevention hotline
            pass
        elif key in ['clarificationRenderer', 'infoPanelContainerRenderer']: # COVID-19/conspiracy theory infos
            pass
        elif key == 'webAnswerRenderer': # "Result from the web"
            pass
        elif key == 'didYouMeanRenderer' or key == 'showingResultsForRenderer':
            extras.append({
                'type': 'spelling',
                'query': content['correctedQueryEndpoint']['searchEndpoint']['query'], # non-misspelled query
                'autocorrected': key == 'showingResultsForRenderer',
            })
        elif key == 'messageRenderer': # "No more results"
            extras.append({
                'type': 'message',
                'message': _sr_text(content.get('title')) or \
                    _sr_text(content.get('text')),
            })
        elif key == 'backgroundPromoRenderer': # e.g. "no results"
            extras.append({
                'type': content['icon']['iconType'],
                'message': content['title']['runs'][0]['text'],
            })
        else:
            log_unknown_card(item)
    return results, extras
222
def parse_infocard(card):
    """
    converts one infocard ({'cardRenderer': {'content': {<type>: {...}}}})
    into {'type': POLL|VIDEO|PLAYLIST|WEBSITE|CHANNEL, 'content': {...}}.
    cards of any other kind are logged via log_unknown_card() and yield None.
    """
    renderer = card['cardRenderer']
    ctype = list(renderer['content'].keys())[0]
    body = renderer['content'][ctype]

    if ctype == "pollRenderer":
        question = body['question']['simpleText']
        answers = []
        for choice in body['choices']:
            answers.append((choice['text']['simpleText'], choice['numVotes']))
        return {'type': "POLL", 'content': {
            'question': question,
            'answers': answers,
        }}

    if ctype == "videoInfoCardContentRenderer":
        is_live = body.get('badge',{}).get('liveBadgeRenderer') is not None
        video = {}
        video['video_id'] = body['action']['watchEndpoint']['videoId']
        video['title'] = body['videoTitle']['simpleText']
        video['author'] = delL(body['channelName']['simpleText'])
        if is_live:
            video['length'] = "LIVE"
        else:
            video['length'] = body.get('lengthString',{}).get('simpleText') # "23:03"
        # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
        video['views'] = toInt(body.get('viewCountText',{}).get('simpleText'))
        return {'type': "VIDEO", 'content': video}

    if ctype == "playlistInfoCardContentRenderer":
        target = body['action']['watchEndpoint']
        playlist = {}
        playlist['playlist_id'] = target['playlistId']
        playlist['video_id'] = body['action']['watchEndpoint']['videoId']
        playlist['title'] = body['playlistTitle']['simpleText']
        playlist['author'] = delL(body['channelName']['simpleText'])
        playlist['n_videos'] = toInt(body['playlistVideoCount']['simpleText'])
        return {'type': "PLAYLIST", 'content': playlist}

    if ctype == "simpleCardContentRenderer" and 'urlEndpoint' in body['command']:
        # XXX: no thumbnails for infocards
        website = {}
        website['url'] = clean_url(body['command']['urlEndpoint']['url'])
        website['domain'] = body['displayDomain']['simpleText']
        website['title'] = body['title']['simpleText']
        return {'type': "WEBSITE", 'content': website}

    if ctype == "collaboratorInfoCardContentRenderer":
        channel = {}
        channel['channel_id'] = body['endpoint']['browseEndpoint']['browseId']
        channel['title'] = body['channelName']['simpleText']
        channel['icons'] = mkthumbs(body['channelAvatar']['thumbnails'])
        channel['subscribers'] = body.get('subscriberCountText',{}).get('simpleText','') # "545K subscribers"
        return {'type': "CHANNEL", 'content': channel}

    # none of the known kinds
    log_unknown_card(renderer)
    return None
273
def parse_endcard(card):
    """
    converts one endscreen element into
    {'type': CHANNEL|VIDEO|PLAYLIST|WEBSITE, 'content': {...}}, selected by
    the element's 'style'. CREATOR_MERCHANDISE is reported as WEBSITE.
    elements of any other style are logged via log_unknown_card() and yield
    None.
    """
    element = card.get('endscreenElementRenderer', card) #only sometimes nested
    style = element['style']

    if style == "CHANNEL":
        channel = {}
        channel['channel_id'] = element['endpoint']['browseEndpoint']['browseId']
        channel['title'] = element['title']['simpleText']
        channel['icons'] = mkthumbs(element['image']['thumbnails'])
        return {'type': style, 'content': channel}

    if style == "VIDEO":
        # XXX: no channel name
        video = {}
        video['video_id'] = element['endpoint']['watchEndpoint']['videoId']
        video['title'] = element['title']['simpleText']
        video['length'] = element['videoDuration']['simpleText'] # '12:21'
        video['views'] = toInt(element['metadata']['simpleText'])
        return {'type': style, 'content': video}

    if style == "PLAYLIST":
        playlist = {}
        playlist['playlist_id'] = element['endpoint']['watchEndpoint']['playlistId']
        playlist['video_id'] = element['endpoint']['watchEndpoint']['videoId']
        playlist['title'] = element['title']['simpleText']
        playlist['author'] = delL(element['metadata']['simpleText'])
        playlist['n_videos'] = toInt(element['playlistLength']['simpleText'])
        return {'type': style, 'content': playlist}

    if style == "WEBSITE" or style == "CREATOR_MERCHANDISE":
        target = clean_url(element['endpoint']['urlEndpoint']['url'])
        website = {}
        website['url'] = target
        website['domain'] = urlparse(target).netloc
        website['title'] = element['title']['simpleText']
        website['icons'] = mkthumbs(element['image']['thumbnails'])
        return {'type': "WEBSITE", 'content': website}

    # none of the known styles
    log_unknown_card(element)
    return None
313
def videoid_from_thumbnail(content):
    """
    recover a video id from the url of the first thumbnail
    (content.thumbnails[0].thumbnails[0].url). returns the 11 character id
    if the url starts with http(s)://i.ytimg.com/vi/<id>, None if it does not
    or if there is no thumbnail.
    """
    # learning playlist; example: PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5 (/user/enyay/playlists)
    outer = content.get('thumbnails') or [{}]
    inner = outer[0].get('thumbnails') or [{}]
    url = inner[0].get('url','')
    # dots are escaped so that only the literal host i.ytimg.com matches
    found = re.match(r"https?://i\.ytimg\.com/vi/([-_0-9a-zA-Z]{11})", url)
    return found.group(1) if found else None
319
def parse_channel_items(items, channel_id, author):
    """
    parses the items of a channel page/continuation.

    `items` is a list of single-key dicts ({'<type>Renderer': {...}}).
    `channel_id` and `author` are filled into entries that do not carry
    their own. container types (sections, grids, card lists, shelves) are
    parsed recursively. returns (result, extra), where result holds dicts
    {'type': 'VIDEO'|'PLAYLIST', 'content': {...}}; nothing in this function
    adds to extra directly, it only collects what recursive calls return.
    unknown item types are logged via log_unknown_card().
    """
    video_kinds = ("gridVideoRenderer", "videoRenderer", "videoCardRenderer")
    playlist_kinds = ("gridPlaylistRenderer", "playlistRenderer")
    # container type => name of the attribute holding its children
    container_fields = {
        "itemSectionRenderer": 'contents',
        "gridRenderer": 'items',
        "horizontalCardListRenderer": 'cards',
    }

    parsed, extras = [], []
    for entry in items:
        kind = next(iter(entry.keys()), None)
        body = entry[kind]

        if kind in video_kinds:
            # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
            video = {}
            video['video_id'] = body['videoId']
            video['title'] = body['title'].get('simpleText') or \
                body['title'].get('runs',[{}])[0].get('text')
            byline = body.get('bylineText',{}).get('runs',[{}])[0]
            video['author'] = byline.get('text') or author
            video['channel_id'] = byline.get('navigationEndpoint',{}) \
                .get('browseEndpoint',{}).get('browseId') or channel_id
            length = body.get('lengthText',{}).get('simpleText') # topic channel
            if not length:
                overlay = listfind(body.get('thumbnailOverlays',[]),'thumbnailOverlayTimeStatusRenderer')
                length = overlay.get('text',{}).get('simpleText')
            video['length'] = length
            # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
            video['views'] = toInt(body.get('viewCountText',{}).get('simpleText'))
            video['published'] = age(body.get('publishedTimeText',{}).get('simpleText'))
            parsed.append({'type': 'VIDEO', 'content': video})
            continue

        if kind in playlist_kinds:
            playlist = {}
            playlist['playlist_id'] = body['navigationEndpoint'].get('watchEndpoint',{}).get('playlistId') or \
                body.get('playlistId')
            playlist['video_id'] = body['navigationEndpoint'].get('watchEndpoint',{}).get('videoId',{}) or \
                videoid_from_thumbnail(body)
            playlist['title'] = (body['title'].get('simpleText') or # playlistRenderer
                body['title']['runs'][0]['text']) # gridPlaylistRenderer
            playlist['author'] = author
            playlist['channel_id'] = channel_id
            count = body.get('videoCount') # playlistRenderer
            if not count:
                count = body.get('videoCountShortText',{}).get('simpleText') # grid(1)
            if not count:
                count = body.get('videoCountText',{}).get('runs',[{}])[0].get('text') # grid(2)
            playlist['n_videos'] = toInt(count)
            parsed.append({'type': 'PLAYLIST', 'content': playlist})
            continue

        if kind == "showRenderer":
            target = body['navigationEndpoint']['watchEndpoint']
            show = {}
            show['playlist_id'] = target['playlistId']
            show['video_id'] = body['navigationEndpoint']['watchEndpoint']['videoId']
            show['title'] = body['title']['simpleText']
            show['author'] = author
            show['channel_id'] = channel_id
            show['n_videos'] = None
            parsed.append({'type': 'PLAYLIST', 'content': show})
            continue

        if kind in container_fields:
            children = body[container_fields.get(kind)]
            r, e = parse_channel_items(children, channel_id, author)
            parsed.extend(r)
            extras.extend(e)
            continue

        if kind == "shelfRenderer":
            # a shelf wraps a single container item
            r, e = parse_channel_items([body['content']], channel_id, author)
            parsed.extend(r)
            extras.extend(e)
            continue

        if kind == "messageRenderer":
            # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
            continue

        if kind == "gameCardRenderer":
            continue

        log_unknown_card(entry)

    return parsed, extras
384
def parse_playlist(item):
    """
    converts one playlist entry ({'playlistVideoRenderer': {...}}) into
    {'type': 'VIDEO', 'content': {...}}. entries that are not marked
    'isPlayable' (private or deleted videos) yield None.
    raises Exception(item) for any other entry type.
    """
    kind = next(iter(item.keys()), None)
    entry = item[kind]

    if kind != "playlistVideoRenderer":
        raise Exception(item) # XXX TODO

    if not entry.get('isPlayable', False):
        return None # private or deleted video

    video = {}
    video['video_id'] = entry['videoId']
    video['title'] = (entry['title'].get('simpleText') or # playable videos
        entry['title'].get('runs',[{}])[0].get('text')) # "[Private video]"
    watch = entry['navigationEndpoint']['watchEndpoint']
    video['playlist_id'] = watch['playlistId']
    video['index'] = watch.get('index',0) #or int(content['index']['simpleText']) (absent on course intros; e.g. PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5)
    # rest is missing from unplayable videos:
    byline = entry.get('shortBylineText',{}).get('runs',[{}])[0]
    video['author'] = byline.get('text')
    video['channel_id'] = byline.get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId')
    length = entry.get("lengthText",{}).get("simpleText") # "8:51"
    if not length:
        length = int(entry.get("lengthSeconds", 0)) # "531"
    video['length'] = length
    video['starttime'] = watch.get('startTimeSeconds')

    return {'type': 'VIDEO', 'content': video}
Imprint / Impressum