]> git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
support attribution_link redirects
[subscriptionfeed.git] / app / common / innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
2
3 from urllib.parse import parse_qs, urlparse
4 import re
5
class G:
    """
    null-coalescing version of dict.get() that also works on lists.

    the | operator is overloaded to achieve similar looking code to jq(1) filters.
    the first found key is used: dict(foo=1)|G('bar','foo') returns 1.

    a lookup that fails (missing key, index out of range, value that is not
    subscriptable such as None) is skipped; if no key can be looked up, the
    result is None. this makes chains like data|G('a')|G(0)|G('b') safe on
    incomplete data. note that the first *successful* lookup wins, even if
    the value found is None or otherwise falsy.
    """
    def __init__(self, *keys):
        # candidate keys/indices, tried in the order given
        self.keys = keys

    def __ror__(self, other):
        """ implements `other|G(...)`: returns other[key] for the first key that works, else None. """
        for key in self.keys:
            try:
                return other[key]
            except Exception:
                # deliberately best-effort; unlike a bare `except:` this
                # still lets KeyboardInterrupt and SystemExit propagate.
                continue
        return None

    class _Text:
        """ parses youtube's .runs[].text and .simpleText variants """
        def __ror__(self, other): # Note: only returning runs[0], not concat'ing all!
            return other|G('simpleText') or other|G('runs')|G(0)|G('text')
    # usage: value|G.text
    text = _Text()
class Select:
    """
    |Select('foo') returns the first foo in list, |Select(all='foo') returns all foos.

    the left-hand side is expected to be a list of dicts; the value stored
    under the key is collected from every dict that has it. if the left-hand
    side cannot be processed that way (e.g. it is None, or an element has no
    .keys()), it is treated as if nothing was found: None for the
    first-match form, [] for the all= form.
    """
    def __init__(self, key=None, *, all=None):
        self.key = key or all
        # truthy when every match (instead of only the first) is requested
        self.all = all

    def __ror__(self, other):
        """ implements `other|Select(...)`. """
        try:
            items = [entry[self.key] for entry in other if self.key in entry.keys()]
        except Exception:
            # deliberately best-effort; unlike a bare `except:` this still
            # lets KeyboardInterrupt and SystemExit propagate.
            items = []
        if self.all:
            return items
        return items[0] if items else None
class A:
    """
    apply: `value|A(f, *args)` evaluates to f(value, *args).

    exceptions raised by f are not caught here.
    """
    def __init__(self, f, *args):
        self.f = f
        # extra positional arguments, passed to f after the piped-in value
        self.args = args

    def __ror__(self, other):
        return self.f(other, *self.args)

    class _Int:
        """
        `value|A.int` joins all digit characters of value and converts them
        to an integer ("1,234 views" => 1234). returns None if that is not
        possible (e.g. value is None or contains no digits).
        """
        def __ror__(self, other):
            # note: `int` below is the builtin; the class attribute A.int is
            # not visible from inside a method body.
            try:
                return int(''.join(filter(str.isdigit, other)))
            except Exception:
                # deliberately best-effort; unlike a bare `except:` this
                # still lets KeyboardInterrupt and SystemExit propagate.
                return None
    int = _Int()
46
47
def prepare_searchresults(yt_results):
    """
    extracts the result list from a search response.

    handles both the response to a continuation token and the first page.
    returns a tuple (items, extra, more): items and extra are the two lists
    produced by parse_result_items(), more is the continuation token for the
    next page (None if there is none).
    """
    contents = ( # from continuation token
        yt_results
            |G('onResponseReceivedCommands')
            |Select('appendContinuationItemsAction')
            |G('continuationItems')
    ) or ( # from page 1
        yt_results
            |G('contents')
            |G('twoColumnSearchResultsRenderer')
            |G('primaryContents')
            |G('sectionListRenderer')
            |G('contents')
    )
    # only the first itemSectionRenderer is looked at. the lookup yields None
    # when the response has no such section; fall back to an empty list so
    # that parse_result_items() doesn't fail trying to iterate over None.
    items = contents|Select('itemSectionRenderer')|G('contents') or []
    items, extra = parse_result_items(items)
    more = (
        contents
            |Select("continuationItemRenderer")
            |G("continuationEndpoint")
            |G("continuationCommand")
            |G("token")
    )

    return items, extra, more
68
def prepare_infocards(metadata):
    """
    returns the parsed infocards found under
    metadata.cards.cardCollectionRenderer.cards as a list; cards for which
    parse_infocard() returns a falsy value are left out.
    """
    collection = metadata.get('cards', {}).get('cardCollectionRenderer', {})
    parsed = [parse_infocard(card) for card in collection.get('cards', [])]
    return [card for card in parsed if card]
72
def prepare_endcards(metadata):
    """
    returns the parsed endscreen elements found under
    metadata.endscreen.endscreenRenderer.elements as a list; elements for
    which parse_endcard() returns a falsy value are left out.
    """
    endscreen = metadata.get('endscreen', {}).get('endscreenRenderer', {})
    parsed = [parse_endcard(element) for element in endscreen.get('elements', [])]
    return [card for card in parsed if card]
76
def prepare_channel(response, channel_id, channel_name):
    """
    extracts channel metadata and one page of channel items from a response.

    returns a tuple (title, descr, thumb, items, more). title falls back to
    the supplied channel_name; thumb is the dict built by mkthumbs(); items
    is the first list returned by parse_channel_items(); more is the
    continuation token for the next page. when the response carries no
    contents at all, items is [] and more is False.
    """
    channel_meta = response|G('metadata')|G('channelMetadataRenderer')
    microformat = response|G('microformat')|G('microformatDataRenderer')

    title = channel_meta|G('title') or microformat|G('title') or channel_name
    # microformat's description is capped at 160chars, so it is only the fallback
    descr = channel_meta|G('description') or microformat|G('description')
    # channel_meta's .avatar ~ 900px, so it is only the fallback
    image = microformat|G('thumbnail') or channel_meta|G('avatar')
    thumb = mkthumbs(image|G('thumbnails') or {})

    contents = response|G('continuationContents')
    if not contents:
        contents = response|G('onResponseReceivedActions')
    if not contents: # overran end of list
        return title, descr, thumb, [], False

    # the item list lives in different places depending on the kind of
    # response; the first non-empty candidate is used.
    candidates = (
        contents|G('gridContinuation')|G('items'),
        contents|G('sectionListContinuation')|G('contents'),
        contents|G('richGridContinuation')|G('contents'),
        contents|Select('appendContinuationItemsAction')|G('continuationItems'),
        contents|G(-1)|G('reloadContinuationItemsCommand')|G('continuationItems'),
    )
    unparsed = next((found for found in candidates if found), [])
    items, _extra = parse_channel_items(unparsed, channel_id, title)

    # videos, livestreams
    more = (
        unparsed
            |Select('continuationItemRenderer')
            |G('continuationEndpoint')
            |G('continuationCommand')
            |G('token')
    )
    if not more: # playlists, search
        more = (
            contents
                |G('gridContinuation', 'sectionListContinuation')
                |G('continuations')
                |Select('nextContinuationData')
                |G('continuation')
        )

    return title, descr, thumb, items, more
113
def prepare_playlist(result):
    """
    extracts playlist metadata and one page of videos from a playlist
    continuation response.

    returns a tuple (title, author, channel_id, videos, more). videos holds
    the truthy results of parse_playlist(); more is the continuation token
    for the next page. title, author and channel_id come from the sidebar and
    are None when it is absent. raises KeyError if result lacks
    continuationContents.playlistVideoListContinuation.
    """
    listing = result['continuationContents']['playlistVideoListContinuation']
    unparsed = listing.get('contents', [])
    more = listing|G('continuations')|Select('nextContinuationData')|G('continuation')

    sidebar = result|G('sidebar')|G('playlistSidebarRenderer')|G('items')
    primary = sidebar|Select('playlistSidebarPrimaryInfoRenderer')
    owner = (
        sidebar
            |Select('playlistSidebarSecondaryInfoRenderer')
            |G('videoOwner')
            |G('videoOwnerRenderer')
    )
    title = primary|G('title')|G.text
    author = owner|G('title')|G.text
    channel_id = owner|G('navigationEndpoint')|G('browseEndpoint')|G('browseId')

    videos = [video for video in map(parse_playlist, unparsed) if video]
    return title, author, channel_id, videos, more
134
def mkthumbs(thumbs):
    """
    turns a list of {'height': .., 'url': ..} dicts into a dict that maps
    the height (as string) to the url. the additional key 'largest' holds
    the height key with the greatest numeric value, or None if thumbs is empty.
    """
    by_height = {}
    for thumb in thumbs:
        height = str(thumb['height'])
        by_height[height] = thumb['url']
    # must be determined before 'largest' itself is added as a key
    largest = max(by_height, key=int, default=None)
    by_height['largest'] = largest
    return by_height
139
def clean_url(url):
    """
    returns the first value of the url's `q` query parameter, or the url
    itself if it has no such parameter.
    """
    # externals URLs are redirected through youtube.com/redirect, but we
    # may encounter internal URLs, too
    params = parse_qs(urlparse(url).query)
    targets = params.get('q')
    if targets:
        return targets[0]
    return url
144
def toInt(s, fallback=0):
    """
    returns the integer made up of all digit characters in s, or fallback
    if s is None or the digits don't form an integer (e.g. there are none).
    """
    if s is None:
        return fallback
    digits = ''.join(char for char in s if str.isdigit(char))
    try:
        return int(digits)
    except ValueError:
        return fallback
152
# Remove leftmost word from string:
def delL(s):
    """ returns what follows the first space in s ('' if s has no space). """
    _first, _space, remainder = s.partition(' ')
    return remainder
155
def age(s):
    """
    abbreviates a relative date: "1 year ago" => "1y", "5 minutes ago" => "5min".

    minutes are abbreviated to 'min', every other unit to its first letter
    (so that e.g. months stay distinguishable from minutes).
    returns None if s is None or not of the form "[prefix] <value> <unit> ago".
    """
    if s is None: # missing from autogen'd music, some livestreams
        return None
    # Some livestreams have "Streamed 7 hours ago"
    words = s.replace("Streamed ", "").split()
    if len(words) > 3 and words[-1] == "ago":
        # tolerate other prefixes by keeping only the trailing "<value> <unit> ago"
        words = words[-3:]
    if len(words) != 3:
        # not in the form "1 year ago"; report as unknown instead of raising
        return None
    value, unit, _ = words
    suffix = dict(
        minute='min',
        minutes='min',
    ).get(unit, unit[0]) # first letter otherwise (e.g. year(s) => y)

    return f"{value}{suffix}"
169
def log_unknown_card(data):
    """
    appends data, pretty-printed as json, to /tmp/innertube.err.

    each entry is preceded by a header line naming the url of the current
    flask request, or "unknown" if that cannot be determined.
    """
    import json
    try:
        from flask import request
        source = request.url
    except Exception:
        # flask unavailable or no usable request; unlike a bare `except:`
        # this still lets KeyboardInterrupt and SystemExit propagate.
        source = "unknown"
    with open("/tmp/innertube.err", "a", encoding="utf-8", errors="backslashreplace") as f:
        f.write(f"\n/***** {source} *****/\n")
        json.dump(data, f, indent=2)
179
def parse_result_items(items):
    """
    converts the renderers of a youtube search response into an easier to
    use format.

    returns a tuple (results, extras): results holds dicts of type VIDEO,
    PLAYLIST or CHANNEL; extras holds spelling corrections and messages.
    shelves are flattened recursively, a fixed set of renderers (ads, info
    panels, ...) is dropped, and anything unrecognized is handed to
    log_unknown_card().
    """
    # TODO: use .get() for most non-essential attributes

    # renderers that are intentionally not represented in the output:
    ignored = {
        'movieRenderer', 'gridMovieRenderer', # movies to buy/rent
            # gMR.{videoId,title.runs[].text,lengthText.simpleText}
        'carouselAdRenderer', 'searchPyvRenderer', 'promotedSparklesTextSearchRenderer',
        'promotedSparklesWebRenderer', 'compactPromotedItemRenderer', 'adSlotRenderer', # haha, no.
        'horizontalCardListRenderer',
            # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
        'emergencyOneboxRenderer', # suicide prevention hotline
        'clarificationRenderer', 'infoPanelContainerRenderer', # COVID-19/conspiracy theory infos
        'webAnswerRenderer', # "Result from the web"
        'infoPanelContentRenderer', # "These results may be new or changing quickly"
        'hashtagTileRenderer', # link to '/hashtag/<search_query>'
    }

    results = []
    extras = []

    def descend(children):
        # parses a nested list of renderers and merges it into the output
        sub_results, sub_extras = parse_result_items(children)
        results.extend(sub_results)
        extras.extend(sub_extras)

    for item in items:
        key = next(iter(item.keys()), None)
        content = item[key]

        if key in ('videoRenderer', 'reelItemRenderer'):
            video_id = content['videoId']
            title = content|G('title')|G.text or content|G('headline')|G.text
            author = content|G('longBylineText','shortBylineText')|G.text
            channel_id = (
                content|G('ownerText')|G('runs')|G(0)
                    |G('navigationEndpoint')|G('browseEndpoint')|G('browseId')
            )
            length = content|G('lengthText')|G.text # "44:07", "1:41:50"
            # "1,234 {views|watching}", absent on 0 views
            views = content|G('viewCountText')|G.text|A.int or 0
            published = age(content|G('publishedTimeText')|G('simpleText'))
            badge_style = content|G('badges')|Select('metadataBadgeRenderer')|G('style')
            results.append({'type': 'VIDEO', 'content': {
                'video_id': video_id,
                'title': title,
                'author': author,
                'channel_id': channel_id,
                'length': length,
                'views': views,
                'published': published,
                'live': badge_style == 'BADGE_STYLE_TYPE_LIVE_NOW',
            }})
        elif key in ('playlistRenderer', 'radioRenderer', 'showRenderer'):
            # radio == "Mix" playlist, show == normal playlist, specially displayed
            watch = content['navigationEndpoint']|G('watchEndpoint')
            title = content['title']|G.text
            byline = content|G('longBylineText','shortBylineText')
            n_videos = (
                content|G('videoCount')|A.int or
                content|G('videoCountShortText','videoCountText')|G.text # "Mix" playlists
            )
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': watch|G('playlistId'),
                'video_id': watch|G('videoId'),
                'title': title,
                'author': byline|G.text,
                'channel_id': byline|G('runs')|G(0)
                    |G('navigationEndpoint')|G('browseEndpoint')|G('browseId'),
                'n_videos': n_videos,
            }})
        elif key == 'channelRenderer':
            channel_id = content['channelId']
            title = content['title']|G.text
            icons = mkthumbs(content['thumbnail']['thumbnails'])
            results.append({'type': 'CHANNEL', 'content': {
                'channel_id': channel_id,
                'title': title,
                'icons': icons,
                'subscribers': content|G('subscriberCountText')|G('simpleText'), # "2.47K subscribers"
            }})
        elif key == 'shelfRenderer':
            shelf = content['content']
            subkey = next(iter(shelf.keys()), None) #verticalListRenderer/horizontalMovieListRenderer
            descend(shelf[subkey]['items'])
        elif key == 'reelShelfRenderer':
            descend(content['items'])
        elif key in ignored:
            continue
        elif key in ('didYouMeanRenderer', 'showingResultsForRenderer', 'includingResultsForRenderer'):
            corrected = content['correctedQueryEndpoint']['searchEndpoint']['query'] # non-misspelled query
            extras.append({
                'type': 'spelling',
                'query': corrected,
                # only didYouMean is a mere suggestion; the others were already applied
                'autocorrected': key != 'didYouMeanRenderer',
            })
        elif key == 'messageRenderer': # "No more results"
            extras.append({
                'type': 'message',
                'message': content|G('title','text')|G.text,
            })
        elif key == 'backgroundPromoRenderer': # e.g. "no results"
            icon_type = content['icon']['iconType']
            extras.append({
                'type': icon_type,
                'message': content['title']|G.text,
            })
        else:
            log_unknown_card(item)

    return results, extras
266
def parse_infocard(card):
    """
    converts a single infocard into a {'type': .., 'content': {..}} dict.

    type is one of POLL, VIDEO, PLAYLIST, WEBSITE or CHANNEL. returns None
    for cards without content and for unrecognized cards; the latter are
    also handed to log_unknown_card().
    """
    card = card['cardRenderer']
    if 'content' not in card:
        return None # probably the "View corrections" card, ignore.
    ctype = list(card['content'].keys())[0]
    content = card['content'][ctype]

    if ctype == "pollRenderer":
        question = content['question']['simpleText']
        answers = [
            (choice['text']['simpleText'], choice['numVotes'])
            for choice in content['choices']
        ]
        return {'type': "POLL", 'content': {
            'question': question,
            'answers': answers,
        }}

    if ctype == "videoInfoCardContentRenderer":
        is_live = content.get('badge',{}).get('liveBadgeRenderer') is not None
        video_id = content['action']['watchEndpoint']['videoId']
        title = content['videoTitle']['simpleText']
        author = delL(content['channelName']['simpleText'])
        if is_live:
            length = "LIVE"
        else:
            length = content.get('lengthString',{}).get('simpleText') # "23:03"
        # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
        views = toInt(content.get('viewCountText',{}).get('simpleText'))
        return {'type': "VIDEO", 'content': {
            'video_id': video_id,
            'title': title,
            'author': author,
            'length': length,
            'views': views,
        }}

    if ctype == "playlistInfoCardContentRenderer":
        watch = content['action']['watchEndpoint']
        playlist_id = watch['playlistId']
        video_id = watch['videoId']
        title = content['playlistTitle']['simpleText']
        author = delL(content['channelName']['simpleText'])
        n_videos = toInt(content['playlistVideoCount']['simpleText'])
        return {'type': "PLAYLIST", 'content': {
            'playlist_id': playlist_id,
            'video_id': video_id,
            'title': title,
            'author': author,
            'n_videos': n_videos,
        }}

    if ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content['command']:
        url = clean_url(content['command']['urlEndpoint']['url'])
        domain = content['displayDomain']['simpleText']
        title = content['title']['simpleText']
        # XXX: no thumbnails for infocards
        return {'type': "WEBSITE", 'content': {
            'url': url,
            'domain': domain,
            'title': title,
        }}

    if ctype == "collaboratorInfoCardContentRenderer":
        channel_id = content['endpoint']['browseEndpoint']['browseId']
        title = content['channelName']['simpleText']
        icons = mkthumbs(content['channelAvatar']['thumbnails'])
        # "545K subscribers"
        subscribers = content.get('subscriberCountText',{}).get('simpleText','')
        return {'type': "CHANNEL", 'content': {
            'channel_id': channel_id,
            'title': title,
            'icons': icons,
            'subscribers': subscribers,
        }}

    log_unknown_card(card)
    return None
319
def parse_endcard(card):
    """
    converts a single endscreen element into a {'type': .., 'content': {..}} dict.

    type is one of CHANNEL, VIDEO, PLAYLIST or WEBSITE (which is also used
    for CREATOR_MERCHANDISE). returns None for video cards without endpoint
    and for unrecognized cards; the latter are also handed to
    log_unknown_card().
    """
    card = card.get('endscreenElementRenderer', card) #only sometimes nested
    ctype = card['style']

    if ctype == "CHANNEL":
        channel_id = card['endpoint']['browseEndpoint']['browseId']
        title = card['title']|G.text
        icons = mkthumbs(card['image']['thumbnails'])
        return {'type': ctype, 'content': {
            'channel_id': channel_id,
            'title': title,
            'icons': icons,
        }}

    if ctype == "VIDEO":
        if 'endpoint' not in card:
            return None # title == "This video is unavailable."
        video_id = card['endpoint']['watchEndpoint']['videoId']
        title = card['title']|G.text
        length = card|G('videoDuration')|G.text # '12:21'
        views = toInt(card['metadata']|G.text)
        # XXX: no channel name
        return {'type': ctype, 'content': {
            'video_id': video_id,
            'title': title,
            'length': length,
            'views': views,
        }}

    if ctype == "PLAYLIST":
        watch = card['endpoint']['watchEndpoint']
        playlist_id = watch['playlistId']
        video_id = watch['videoId']
        title = card['title']|G.text
        author = delL(card['metadata']|G.text)
        n_videos = toInt(card['playlistLength']|G.text)
        return {'type': ctype, 'content': {
            'playlist_id': playlist_id,
            'video_id': video_id,
            'title': title,
            'author': author,
            'n_videos': n_videos,
        }}

    if ctype in ("WEBSITE", "CREATOR_MERCHANDISE"):
        url = clean_url(card['endpoint']['urlEndpoint']['url'])
        domain = urlparse(url).netloc
        title = card['title']|G.text
        icons = mkthumbs(card['image']['thumbnails'])
        return {'type': "WEBSITE", 'content': {
            'url': url,
            'domain': domain,
            'title': title,
            'icons': icons,
        }}

    log_unknown_card(card)
    return None
360
def parse_channel_items(items, channel_id, author):
    """
    converts the renderers of a channel page into an easier to use format.

    returns a tuple (result, extra): result holds dicts of type VIDEO or
    PLAYLIST. nothing is added to extra directly; it only collects what
    nested calls return. the supplied channel_id and author are used
    wherever a renderer doesn't carry its own. container renderers are
    flattened recursively and anything unrecognized is handed to
    log_unknown_card().
    """
    # renderers that only wrap a list of further renderers => key of that list
    containers = {
        "itemSectionRenderer": 'contents',
        "gridRenderer": 'items',
        "horizontalCardListRenderer": 'cards',
        "horizontalListRenderer": 'items',
    }
    # renderers that are intentionally not represented in the output:
    ignored = (
        "messageRenderer",
            # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
        "gameCardRenderer",
        "gridChannelRenderer", # don't care; related channels, e.g. on UCMsgXPD3wzzt8RxHJmXH7hQ
        'continuationItemRenderer', # handled in parent function
    )

    result = []
    extra = []

    def descend(children):
        # parses a nested list of renderers and merges it into the output
        sub_result, sub_extra = parse_channel_items(children, channel_id, author)
        result.extend(sub_result)
        extra.extend(sub_extra)

    for item in items:
        key = next(iter(item.keys()), None)
        content = item[key]

        if key in ("gridVideoRenderer", "videoRenderer", "videoCardRenderer", 'reelItemRenderer'): # reel==youtube-shorts
            # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
            video_id = content['videoId']
            title = content|G('title')|G.text or content|G('headline')|G.text
            video_author = content|G('bylineText')|G.text or author
            video_channel = (
                content|G('bylineText')|G('runs')
                    |Select('navigationEndpoint')
                    |G('browseEndpoint')|G('browseId')
            ) or channel_id
            length = content|G('lengthText')|G.text # topic channel
            if not length:
                length = (
                    content|G('thumbnailOverlays')
                        |Select('thumbnailOverlayTimeStatusRenderer')
                        |G('text')|G.text
                )
            # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
            views = content|G('viewCountText')|G.text|A.int
            published = age(content|G('publishedTimeText')|G.text)
            result.append({'type': 'VIDEO', 'content': {
                'video_id': video_id,
                'title': title,
                'author': video_author,
                'channel_id': video_channel,
                'length': length,
                'views': views,
                'published': published,
            }})
        elif key in ("gridPlaylistRenderer", "playlistRenderer", "gridRadioRenderer"):
            watch = content|G('navigationEndpoint')|G('watchEndpoint')
            n_videos = content|G('videoCount')|A.int # playlistRenderer
            if not n_videos:
                n_videos = content|G('videoCountShortText','videoCountText')|G.text|A.int # grid
            result.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': watch|G('playlistId'),
                'video_id': watch|G('videoId'),
                'title': content|G('title')|G.text,
                'author': author, # Note: gridRadioRenderer is by 'Youtube' without channel_id, ignoring that.
                'channel_id': channel_id,
                'n_videos': n_videos,
            }})
        elif key == "showRenderer":
            watch = content['navigationEndpoint']['watchEndpoint']
            playlist_id = watch['playlistId']
            video_id = watch['videoId']
            title = content['title']['simpleText']
            result.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': playlist_id,
                'video_id': video_id,
                'title': title,
                'author': author,
                'channel_id': channel_id,
                'n_videos': None,
            }})
        elif key == "gridShowRenderer":
            browse_id = content|G('navigationEndpoint')|G('browseEndpoint')|G('browseId')
            result.append({'type': 'PLAYLIST', 'content': {
                # playlistId prefixed with 'VL', which must be removed
                'playlist_id': browse_id[2:],
                'video_id': None,
                'title': content|G('title')|G.text,
                'author': author,
                'channel_id': channel_id,
                'n_videos': content|G('thumbnailOverlays')|G(0)
                    |G('thumbnailOverlayBottomPanelRenderer')|G('text')|G.text,
            }})
        elif key in containers:
            descend(content[containers[key]])
        elif key in ("shelfRenderer", "richItemRenderer"):
            # these wrap a single renderer
            descend([content['content']])
        elif key == "reelShelfRenderer":
            descend(content['items'])
        elif key in ignored:
            continue
        else:
            log_unknown_card(item)

    return result, extra
446
def parse_playlist(item):
    """
    converts a single playlistVideoRenderer into a {'type': 'VIDEO', 'content': {..}} dict.

    returns None for videos that are not marked as playable.
    raises Exception(item) for any other kind of renderer.
    """
    key = next(iter(item.keys()), None)
    content = item[key]
    if key != "playlistVideoRenderer":
        raise Exception(item) # XXX TODO
    if not content.get('isPlayable', False):
        return None # private or deleted video

    video_id = content['videoId']
    title_obj = content['title']
    title = (title_obj.get('simpleText') or # playable videos
        title_obj.get('runs',[{}])[0].get('text')) # "[Private video]"
    watch = content['navigationEndpoint']['watchEndpoint']
    playlist_id = watch['playlistId']
    #or int(content['index']['simpleText']) (absent on course intros; e.g. PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5)
    index = watch.get('index',0)
    # rest is missing from unplayable videos:
    byline = content.get('shortBylineText',{}).get('runs',[{}])[0]
    author = byline.get('text')
    channel_id = byline.get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId')
    length = content.get("lengthText",{}).get("simpleText") # "8:51"
    if not length:
        length = int(content.get("lengthSeconds", 0)) # "531"
    starttime = watch.get('startTimeSeconds')

    return {'type': 'VIDEO', 'content': {
        'video_id': video_id,
        'title': title,
        'playlist_id': playlist_id,
        'index': index,
        'author': author,
        'channel_id': channel_id,
        'length': length,
        'starttime': starttime,
    }}
Imprint / Impressum