]> git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
don't throw 500 error when logging unknown card type
[subscriptionfeed.git] / app / common / innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
2
3 from urllib.parse import parse_qs, urlparse
4 import re
5
def listget(obj, index, fallback=None):
    """
    Return the first element of obj[index:], or fallback if there is none.

    A None obj yields the fallback as well. Because the lookup goes through
    a slice, an index past the end does not raise IndexError.
    """
    if obj is None:
        return fallback
    for element in obj[index:]:
        # the first element of the slice is the one we are after
        return element
    return fallback
def flatten(l):
    """Concatenate the items of every sub-iterable in l into one flat list."""
    # https://stackoverflow.com/a/952952
    flat = []
    for sublist in l:
        flat.extend(sublist)
    return flat
10
class G:
    """
    null-coalescing version of dict.get() that also works on lists.

    the | operator is overloaded to achieve similar looking code to jq(1) filters.
    the first found key is used: dict(foo=1)|G('bar','foo') returns 1.
    if none of the keys can be looked up (e.g. because the left-hand side is
    None or not subscriptable), the result is None.
    """
    def __init__(self, *keys):
        self.keys = keys

    def __ror__(self, other):
        for key in self.keys:
            try:
                return other[key]
            except Exception:
                # missing key, index out of range, non-subscriptable operand, ...
                # (not a bare except: KeyboardInterrupt/SystemExit must propagate)
                continue
        return None

    class _Text:
        """ parses youtube's .runs[].text and .simpleText variants """
        def __ror__(self, other):
            # Note: only returning runs[0], not concat'ing all!
            return other|G('simpleText') or other|G('runs')|G(0)|G('text')

    # usage: something|G.text
    text = _Text()
class Select:
    """
    |Select('foo') returns the first foo in list, |Select(all='foo') returns all foos.

    the left-hand side is expected to be an iterable of dict-like objects;
    the value of the given key is collected from every element that has it.
    if the left-hand side cannot be processed that way, it is treated as
    having no matches (None for a single select, [] for all=).
    """
    def __init__(self, key=None, *, all=None):
        self.key = key or all
        self.all = all

    def __ror__(self, other):
        try:
            items = [
                element[self.key]
                for element in other
                if self.key in element.keys()
            ]
        except Exception:
            # other is None/not iterable, or an element is not dict-like
            # (not a bare except: KeyboardInterrupt/SystemExit must propagate)
            items = []
        if self.all:
            return items
        # first match, or None when nothing matched
        return items[0] if items else None
class A:
    """
    apply: value|A(f, *args) returns f(value, *args).

    value|A.int extracts all digit characters from a string and returns them
    as one integer, or None if that is not possible.
    """
    def __init__(self, f, *args):
        self.f = f
        self.args = args

    def __ror__(self, other):
        return self.f(other, *self.args)

    class _Int:
        """ keeps only the digits of a string and converts them to an int """
        def __ror__(self, other):
            try:
                return int(''.join(filter(str.isdigit, other)))
            except Exception:
                # other is None/not a string, or contains no digits
                # (not a bare except: KeyboardInterrupt/SystemExit must propagate)
                return None

    # usage: something|A.int
    int = _Int()
51
52
def prepare_searchresults(yt_results):
    """
    extracts the result items from a search response.

    handles both the response for a continuation token and the one for the
    first page. returns a tuple (items, extra, more): items and extra are
    the two values returned by parse_result_items(), more is the token found
    in the continuationItemRenderer (None if there is none).
    """
    contents = ( # from continuation token
        yt_results
            |G('onResponseReceivedCommands')
            |Select('appendContinuationItemsAction')
            |G('continuationItems')
    ) or ( # from page 1
        yt_results
            |G('contents')
            |G('twoColumnSearchResultsRenderer')
            |G('primaryContents')
            |G('sectionListRenderer')
            |G('contents')
    )
    # each itemSectionRenderer holds a list of result items; merge them all
    sections = contents|Select(all='itemSectionRenderer')
    items = flatten([c.get('contents',[]) for c in sections])
    items, extra = parse_result_items(items)
    more = (
        contents
            |Select("continuationItemRenderer")
            |G("continuationEndpoint")
            |G("continuationCommand")
            |G("token")
    )

    return items, extra, more
73
def prepare_infocards(metadata):
    """
    parses all infocards found under .cards.cardCollectionRenderer.cards;
    cards for which parse_infocard() returns nothing are left out.
    """
    renderer = metadata.get('cards',{}).get('cardCollectionRenderer',{})
    parsed = [parse_infocard(raw) for raw in renderer.get('cards',[])]
    return [card for card in parsed if card]
77
def prepare_endcards(metadata):
    """
    parses all endcards found under .endscreen.endscreenRenderer.elements;
    cards for which parse_endcard() returns nothing are left out.
    """
    renderer = metadata.get('endscreen',{}).get('endscreenRenderer',{})
    parsed = [parse_endcard(raw) for raw in renderer.get('elements',[])]
    return [card for card in parsed if card]
81
def prepare_channel(response, channel_id):
    """
    extracts channel metadata and one page of channel items from a response.

    returns (title, descr, thumb, items, more). when the response carries no
    continuationContents, items is [] and more is False; otherwise more is
    the continuation value for the next page (None if there is none).
    """
    channel_meta = response|G('metadata')|G('channelMetadataRenderer')
    microformat = response|G('microformat')|G('microformatDataRenderer')

    title = channel_meta|G('title') or microformat|G('title')
    # microformat's description is capped at 160chars, so it's only the fallback
    descr = channel_meta|G('description') or microformat|G('description')
    # .avatar ~ 900px
    image = microformat|G('thumbnail') or channel_meta|G('avatar')
    thumb = mkthumbs(image|G('thumbnails') or {})

    contents = response|G('continuationContents')
    if not contents: # overran end of list
        return title, descr, thumb, [], False

    grid_items = contents|G('gridContinuation')|G('items')
    section_items = contents|G('sectionListContinuation')|G('contents')
    unparsed = grid_items or section_items or []
    items, extra = parse_channel_items(unparsed, channel_id, title)

    continuation = contents|G('gridContinuation', 'sectionListContinuation')
    more = (
        continuation
            |G('continuations')
            |Select('nextContinuationData')
            |G('continuation')
    )

    return title, descr, thumb, items, more
105
def prepare_playlist(result):
    """
    extracts one page of playlist entries from a continuation response.

    returns (videos, more): videos are the entries parse_playlist() returned
    something for, more is the continuation value for the next page (None if
    there is none).
    """
    contents = result['continuationContents']
    unparsed = contents['playlistVideoListContinuation'].get('contents',[])
    more = (
        contents
            |G('playlistVideoListContinuation')
            |G('continuations')
            |Select('nextContinuationData')
            |G('continuation')
    )

    videos = [video for video in map(parse_playlist, unparsed) if video]
    return videos, more
118
def mkthumbs(thumbs):
    """
    turns a list of {'height': ..., 'url': ...} thumbnail entries into a
    dict mapping str(height) to url. the extra key 'largest' names the
    greatest height (None if there are no thumbnails).
    """
    by_height = {}
    for thumb in thumbs:
        height = str(thumb['height'])
        by_height[height] = thumb['url']
    # heights are strings by now, so they must be compared numerically
    tallest = max(by_height, key=int, default=None)
    result = dict(by_height)
    result['largest'] = tallest
    return result
123
def clean_url(url):
    """
    returns the first value of the url's 'q' query parameter, or the url
    itself if it has none.
    """
    # externals URLs are redirected through youtube.com/redirect, but we
    # may encounter internal URLs, too
    query = parse_qs(urlparse(url).query)
    if 'q' in query:
        return query['q'][0]
    return url
128
def toInt(s, fallback=0):
    """
    converts the digits contained in s to an int, ignoring all other
    characters. returns fallback if s is None or the digits don't form a
    valid integer (e.g. because there are none).
    """
    if s is None:
        return fallback
    digits = ''.join(char for char in s if str.isdigit(char))
    try:
        return int(digits)
    except ValueError:
        return fallback
136
# Remove leftmost word from string (everything up to and including the first
# space; a string without a space becomes empty):
def delL(s):
    _, _, remainder = s.partition(' ')
    return remainder
139
def age(s):
    """
    shortens a relative timestamp such as "1 year ago" to "1y".

    minute(s) become "min", every other unit is cut down to its first letter
    (e.g. year(s) => y). only the trailing "<value> <unit> <word>" part is
    looked at, so prefixes like in "Streamed 7 hours ago" don't matter.

    returns None if s is None, and s unchanged if it consists of fewer than
    three words (nothing to shorten).
    """
    if s is None: # missing from autogen'd music, some livestreams
        return None
    words = s.split()
    if len(words) < 3:
        # not in the form "1 year ago"; pass through instead of raising
        return s
    # Some livestreams have "Streamed 7 hours ago"; taking the last three
    # words copes with any such prefix.
    value, unit = words[-3], words[-2]
    suffix = dict(
        minute='min',
        minutes='min',
    ).get(unit, unit[0]) # first letter otherwise (e.g. year(s) => y)

    return f"{value}{suffix}"
153
def log_unknown_card(data):
    """
    appends data as indented JSON to /tmp/innertube.err, preceded by a
    header line naming the URL of the current flask request ("unknown" if
    that can't be determined).
    """
    import json
    try:
        from flask import request
        source = request.url
    except Exception:
        # flask unavailable or no request being handled. not a bare except:
        # KeyboardInterrupt/SystemExit must propagate.
        source = "unknown"
    # errors="backslashreplace": characters that can't be encoded must not
    # make the logging itself fail
    with open("/tmp/innertube.err", "a", encoding="utf-8", errors="backslashreplace") as f:
        f.write(f"\n/***** {source} *****/\n")
        json.dump(data, f, indent=2)
163
def parse_result_items(items):
    """
    parses youtube search response into an easier to use format.

    returns a tuple (results, extras): results holds the VIDEO, PLAYLIST and
    CHANNEL entries, extras holds spelling corrections and messages.
    renderers that aren't known are handed to log_unknown_card().
    """
    # TODO: use .get() for most non-essential attributes
    skipped = (
        # movies to buy/rent; gMR.{videoId,title.runs[].text,lengthText.simpleText}
        'movieRenderer', 'gridMovieRenderer',
        # haha, no.
        'carouselAdRenderer', 'searchPyvRenderer', 'promotedSparklesTextSearchRenderer',
        # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
        'horizontalCardListRenderer',
        # suicide prevention hotline
        'emergencyOneboxRenderer',
        # COVID-19/conspiracy theory infos
        'clarificationRenderer', 'infoPanelContainerRenderer',
        # "Result from the web"
        'webAnswerRenderer',
    )
    # radio == "Mix" playlist, show == normal playlist, specially displayed
    playlist_like = ('playlistRenderer', 'radioRenderer', 'showRenderer')
    spelling = ('didYouMeanRenderer', 'showingResultsForRenderer')

    results, extras = [], []
    for item in items:
        # every item is a dict with a single key naming the renderer
        key = next(iter(item.keys()), None)
        content = item[key]

        if key == 'videoRenderer':
            results.append({'type': 'VIDEO', 'content': {
                'video_id': content['videoId'],
                'title': content['title']|G.text,
                'author': content|G('longBylineText','shortBylineText')|G.text,
                'channel_id': (
                    content
                        |G('ownerText')|G('runs')|G(0)
                        |G('navigationEndpoint')|G('browseEndpoint')|G('browseId')
                ),
                # "44:07", "1:41:50"
                'length': content|G('lengthText')|G.text,
                # "1,234 {views|watching}", absent on 0 views
                'views': content|G('viewCountText')|G.text|A.int or 0,
                'published': content|G('publishedTimeText')|G('simpleText')|A(age),
                'live': (
                    content|G('badges')|Select('metadataBadgeRenderer')|G('style')
                ) == 'BADGE_STYLE_TYPE_LIVE_NOW',
            }})
        elif key in playlist_like:
            watch = content['navigationEndpoint']|G('watchEndpoint')
            byline = content|G('longBylineText','shortBylineText')
            results.append({'type': 'PLAYLIST', 'content': {
                # fallbacks are for COURSE/"learning playlist"
                'playlist_id': watch|G('playlistId') or content|G('playlistId'),
                'video_id': watch|G('videoId') or videoid_from_thumbnail(content),
                'title': content['title']|G.text,
                # Note: learning playlists have no author/channel_id
                'author': byline|G.text,
                'channel_id': (
                    byline
                        |G('runs')|G(0)
                        |G('navigationEndpoint')|G('browseEndpoint')|G('browseId')
                ),
                # second variant is for "Mix" playlists
                'n_videos': (
                    content|G('videoCount')|A.int
                    or content|G('videoCountShortText','videoCountText')|G.text
                ),
            }})
        elif key == 'channelRenderer':
            results.append({'type': 'CHANNEL', 'content': {
                'channel_id': content['channelId'],
                'title': content['title']|G.text,
                'icons': content['thumbnail']['thumbnails']|A(mkthumbs),
                # "2.47K subscribers"
                'subscribers': content|G('subscriberCountText')|G('simpleText'),
            }})
        elif key == 'shelfRenderer':
            shelf = content['content']
            #verticalListRenderer/horizontalMovieListRenderer
            subkey = next(iter(shelf.keys()), None)
            nested_results, nested_extras = parse_result_items(shelf[subkey]['items'])
            results.extend(nested_results)
            extras.extend(nested_extras)
        elif key in skipped:
            continue
        elif key in spelling:
            extras.append({
                'type': 'spelling',
                # non-misspelled query
                'query': content['correctedQueryEndpoint']['searchEndpoint']['query'],
                'autocorrected': key == 'showingResultsForRenderer',
            })
        elif key == 'messageRenderer': # "No more results"
            extras.append({
                'type': 'message',
                'message': content|G('title','text')|G.text,
            })
        elif key == 'backgroundPromoRenderer': # e.g. "no results"
            extras.append({
                'type': content['icon']['iconType'],
                'message': content['title']|G.text,
            })
        else:
            log_unknown_card(item)
    return results, extras
244
def parse_infocard(card):
    """
    parses a single infocard into a format that's easier to handle.

    returns a dict with 'type' (POLL, VIDEO, PLAYLIST, WEBSITE or CHANNEL)
    and 'content'. card types that aren't known are handed to
    log_unknown_card() and None is returned.
    """
    card = card['cardRenderer']
    # the content dict's first key names the kind of card
    ctype = list(card['content'].keys())[0]
    content = card['content'][ctype]

    if ctype == "pollRenderer":
        answers = [
            (choice['text']['simpleText'], choice['numVotes'])
            for choice in content['choices']
        ]
        return {'type': "POLL", 'content': {
            'question': content['question']['simpleText'],
            'answers': answers,
        }}

    if ctype == "videoInfoCardContentRenderer":
        is_live = content.get('badge',{}).get('liveBadgeRenderer') is not None
        return {'type': "VIDEO", 'content': {
            'video_id': content['action']['watchEndpoint']['videoId'],
            'title': content['videoTitle']['simpleText'],
            'author': delL(content['channelName']['simpleText']),
            # "23:03"
            'length': "LIVE" if is_live
                else content.get('lengthString',{}).get('simpleText'),
            # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
            'views': toInt(content.get('viewCountText',{}).get('simpleText')),
        }}

    if ctype == "playlistInfoCardContentRenderer":
        watch = content['action']['watchEndpoint']
        return {'type': "PLAYLIST", 'content': {
            'playlist_id': watch['playlistId'],
            'video_id': watch['videoId'],
            'title': content['playlistTitle']['simpleText'],
            'author': delL(content['channelName']['simpleText']),
            'n_videos': toInt(content['playlistVideoCount']['simpleText']),
        }}

    # simple cards whose command is not an urlEndpoint are treated as unknown
    if ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content['command']:
        return {'type': "WEBSITE", 'content': {
            'url': clean_url(content['command']['urlEndpoint']['url']),
            'domain': content['displayDomain']['simpleText'],
            'title': content['title']['simpleText'],
            # XXX: no thumbnails for infocards
        }}

    if ctype == "collaboratorInfoCardContentRenderer":
        return {'type': "CHANNEL", 'content': {
            'channel_id': content['endpoint']['browseEndpoint']['browseId'],
            'title': content['channelName']['simpleText'],
            'icons': mkthumbs(content['channelAvatar']['thumbnails']),
            # "545K subscribers"
            'subscribers': content.get('subscriberCountText',{}).get('simpleText',''),
        }}

    log_unknown_card(card)
    return None
295
def parse_endcard(card):
    """
    parses a single endcard into a format that's easier to handle.

    returns a dict with 'type' (CHANNEL, VIDEO, PLAYLIST or WEBSITE) and
    'content', or None for unavailable videos. card styles that aren't
    known are handed to log_unknown_card() and None is returned.
    """
    card = card.get('endscreenElementRenderer', card) #only sometimes nested
    ctype = card['style']

    if ctype == "CHANNEL":
        return {'type': ctype, 'content': {
            'channel_id': card['endpoint']['browseEndpoint']['browseId'],
            'title': card['title']|G.text,
            'icons': mkthumbs(card['image']['thumbnails']),
        }}

    if ctype == "VIDEO":
        if 'endpoint' not in card:
            return None # title == "This video is unavailable."
        return {'type': ctype, 'content': {
            'video_id': card['endpoint']['watchEndpoint']['videoId'],
            'title': card['title']|G.text,
            # '12:21'
            'length': card['videoDuration']|G.text,
            'views': toInt(card['metadata']|G.text),
            # XXX: no channel name
        }}

    if ctype == "PLAYLIST":
        watch = card['endpoint']['watchEndpoint']
        return {'type': ctype, 'content': {
            'playlist_id': watch['playlistId'],
            'video_id': watch['videoId'],
            'title': card['title']|G.text,
            'author': delL(card['metadata']|G.text),
            'n_videos': toInt(card['playlistLength']|G.text),
        }}

    if ctype in ("WEBSITE", "CREATOR_MERCHANDISE"):
        url = clean_url(card['endpoint']['urlEndpoint']['url'])
        return {'type': "WEBSITE", 'content': {
            'url': url,
            'domain': urlparse(url).netloc,
            'title': card['title']|G.text,
            'icons': mkthumbs(card['image']['thumbnails']),
        }}

    log_unknown_card(card)
    return None
336
def videoid_from_thumbnail(content):
    """
    extracts the 11 character video id from the url of the first thumbnail
    (.thumbnails[0].thumbnails[0].url). returns None if there is no such
    thumbnail or its url isn't an i.ytimg.com/vi/ one.
    """
    # learning playlist; example: PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5 (/user/enyay/playlists)
    outer = listget(content.get('thumbnails',[]), 0, {})
    inner = listget(outer.get('thumbnails',[]), 0, {})
    url = inner.get('url','')
    # dots are escaped so that they only match the literal host name
    match = re.match(r"https?://i\.ytimg\.com/vi/([-_0-9a-zA-Z]{11})", url)
    return match.group(1) if match else None
342
def parse_channel_items(items, channel_id, author):
    """
    parses the items of a channel page into an easier to use format.

    channel_id and author are used wherever an item doesn't name its own.
    returns a tuple (result, extra): result holds the VIDEO and PLAYLIST
    entries; extra only ever receives what nested calls return. renderers
    that aren't known are handed to log_unknown_card().
    """
    video_keys = ("gridVideoRenderer", "videoRenderer", "videoCardRenderer")
    playlist_keys = ("gridPlaylistRenderer", "playlistRenderer")
    # renderers that merely wrap a list of further items, and where it is found
    containers = {
        "itemSectionRenderer": 'contents',
        "gridRenderer": 'items',
        "horizontalCardListRenderer": 'cards',
        "horizontalListRenderer": 'items',
    }
    skipped = (
        # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
        "messageRenderer",
        "gameCardRenderer",
        # don't care; related channels, e.g. on UCMsgXPD3wzzt8RxHJmXH7hQ
        "gridChannelRenderer",
    )

    result, extra = [], []
    for item in items:
        # every item is a dict with a single key naming the renderer
        key = next(iter(item.keys()), None)
        content = item[key]

        if key in video_keys:
            # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
            byline = content|G('bylineText')
            result.append({'type': 'VIDEO', 'content': {
                'video_id': content['videoId'],
                'title': content|G('title')|G.text,
                'author': byline|G.text or author,
                'channel_id': (
                    byline
                        |G('runs')
                        |Select('navigationEndpoint')
                        |G('browseEndpoint')|G('browseId')
                    or channel_id
                ),
                'length': (
                    content|G('lengthText')|G.text # topic channel
                    or content
                        |G('thumbnailOverlays')
                        |Select('thumbnailOverlayTimeStatusRenderer')
                        |G('text')|G.text
                ),
                # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
                'views': content|G('viewCountText')|G.text|A.int,
                'published': content|G('publishedTimeText')|G.text|A(age),
            }})
        elif key in playlist_keys:
            watch = content|G('navigationEndpoint')|G('watchEndpoint')
            result.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': watch|G('playlistId') or content|G('playlistId'),
                'video_id': watch|G('videoId') or videoid_from_thumbnail(content),
                'title': content|G('title')|G.text,
                'author': author,
                'channel_id': channel_id,
                'n_videos': (
                    content|G('videoCount')|A.int # playlistRenderer
                    or content|G('videoCountShortText','videoCountText')|G.text|A.int # grid
                ),
            }})
        elif key == "showRenderer":
            watch = content['navigationEndpoint']['watchEndpoint']
            result.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': watch['playlistId'],
                'video_id': watch['videoId'],
                'title': content['title']['simpleText'],
                'author': author,
                'channel_id': channel_id,
                'n_videos': None,
            }})
        elif key == "gridShowRenderer":
            browse_id = content|G('navigationEndpoint')|G('browseEndpoint')|G('browseId')
            result.append({'type': 'PLAYLIST', 'content': {
                # playlistId prefixed with 'VL', which must be removed
                'playlist_id': browse_id[2:],
                'video_id': None,
                'title': content|G('title')|G.text,
                'author': author,
                'channel_id': channel_id,
                'n_videos': (
                    content
                        |G('thumbnailOverlays')|G(0)
                        |G('thumbnailOverlayBottomPanelRenderer')
                        |G('text')|G.text
                ),
            }})
        elif key in containers:
            nested = content[containers[key]]
            sub_result, sub_extra = parse_channel_items(nested, channel_id, author)
            result.extend(sub_result)
            extra.extend(sub_extra)
        elif key == "shelfRenderer":
            # a shelf's content is a single item, not a list
            sub_result, sub_extra = parse_channel_items([content['content']], channel_id, author)
            result.extend(sub_result)
            extra.extend(sub_extra)
        elif key in skipped:
            continue
        else:
            log_unknown_card(item)

    return result, extra
424
def parse_playlist(item):
    """
    parses a single playlist entry into a format that's easier to handle.

    returns a dict with 'type' VIDEO and 'content', or None if the entry is
    not marked as playable. raises Exception(item) for anything that isn't
    a playlistVideoRenderer.
    """
    # every item is a dict with a single key naming the renderer
    key = next(iter(item.keys()), None)
    content = item[key]
    if key != "playlistVideoRenderer":
        raise Exception(item) # XXX TODO

    if not content.get('isPlayable', False):
        return None # private or deleted video

    video_id = content['videoId']
    title = (
        content['title'].get('simpleText') # playable videos
        or content['title'].get('runs',[{}])[0].get('text') # "[Private video]"
    )
    watch = content['navigationEndpoint']['watchEndpoint']
    playlist_id = watch['playlistId']
    #or int(content['index']['simpleText']) (absent on course intros; e.g. PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5)
    index = watch.get('index',0)
    # rest is missing from unplayable videos:
    first_run = content.get('shortBylineText',{}).get('runs',[{}])[0]
    author = first_run.get('text')
    channel_id = first_run.get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId')
    length = (
        content.get("lengthText",{}).get("simpleText") # "8:51"
        or int(content.get("lengthSeconds", 0)) # "531"
    )
    starttime = watch.get('startTimeSeconds')

    return {'type': 'VIDEO', 'content': {
        'video_id': video_id,
        'title': title,
        'playlist_id': playlist_id,
        'index': index,
        'author': author,
        'channel_id': channel_id,
        'length': length,
        'starttime': starttime,
    }}
Imprint / Impressum