]> git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
port search to new api; use continuation tokens here too
[subscriptionfeed.git] / app / common / innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
2
3 from urllib.parse import parse_qs, urlparse
4 import re
5
6 def findall(obj, key):
7 """
8 given a list of dicts, where one dict contains a given key, return said key.
9 """
10 if obj is None: return []
11 return [ obj[key] for obj in obj if key in obj.keys() ]
12 def listget(obj, index, fallback=None):
13 if obj is None: return fallback
14 return next(iter(obj[index:]), fallback)
15 flatten = lambda l: [item for sublist in l for item in sublist] # https://stackoverflow.com/a/952952
16 first = lambda l: next(iter(l),{})
17 listfind = lambda obj,key: first(findall(obj,key))
18
19 class G:
20 """
21 null-coalescing version of dict.get() that also works on lists.
22
23 the | operator is overloaded to achieve similar looking code to jq(1) filters.
24 the first found key is used: dict(foo=1)|G('bar','foo') returns 1.
25 """
26 def __init__(self, *keys):
27 self.keys = keys
28 def __ror__(self, other):
29 for key in self.keys:
30 try: return other[key]
31 except: continue
32 return None
33 class _Text:
34 """ parses youtube's .runs[].text and .simpleText variants """
35 def __ror__(self, other): # Note: only returning runs[0], not concat'ing all!
36 return other|G('simpleText') or other|G('runs')|G(0)|G('text')
37 text = _Text()
38 class Select:
39 """ |Select('foo') returns the first foo in list, |Select(all='foo') returns all foos. """
40 def __init__(self, key=None, *, all=None):
41 self.key = key or all
42 self.all = all
43 def __ror__(self, other):
44 try: items = [ other[self.key] for other in other if self.key in other.keys() ]
45 except: items = []
46 return items if self.all else items|G(0)
47 class A:
48 """ apply """
49 def __init__(self, f, *args):
50 self.f = f
51 self.args = args
52 def __ror__(self, other):
53 return self.f(other, *self.args)
54 class _Int:
55 def __ror__(self, other):
56 try: return int(''.join(filter(str.isdigit, other)))
57 except: return None
58 int = _Int()
59
60
61 def prepare_searchresults(yt_results):
62 contents = ( # from continuation token
63 yt_results
64 |G('onResponseReceivedCommands')
65 |Select('appendContinuationItemsAction')
66 |G('continuationItems')
67 ) or ( # from page 1
68 yt_results
69 |G('contents')
70 |G('twoColumnSearchResultsRenderer')
71 |G('primaryContents')
72 |G('sectionListRenderer')
73 |G('contents')
74 )
75 items = flatten([c.get('contents',[]) for c in contents|Select(all='itemSectionRenderer')])
76 items, extra = parse_result_items(items)
77 more = contents|Select("continuationItemRenderer")|G("continuationEndpoint")|G("continuationCommand")|G("token")
78 estimatedResults = yt_results|G("estimatedResults")
79
80 return items, extra, more
81
82 def prepare_infocards(metadata):
83 cards = metadata.get('cards',{}).get('cardCollectionRenderer',{}).get('cards',[])
84 return list(filter(None, map(parse_infocard, cards)))
85
86 def prepare_endcards(metadata):
87 endsc = metadata.get('endscreen',{}).get('endscreenRenderer',{}).get('elements',[])
88 return list(filter(None, map(parse_endcard, endsc)))
89
90 def prepare_channel(response, channel_id):
91 meta1 = response|G('metadata')|G('channelMetadataRenderer')
92 meta2 = response|G('microformat')|G('microformatDataRenderer')
93 title = meta1|G('title') or meta2|G('title')
94 descr = meta1|G('description') or meta2|G('description') # meta2.description is capped at 160chars
95 thumb = mkthumbs((meta2|G('thumbnail') or meta1|G('avatar'))|G('thumbnails') or {}) # .avatar ~ 900px
96
97 contents = response|G('continuationContents')
98 if not contents: # overran end of list
99 return title, descr, thumb, [], False
100
101 unparsed = contents|G('gridContinuation')|G('items') or \
102 contents|G('sectionListContinuation')|G('contents') or []
103 items, extra = parse_channel_items(unparsed, channel_id, title)
104 more = (
105 contents
106 |G('gridContinuation', 'sectionListContinuation')
107 |G('continuations')
108 |Select('nextContinuationData')
109 |G('continuation')
110 )
111
112 return title, descr, thumb, items, more
113
114 def prepare_playlist(result):
115 contents = result['continuationContents']
116 unparsed = contents['playlistVideoListContinuation'].get('contents',[])
117 more = (
118 contents
119 |G('playlistVideoListContinuation')
120 |G('continuations')
121 |Select('nextContinuationData')
122 |G('continuation')
123 )
124
125 return list(filter(None, map(parse_playlist, unparsed))), more
126
127 def mkthumbs(thumbs):
128 output = {str(e['height']): e['url'] for e in thumbs}
129 largest=next(iter(sorted(output.keys(),reverse=True,key=int)),None)
130 return {**output, 'largest': largest}
131
132 def clean_url(url):
133 # externals URLs are redirected through youtube.com/redirect, but we
134 # may encounter internal URLs, too
135 return parse_qs(urlparse(url).query).get('q',[url])[0]
136
137 def toInt(s, fallback=0):
138 if s is None:
139 return fallback
140 try:
141 return int(''.join(filter(str.isdigit, s)))
142 except ValueError:
143 return fallback
144
145 # Remove left-/rightmost word from string:
146 delL = lambda s: s.partition(' ')[2]
147
148 def age(s):
149 if s is None: # missing from autogen'd music, some livestreams
150 return None
151 # Some livestreams have "Streamed 7 hours ago"
152 s = s.replace("Streamed ","")
153 # Now, everything should be in the form "1 year ago"
154 value, unit, _ = s.split(" ")
155 suffix = dict(
156 minute='min',
157 minutes='min',
158 ).get(unit, unit[0]) # first letter otherwise (e.g. year(s) => y)
159
160 return f"{value}{suffix}"
161
162 def log_unknown_card(data):
163 import json
164 try:
165 from flask import request
166 source = request.url
167 except: source = "unknown"
168 with open("/tmp/innertube.err", "a") as f:
169 f.write(f"\n/***** {source} *****/\n")
170 json.dump(data, f, indent=2)
171
172 def parse_result_items(items):
173 # TODO: use .get() for most non-essential attributes
174 """
175 parses youtube search response into an easier to use format.
176 """
177 results = []
178 extras = []
179 for item in items:
180 key = next(iter(item.keys()), None)
181 content = item[key]
182 if key == 'videoRenderer':
183 results.append({'type': 'VIDEO', 'content': {
184 'video_id': content['videoId'],
185 'title': content['title']|G.text,
186 'author': content|G('longBylineText','shortBylineText')|G.text,
187 'channel_id': content|G('ownerText')|G('runs')|G(0) \
188 |G('navigationEndpoint')|G('browseEndpoint')|G('browseId'),
189 'length': content|G('lengthText')|G.text, # "44:07", "1:41:50"
190 'views': content|G('viewCountText')|G.text|A.int or 0, # "1,234 {views|watching}", absent on 0 views
191 'published': content|G('publishedTimeText')|G('simpleText')|A(age),
192 'live': content|G('badges')|Select('metadataBadgeRenderer')|G('style')=='BADGE_STYLE_TYPE_LIVE_NOW',
193 }})
194 elif key in ['playlistRenderer', 'radioRenderer', 'showRenderer']: # radio == "Mix" playlist, show == normal playlist, specially displayed
195 results.append({'type': 'PLAYLIST', 'content': {
196 'playlist_id': content['navigationEndpoint']|G('watchEndpoint')|G('playlistId') or \
197 content|G('playlistId'), # COURSE/"learning playlist"
198 'video_id': content['navigationEndpoint']|G('watchEndpoint')|G('videoId') or \
199 videoid_from_thumbnail(content), # learning playlist
200 'title': content['title']|G.text,
201 # Note: learning playlists have no author/channel_id
202 'author': content|G('longBylineText','shortBylineText')|G.text,
203 'channel_id': content|G('longBylineText','shortBylineText')|G('runs')|G(0) \
204 |G('navigationEndpoint')|G('browseEndpoint')|G('browseId'),
205 'n_videos': content|G('videoCount')|A.int or \
206 content|G('videoCountShortText','videoCountText')|G.text, # "Mix" playlists
207 }})
208 elif key == 'channelRenderer':
209 results.append({'type': 'CHANNEL', 'content': {
210 'channel_id': content['channelId'],
211 'title': content['title']|G.text,
212 'icons': content['thumbnail']['thumbnails']|A(mkthumbs),
213 'subscribers': content|G('subscriberCountText')|G('simpleText'), # "2.47K subscribers"
214 }})
215 elif key == 'shelfRenderer':
216 subkey = next(iter(content['content'].keys()), None) #verticalListRenderer/horizontalMovieListRenderer
217 r, e = parse_result_items(content['content'][subkey]['items'])
218 results.extend(r)
219 extras.extend(e)
220 elif key in ['movieRenderer', 'gridMovieRenderer']: # movies to buy/rent
221 pass # gMR.{videoId,title.runs[].text,lengthText.simpleText}
222 elif key in ['carouselAdRenderer','searchPyvRenderer','promotedSparklesTextSearchRenderer']: # haha, no.
223 pass
224 elif key == 'horizontalCardListRenderer':
225 # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
226 pass
227 elif key == 'emergencyOneboxRenderer': # suicide prevention hotline
228 pass
229 elif key in ['clarificationRenderer', 'infoPanelContainerRenderer']: # COVID-19/conspiracy theory infos
230 pass
231 elif key == 'webAnswerRenderer': # "Result from the web"
232 pass
233 elif key == 'didYouMeanRenderer' or key == 'showingResultsForRenderer':
234 extras.append({
235 'type': 'spelling',
236 'query': content['correctedQueryEndpoint']['searchEndpoint']['query'], # non-misspelled query
237 'autocorrected': key == 'showingResultsForRenderer',
238 })
239 elif key == 'messageRenderer': # "No more results"
240 extras.append({
241 'type': 'message',
242 'message': content|G('title','text')|G.text,
243 })
244 elif key == 'backgroundPromoRenderer': # e.g. "no results"
245 extras.append({
246 'type': content['icon']['iconType'],
247 'message': content['title']|G.text,
248 })
249 else:
250 log_unknown_card(item)
251 return results, extras
252
253 def parse_infocard(card):
254 """
255 parses a single infocard into a format that's easier to handle.
256 """
257 card = card['cardRenderer']
258 ctype = list(card['content'].keys())[0]
259 content = card['content'][ctype]
260 if ctype == "pollRenderer":
261 return {'type': "POLL", 'content': {
262 'question': content['question']['simpleText'],
263 'answers': [(a['text']['simpleText'],a['numVotes']) \
264 for a in content['choices']],
265 }}
266 elif ctype == "videoInfoCardContentRenderer":
267 is_live = content.get('badge',{}).get('liveBadgeRenderer') is not None
268 return {'type': "VIDEO", 'content': {
269 'video_id': content['action']['watchEndpoint']['videoId'],
270 'title': content['videoTitle']['simpleText'],
271 'author': delL(content['channelName']['simpleText']),
272 'length': content.get('lengthString',{}).get('simpleText') \
273 if not is_live else "LIVE", # "23:03"
274 'views': toInt(content.get('viewCountText',{}).get('simpleText')),
275 # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
276 }}
277 elif ctype == "playlistInfoCardContentRenderer":
278 return {'type': "PLAYLIST", 'content': {
279 'playlist_id': content['action']['watchEndpoint']['playlistId'],
280 'video_id': content['action']['watchEndpoint']['videoId'],
281 'title': content['playlistTitle']['simpleText'],
282 'author': delL(content['channelName']['simpleText']),
283 'n_videos': toInt(content['playlistVideoCount']['simpleText']),
284 }}
285 elif ctype == "simpleCardContentRenderer" and \
286 'urlEndpoint' in content['command']:
287 return {'type': "WEBSITE", 'content': {
288 'url': clean_url(content['command']['urlEndpoint']['url']),
289 'domain': content['displayDomain']['simpleText'],
290 'title': content['title']['simpleText'],
291 # XXX: no thumbnails for infocards
292 }}
293 elif ctype == "collaboratorInfoCardContentRenderer":
294 return {'type': "CHANNEL", 'content': {
295 'channel_id': content['endpoint']['browseEndpoint']['browseId'],
296 'title': content['channelName']['simpleText'],
297 'icons': mkthumbs(content['channelAvatar']['thumbnails']),
298 'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers"
299 }}
300 else:
301 log_unknown_card(card)
302 return None
303
304 def parse_endcard(card):
305 """
306 parses a single endcard into a format that's easier to handle.
307 """
308 card = card.get('endscreenElementRenderer', card) #only sometimes nested
309 ctype = card['style']
310 if ctype == "CHANNEL":
311 return {'type': ctype, 'content': {
312 'channel_id': card['endpoint']['browseEndpoint']['browseId'],
313 'title': card['title']|G.text,
314 'icons': mkthumbs(card['image']['thumbnails']),
315 }}
316 elif ctype == "VIDEO":
317 if not 'endpoint' in card: return None # title == "This video is unavailable."
318 return {'type': ctype, 'content': {
319 'video_id': card['endpoint']['watchEndpoint']['videoId'],
320 'title': card['title']|G.text,
321 'length': card['videoDuration']|G.text, # '12:21'
322 'views': toInt(card['metadata']|G.text),
323 # XXX: no channel name
324 }}
325 elif ctype == "PLAYLIST":
326 return {'type': ctype, 'content': {
327 'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
328 'video_id': card['endpoint']['watchEndpoint']['videoId'],
329 'title': card['title']|G.text,
330 'author': delL(card['metadata']|G.text),
331 'n_videos': toInt(card['playlistLength']|G.text),
332 }}
333 elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE":
334 url = clean_url(card['endpoint']['urlEndpoint']['url'])
335 return {'type': "WEBSITE", 'content': {
336 'url': url,
337 'domain': urlparse(url).netloc,
338 'title': card['title']|G.text,
339 'icons': mkthumbs(card['image']['thumbnails']),
340 }}
341 else:
342 log_unknown_card(card)
343 return None
344
345 def videoid_from_thumbnail(content):
346 # learning playlist; example: PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5 (/user/enyay/playlists)
347 return re.match(r"https?://i.ytimg.com/vi/([-_0-9a-zA-Z]{11})|()",
348 listget(listget(content.get('thumbnails',[]),0,{}).get('thumbnails',[]),0,{}).get('url','')
349 ).group(1)
350
351 def parse_channel_items(items, channel_id, author):
352 result = []
353 extra = []
354 for item in items:
355 key = next(iter(item.keys()), None)
356 content = item[key]
357 if key in ["gridVideoRenderer", "videoRenderer", "videoCardRenderer"]:
358 # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
359 result.append({'type': 'VIDEO', 'content': {
360 'video_id': content['videoId'],
361 'title': content['title'].get('simpleText') or content['title'].get('runs',[{}])[0].get('text'),
362 'author': content.get('bylineText',{}).get('runs',[{}])[0].get('text') or author,
363 'channel_id': content.get('bylineText',{}).get('runs',[{}])[0] \
364 .get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId') or channel_id,
365 'length': (content.get('lengthText',{}).get('simpleText') or # topic channel
366 listfind(content.get('thumbnailOverlays',[]),'thumbnailOverlayTimeStatusRenderer')
367 .get('text',{}).get('simpleText')),
368 # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
369 'views': toInt(content.get('viewCountText',{}).get('simpleText')),
370 'published': age(content.get('publishedTimeText',{}).get('simpleText')),
371 }})
372 elif key == "gridPlaylistRenderer" or key == "playlistRenderer":
373 result.append({'type': 'PLAYLIST', 'content': {
374 'playlist_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('playlistId') or content.get('playlistId'),
375 'video_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('videoId',{}) or videoid_from_thumbnail(content),
376 'title': (content['title'].get('simpleText') or # playlistRenderer
377 content['title']['runs'][0]['text']), # gridPlaylistRenderer
378 'author': author,
379 'channel_id': channel_id,
380 'n_videos': toInt(content.get('videoCount') or # playlistRenderer
381 content.get('videoCountShortText',{}).get('simpleText') or # grid(1)
382 content.get('videoCountText',{}).get('runs',[{}])[0].get('text')), # grid(2)
383 }})
384 elif key == "showRenderer":
385 result.append({'type': 'PLAYLIST', 'content': {
386 'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
387 'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
388 'title': content['title']['simpleText'],
389 'author': author,
390 'channel_id': channel_id,
391 'n_videos': None,
392 }})
393 elif key in ["gridShowRenderer"]:
394 result.append({'type': 'PLAYLIST', 'content': {
395 'playlist_id': (content|G('navigationEndpoint')
396 |G('browseEndpoint')|G('browseId'))[2:],
397 #^: playlistId prefixed with 'VL', which must be removed
398 'video_id': None,
399 'title': content|G('title')|G.text,
400 'author': author,
401 'channel_id': channel_id,
402 'n_videos': content|G('thumbnailOverlays')|G(0)
403 |G('thumbnailOverlayBottomPanelRenderer')|G('text')|G.text,
404 }})
405 elif key in ["itemSectionRenderer", "gridRenderer", "horizontalCardListRenderer", "horizontalListRenderer"]:
406 newkey = {
407 "itemSectionRenderer": 'contents',
408 "gridRenderer": 'items',
409 "horizontalCardListRenderer": 'cards',
410 "horizontalListRenderer": 'items',
411 }.get(key)
412 r, e = parse_channel_items(content[newkey], channel_id, author)
413 result.extend(r)
414 extra.extend(e)
415 elif key == "shelfRenderer":
416 r, e = parse_channel_items([content['content']], channel_id, author)
417 result.extend(r)
418 extra.extend(e)
419 elif key == "messageRenderer":
420 # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
421 pass
422 elif key == "gameCardRenderer":
423 pass
424 elif key == "gridChannelRenderer":
425 pass # don't care; related channels, e.g. on UCMsgXPD3wzzt8RxHJmXH7hQ
426 else:
427 log_unknown_card(item)
428
429 return result, extra
430
431 def parse_playlist(item):
432 key = next(iter(item.keys()), None)
433 content = item[key]
434 if key == "playlistVideoRenderer":
435 if not content.get('isPlayable', False):
436 return None # private or deleted video
437
438 return {'type': 'VIDEO', 'content': {
439 'video_id': content['videoId'],
440 'title': (content['title'].get('simpleText') or # playable videos
441 content['title'].get('runs',[{}])[0].get('text')), # "[Private video]"
442 'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
443 'index': content['navigationEndpoint']['watchEndpoint'].get('index',0), #or int(content['index']['simpleText']) (absent on course intros; e.g. PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5)
444 # rest is missing from unplayable videos:
445 'author': content.get('shortBylineText',{}).get('runs',[{}])[0].get('text'),
446 'channel_id':content.get('shortBylineText',{}).get('runs',[{}])[0].get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId'),
447 'length': (content.get("lengthText",{}).get("simpleText") or # "8:51"
448 int(content.get("lengthSeconds", 0))), # "531"
449 'starttime': content['navigationEndpoint']['watchEndpoint'].get('startTimeSeconds'),
450 }}
451 else:
452 raise Exception(item) # XXX TODO
Imprint / Impressum