]> git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
search results: only parse first itemSectionRenderer
[subscriptionfeed.git] / app / common / innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
2
3 from urllib.parse import parse_qs, urlparse
4 import re
5
6 def listget(obj, index, fallback=None):
7 if obj is None: return fallback
8 return next(iter(obj[index:]), fallback)
9
10 class G:
11 """
12 null-coalescing version of dict.get() that also works on lists.
13
14 the | operator is overloaded to achieve similar looking code to jq(1) filters.
15 the first found key is used: dict(foo=1)|G('bar','foo') returns 1.
16 """
17 def __init__(self, *keys):
18 self.keys = keys
19 def __ror__(self, other):
20 for key in self.keys:
21 try: return other[key]
22 except: continue
23 return None
24 class _Text:
25 """ parses youtube's .runs[].text and .simpleText variants """
26 def __ror__(self, other): # Note: only returning runs[0], not concat'ing all!
27 return other|G('simpleText') or other|G('runs')|G(0)|G('text')
28 text = _Text()
29 class Select:
30 """ |Select('foo') returns the first foo in list, |Select(all='foo') returns all foos. """
31 def __init__(self, key=None, *, all=None):
32 self.key = key or all
33 self.all = all
34 def __ror__(self, other):
35 try: items = [ other[self.key] for other in other if self.key in other.keys() ]
36 except: items = []
37 return items if self.all else items|G(0)
38 class A:
39 """ apply """
40 def __init__(self, f, *args):
41 self.f = f
42 self.args = args
43 def __ror__(self, other):
44 return self.f(other, *self.args)
45 class _Int:
46 def __ror__(self, other):
47 try: return int(''.join(filter(str.isdigit, other)))
48 except: return None
49 int = _Int()
50
51
52 def prepare_searchresults(yt_results):
53 contents = ( # from continuation token
54 yt_results
55 |G('onResponseReceivedCommands')
56 |Select('appendContinuationItemsAction')
57 |G('continuationItems')
58 ) or ( # from page 1
59 yt_results
60 |G('contents')
61 |G('twoColumnSearchResultsRenderer')
62 |G('primaryContents')
63 |G('sectionListRenderer')
64 |G('contents')
65 )
66 items = contents|Select('itemSectionRenderer')|G('contents')
67 items, extra = parse_result_items(items)
68 more = contents|Select("continuationItemRenderer")|G("continuationEndpoint")|G("continuationCommand")|G("token")
69 estimatedResults = yt_results|G("estimatedResults")
70
71 return items, extra, more
72
73 def prepare_infocards(metadata):
74 cards = metadata.get('cards',{}).get('cardCollectionRenderer',{}).get('cards',[])
75 return list(filter(None, map(parse_infocard, cards)))
76
77 def prepare_endcards(metadata):
78 endsc = metadata.get('endscreen',{}).get('endscreenRenderer',{}).get('elements',[])
79 return list(filter(None, map(parse_endcard, endsc)))
80
81 def prepare_channel(response, channel_id):
82 meta1 = response|G('metadata')|G('channelMetadataRenderer')
83 meta2 = response|G('microformat')|G('microformatDataRenderer')
84 title = meta1|G('title') or meta2|G('title')
85 descr = meta1|G('description') or meta2|G('description') # meta2.description is capped at 160chars
86 thumb = mkthumbs((meta2|G('thumbnail') or meta1|G('avatar'))|G('thumbnails') or {}) # .avatar ~ 900px
87
88 contents = response|G('continuationContents')
89 if not contents: # overran end of list
90 return title, descr, thumb, [], False
91
92 unparsed = contents|G('gridContinuation')|G('items') or \
93 contents|G('sectionListContinuation')|G('contents') or []
94 items, extra = parse_channel_items(unparsed, channel_id, title)
95 more = (
96 contents
97 |G('gridContinuation', 'sectionListContinuation')
98 |G('continuations')
99 |Select('nextContinuationData')
100 |G('continuation')
101 )
102
103 return title, descr, thumb, items, more
104
105 def prepare_playlist(result):
106 contents = result['continuationContents']
107 unparsed = contents['playlistVideoListContinuation'].get('contents',[])
108 more = (
109 contents
110 |G('playlistVideoListContinuation')
111 |G('continuations')
112 |Select('nextContinuationData')
113 |G('continuation')
114 )
115
116 return list(filter(None, map(parse_playlist, unparsed))), more
117
118 def mkthumbs(thumbs):
119 output = {str(e['height']): e['url'] for e in thumbs}
120 largest=next(iter(sorted(output.keys(),reverse=True,key=int)),None)
121 return {**output, 'largest': largest}
122
123 def clean_url(url):
124 # externals URLs are redirected through youtube.com/redirect, but we
125 # may encounter internal URLs, too
126 return parse_qs(urlparse(url).query).get('q',[url])[0]
127
128 def toInt(s, fallback=0):
129 if s is None:
130 return fallback
131 try:
132 return int(''.join(filter(str.isdigit, s)))
133 except ValueError:
134 return fallback
135
136 # Remove left-/rightmost word from string:
137 delL = lambda s: s.partition(' ')[2]
138
139 def age(s):
140 if s is None: # missing from autogen'd music, some livestreams
141 return None
142 # Some livestreams have "Streamed 7 hours ago"
143 s = s.replace("Streamed ","")
144 # Now, everything should be in the form "1 year ago"
145 value, unit, _ = s.split(" ")
146 suffix = dict(
147 minute='min',
148 minutes='min',
149 ).get(unit, unit[0]) # first letter otherwise (e.g. year(s) => y)
150
151 return f"{value}{suffix}"
152
153 def log_unknown_card(data):
154 import json
155 try:
156 from flask import request
157 source = request.url
158 except: source = "unknown"
159 with open("/tmp/innertube.err", "a", encoding="utf-8", errors="backslashreplace") as f:
160 f.write(f"\n/***** {source} *****/\n")
161 json.dump(data, f, indent=2)
162
163 def parse_result_items(items):
164 # TODO: use .get() for most non-essential attributes
165 """
166 parses youtube search response into an easier to use format.
167 """
168 results = []
169 extras = []
170 for item in items:
171 key = next(iter(item.keys()), None)
172 content = item[key]
173 if key == 'videoRenderer':
174 results.append({'type': 'VIDEO', 'content': {
175 'video_id': content['videoId'],
176 'title': content['title']|G.text,
177 'author': content|G('longBylineText','shortBylineText')|G.text,
178 'channel_id': content|G('ownerText')|G('runs')|G(0) \
179 |G('navigationEndpoint')|G('browseEndpoint')|G('browseId'),
180 'length': content|G('lengthText')|G.text, # "44:07", "1:41:50"
181 'views': content|G('viewCountText')|G.text|A.int or 0, # "1,234 {views|watching}", absent on 0 views
182 'published': content|G('publishedTimeText')|G('simpleText')|A(age),
183 'live': content|G('badges')|Select('metadataBadgeRenderer')|G('style')=='BADGE_STYLE_TYPE_LIVE_NOW',
184 }})
185 elif key in ['playlistRenderer', 'radioRenderer', 'showRenderer']: # radio == "Mix" playlist, show == normal playlist, specially displayed
186 results.append({'type': 'PLAYLIST', 'content': {
187 'playlist_id': content['navigationEndpoint']|G('watchEndpoint')|G('playlistId') or \
188 content|G('playlistId'), # COURSE/"learning playlist"
189 'video_id': content['navigationEndpoint']|G('watchEndpoint')|G('videoId') or \
190 videoid_from_thumbnail(content), # learning playlist
191 'title': content['title']|G.text,
192 # Note: learning playlists have no author/channel_id
193 'author': content|G('longBylineText','shortBylineText')|G.text,
194 'channel_id': content|G('longBylineText','shortBylineText')|G('runs')|G(0) \
195 |G('navigationEndpoint')|G('browseEndpoint')|G('browseId'),
196 'n_videos': content|G('videoCount')|A.int or \
197 content|G('videoCountShortText','videoCountText')|G.text, # "Mix" playlists
198 }})
199 elif key == 'channelRenderer':
200 results.append({'type': 'CHANNEL', 'content': {
201 'channel_id': content['channelId'],
202 'title': content['title']|G.text,
203 'icons': content['thumbnail']['thumbnails']|A(mkthumbs),
204 'subscribers': content|G('subscriberCountText')|G('simpleText'), # "2.47K subscribers"
205 }})
206 elif key == 'shelfRenderer':
207 subkey = next(iter(content['content'].keys()), None) #verticalListRenderer/horizontalMovieListRenderer
208 r, e = parse_result_items(content['content'][subkey]['items'])
209 results.extend(r)
210 extras.extend(e)
211 elif key in ['movieRenderer', 'gridMovieRenderer']: # movies to buy/rent
212 pass # gMR.{videoId,title.runs[].text,lengthText.simpleText}
213 elif key in ['carouselAdRenderer','searchPyvRenderer','promotedSparklesTextSearchRenderer']: # haha, no.
214 pass
215 elif key == 'horizontalCardListRenderer':
216 # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
217 pass
218 elif key == 'emergencyOneboxRenderer': # suicide prevention hotline
219 pass
220 elif key in ['clarificationRenderer', 'infoPanelContainerRenderer']: # COVID-19/conspiracy theory infos
221 pass
222 elif key == 'webAnswerRenderer': # "Result from the web"
223 pass
224 elif key == 'didYouMeanRenderer' or key == 'showingResultsForRenderer':
225 extras.append({
226 'type': 'spelling',
227 'query': content['correctedQueryEndpoint']['searchEndpoint']['query'], # non-misspelled query
228 'autocorrected': key == 'showingResultsForRenderer',
229 })
230 elif key == 'messageRenderer': # "No more results"
231 extras.append({
232 'type': 'message',
233 'message': content|G('title','text')|G.text,
234 })
235 elif key == 'backgroundPromoRenderer': # e.g. "no results"
236 extras.append({
237 'type': content['icon']['iconType'],
238 'message': content['title']|G.text,
239 })
240 else:
241 log_unknown_card(item)
242 return results, extras
243
244 def parse_infocard(card):
245 """
246 parses a single infocard into a format that's easier to handle.
247 """
248 card = card['cardRenderer']
249 ctype = list(card['content'].keys())[0]
250 content = card['content'][ctype]
251 if ctype == "pollRenderer":
252 return {'type': "POLL", 'content': {
253 'question': content['question']['simpleText'],
254 'answers': [(a['text']['simpleText'],a['numVotes']) \
255 for a in content['choices']],
256 }}
257 elif ctype == "videoInfoCardContentRenderer":
258 is_live = content.get('badge',{}).get('liveBadgeRenderer') is not None
259 return {'type': "VIDEO", 'content': {
260 'video_id': content['action']['watchEndpoint']['videoId'],
261 'title': content['videoTitle']['simpleText'],
262 'author': delL(content['channelName']['simpleText']),
263 'length': content.get('lengthString',{}).get('simpleText') \
264 if not is_live else "LIVE", # "23:03"
265 'views': toInt(content.get('viewCountText',{}).get('simpleText')),
266 # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
267 }}
268 elif ctype == "playlistInfoCardContentRenderer":
269 return {'type': "PLAYLIST", 'content': {
270 'playlist_id': content['action']['watchEndpoint']['playlistId'],
271 'video_id': content['action']['watchEndpoint']['videoId'],
272 'title': content['playlistTitle']['simpleText'],
273 'author': delL(content['channelName']['simpleText']),
274 'n_videos': toInt(content['playlistVideoCount']['simpleText']),
275 }}
276 elif ctype == "simpleCardContentRenderer" and \
277 'urlEndpoint' in content['command']:
278 return {'type': "WEBSITE", 'content': {
279 'url': clean_url(content['command']['urlEndpoint']['url']),
280 'domain': content['displayDomain']['simpleText'],
281 'title': content['title']['simpleText'],
282 # XXX: no thumbnails for infocards
283 }}
284 elif ctype == "collaboratorInfoCardContentRenderer":
285 return {'type': "CHANNEL", 'content': {
286 'channel_id': content['endpoint']['browseEndpoint']['browseId'],
287 'title': content['channelName']['simpleText'],
288 'icons': mkthumbs(content['channelAvatar']['thumbnails']),
289 'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers"
290 }}
291 else:
292 log_unknown_card(card)
293 return None
294
295 def parse_endcard(card):
296 """
297 parses a single endcard into a format that's easier to handle.
298 """
299 card = card.get('endscreenElementRenderer', card) #only sometimes nested
300 ctype = card['style']
301 if ctype == "CHANNEL":
302 return {'type': ctype, 'content': {
303 'channel_id': card['endpoint']['browseEndpoint']['browseId'],
304 'title': card['title']|G.text,
305 'icons': mkthumbs(card['image']['thumbnails']),
306 }}
307 elif ctype == "VIDEO":
308 if not 'endpoint' in card: return None # title == "This video is unavailable."
309 return {'type': ctype, 'content': {
310 'video_id': card['endpoint']['watchEndpoint']['videoId'],
311 'title': card['title']|G.text,
312 'length': card['videoDuration']|G.text, # '12:21'
313 'views': toInt(card['metadata']|G.text),
314 # XXX: no channel name
315 }}
316 elif ctype == "PLAYLIST":
317 return {'type': ctype, 'content': {
318 'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
319 'video_id': card['endpoint']['watchEndpoint']['videoId'],
320 'title': card['title']|G.text,
321 'author': delL(card['metadata']|G.text),
322 'n_videos': toInt(card['playlistLength']|G.text),
323 }}
324 elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE":
325 url = clean_url(card['endpoint']['urlEndpoint']['url'])
326 return {'type': "WEBSITE", 'content': {
327 'url': url,
328 'domain': urlparse(url).netloc,
329 'title': card['title']|G.text,
330 'icons': mkthumbs(card['image']['thumbnails']),
331 }}
332 else:
333 log_unknown_card(card)
334 return None
335
336 def videoid_from_thumbnail(content):
337 # learning playlist; example: PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5 (/user/enyay/playlists)
338 return re.match(r"https?://i.ytimg.com/vi/([-_0-9a-zA-Z]{11})|()",
339 listget(listget(content.get('thumbnails',[]),0,{}).get('thumbnails',[]),0,{}).get('url','')
340 ).group(1)
341
342 def parse_channel_items(items, channel_id, author):
343 result = []
344 extra = []
345 for item in items:
346 key = next(iter(item.keys()), None)
347 content = item[key]
348 if key in ["gridVideoRenderer", "videoRenderer", "videoCardRenderer"]:
349 # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
350 result.append({'type': 'VIDEO', 'content': {
351 'video_id': content['videoId'],
352 'title': content|G('title')|G.text,
353 'author': content|G('bylineText')|G.text or author,
354 'channel_id': (content|G('bylineText')|G('runs')
355 |Select('navigationEndpoint')
356 |G('browseEndpoint')|G('browseId') or channel_id),
357 'length': (content|G('lengthText')|G.text or # topic channel
358 content|G('thumbnailOverlays')
359 |Select('thumbnailOverlayTimeStatusRenderer')
360 |G('text')|G.text),
361 # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
362 'views': content|G('viewCountText')|G.text|A.int,
363 'published': content|G('publishedTimeText')|G.text|A(age),
364 }})
365 elif key == "gridPlaylistRenderer" or key == "playlistRenderer":
366 result.append({'type': 'PLAYLIST', 'content': {
367 'playlist_id': (content|G('navigationEndpoint')|G('watchEndpoint')|G('playlistId')
368 or content|G('playlistId')),
369 'video_id': (content|G('navigationEndpoint')|G('watchEndpoint')|G('videoId')
370 or videoid_from_thumbnail(content)),
371 'title': content|G('title')|G.text,
372 'author': author,
373 'channel_id': channel_id,
374 'n_videos': (content|G('videoCount')|A.int or # playlistRenderer
375 content|G('videoCountShortText','videoCountText')|G.text|A.int) # grid
376 }})
377 elif key == "showRenderer":
378 result.append({'type': 'PLAYLIST', 'content': {
379 'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
380 'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
381 'title': content['title']['simpleText'],
382 'author': author,
383 'channel_id': channel_id,
384 'n_videos': None,
385 }})
386 elif key in ["gridShowRenderer"]:
387 result.append({'type': 'PLAYLIST', 'content': {
388 'playlist_id': (content|G('navigationEndpoint')
389 |G('browseEndpoint')|G('browseId'))[2:],
390 #^: playlistId prefixed with 'VL', which must be removed
391 'video_id': None,
392 'title': content|G('title')|G.text,
393 'author': author,
394 'channel_id': channel_id,
395 'n_videos': content|G('thumbnailOverlays')|G(0)
396 |G('thumbnailOverlayBottomPanelRenderer')|G('text')|G.text,
397 }})
398 elif key in ["itemSectionRenderer", "gridRenderer", "horizontalCardListRenderer", "horizontalListRenderer"]:
399 newkey = {
400 "itemSectionRenderer": 'contents',
401 "gridRenderer": 'items',
402 "horizontalCardListRenderer": 'cards',
403 "horizontalListRenderer": 'items',
404 }.get(key)
405 r, e = parse_channel_items(content[newkey], channel_id, author)
406 result.extend(r)
407 extra.extend(e)
408 elif key == "shelfRenderer":
409 r, e = parse_channel_items([content['content']], channel_id, author)
410 result.extend(r)
411 extra.extend(e)
412 elif key == "messageRenderer":
413 # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
414 pass
415 elif key == "gameCardRenderer":
416 pass
417 elif key == "gridChannelRenderer":
418 pass # don't care; related channels, e.g. on UCMsgXPD3wzzt8RxHJmXH7hQ
419 else:
420 log_unknown_card(item)
421
422 return result, extra
423
424 def parse_playlist(item):
425 key = next(iter(item.keys()), None)
426 content = item[key]
427 if key == "playlistVideoRenderer":
428 if not content.get('isPlayable', False):
429 return None # private or deleted video
430
431 return {'type': 'VIDEO', 'content': {
432 'video_id': content['videoId'],
433 'title': (content['title'].get('simpleText') or # playable videos
434 content['title'].get('runs',[{}])[0].get('text')), # "[Private video]"
435 'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
436 'index': content['navigationEndpoint']['watchEndpoint'].get('index',0), #or int(content['index']['simpleText']) (absent on course intros; e.g. PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5)
437 # rest is missing from unplayable videos:
438 'author': content.get('shortBylineText',{}).get('runs',[{}])[0].get('text'),
439 'channel_id':content.get('shortBylineText',{}).get('runs',[{}])[0].get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId'),
440 'length': (content.get("lengthText",{}).get("simpleText") or # "8:51"
441 int(content.get("lengthSeconds", 0))), # "531"
442 'starttime': content['navigationEndpoint']['watchEndpoint'].get('startTimeSeconds'),
443 }}
444 else:
445 raise Exception(item) # XXX TODO
Imprint / Impressum