]> git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
make |G() take multiple keys
[subscriptionfeed.git] / app / common / innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
2
3 from urllib.parse import parse_qs, urlparse
4 import re
5
6 def findall(obj, key):
7 """
8 given a list of dicts, where one dict contains a given key, return said key.
9 """
10 if obj is None: return []
11 return [ obj[key] for obj in obj if key in obj.keys() ]
12 def listget(obj, index, fallback=None):
13 if obj is None: return fallback
14 return next(iter(obj[index:]), fallback)
15 flatten = lambda l: [item for sublist in l for item in sublist] # https://stackoverflow.com/a/952952
16 first = lambda l: next(iter(l),{})
17 listfind = lambda obj,key: first(findall(obj,key))
18
19 class G:
20 """
21 null-coalescing version of dict.get() that also works on lists.
22
23 the | operator is overloaded to achieve similar looking code to jq(1) filters.
24 the first found key is used: dict(foo=1)|G('bar','foo') returns 1.
25 """
26 def __init__(self, *keys):
27 self.keys = keys
28 def __ror__(self, other):
29 for key in self.keys:
30 try: return other[key]
31 except: continue
32 return None
33 class _Text:
34 """ parses youtube's .runs[].text and .simpleText variants """
35 def __ror__(self, other): # Note: only returning runs[0], not concat'ing all!
36 return other|G('simpleText') or other|G('runs')|G(0)|G('text')
37 text = _Text()
38 class Select:
39 """ |Select('foo') returns the first foo in list, |Select(all='foo') returns all foos. """
40 def __init__(self, key=None, *, all=None):
41 self.key = key or all
42 self.all = all
43 def __ror__(self, other):
44 try: items = [ other[self.key] for other in other if self.key in other.keys() ]
45 except: items = []
46 return items if self.all else items|G(0)
47 class A:
48 """ apply """
49 def __init__(self, f, *args):
50 self.f = f
51 self.args = args
52 def __ror__(self, other):
53 return self.f(other, *self.args)
54 class _Int:
55 def __ror__(self, other):
56 try: return int(''.join(filter(str.isdigit, other)))
57 except: return None
58 int = _Int()
59
60
61 def prepare_searchresults(yt_results):
62 contents = listfind(yt_results, 'response') \
63 .get('contents',{})\
64 .get('twoColumnSearchResultsRenderer',{})\
65 .get('primaryContents',{})\
66 .get('sectionListRenderer',{})\
67 .get('contents',[])
68 contents = flatten([c.get('contents',[]) for c in contents|Select(all='itemSectionRenderer')])
69
70 return parse_result_items(contents)
71
72 def prepare_infocards(metadata):
73 cards = metadata.get('cards',{}).get('cardCollectionRenderer',{}).get('cards',[])
74 return list(filter(None, map(parse_infocard, cards)))
75
76 def prepare_endcards(metadata):
77 endsc = metadata.get('endscreen',{}).get('endscreenRenderer',{}).get('elements',[])
78 return list(filter(None, map(parse_endcard, endsc)))
79
80 def prepare_channel(result, channel_id):
81 response = listfind(result,'response')
82
83 if 'alerts' in response: # possibly got an error back
84 from flask import current_app
85 current_app.logger.error([(alert['alertRenderer']['type'],alert['alertRenderer']['text']['simpleText']) for alert in response['alerts']])
86 return None,None,[],[],False
87
88 meta1 = response.get('metadata',{}).get('channelMetadataRenderer',{})
89 meta2 = response.get('microformat',{}).get('microformatDataRenderer',{})
90 title = meta1.get('title', meta2.get('title'))
91 descr = meta1.get('description', meta2.get('description')) # meta2.description is capped at 160chars
92 thumb = mkthumbs(meta2.get('thumbnail',meta1.get('avatar',{})).get('thumbnails',{})) # .avatar ~ 900px
93
94 contents = response.get('continuationContents')
95 if not contents: # overran end of list
96 return title, descr, thumb, [], False
97
98 unparsed = contents.get('gridContinuation',{}).get('items') or \
99 contents.get('sectionListContinuation',{}).get('contents') or []
100 items, extra = parse_channel_items(unparsed, channel_id, title)
101 has_more = 'continuations' in (contents.get('gridContinuation') or
102 contents.get('sectionListContinuation') or {})
103
104 return title, descr, thumb, items, has_more
105
106 def prepare_playlist(result):
107 contents = listfind(result,'response')['continuationContents']
108 unparsed = contents['playlistVideoListContinuation'].get('contents',[])
109 has_more = 'continuations' in contents.get('playlistVideoListContinuation')
110
111 return list(filter(None, map(parse_playlist, unparsed))), has_more
112
113 def mkthumbs(thumbs):
114 output = {str(e['height']): e['url'] for e in thumbs}
115 largest=next(iter(sorted(output.keys(),reverse=True,key=int)),None)
116 return {**output, 'largest': largest}
117
118 def clean_url(url):
119 # externals URLs are redirected through youtube.com/redirect, but we
120 # may encounter internal URLs, too
121 return parse_qs(urlparse(url).query).get('q',[url])[0]
122
123 def toInt(s, fallback=0):
124 if s is None:
125 return fallback
126 try:
127 return int(''.join(filter(str.isdigit, s)))
128 except ValueError:
129 return fallback
130
131 # Remove left-/rightmost word from string:
132 delL = lambda s: s.partition(' ')[2]
133
134 def age(s):
135 if s is None: # missing from autogen'd music, some livestreams
136 return None
137 # Some livestreams have "Streamed 7 hours ago"
138 s = s.replace("Streamed ","")
139 # Now, everything should be in the form "1 year ago"
140 value, unit, _ = s.split(" ")
141 suffix = dict(
142 month='mn',
143 months='mn',
144 ).get(unit, unit[0]) # first letter otherwise (e.g. year(s) => y)
145
146 return f"{value}{suffix}"
147
148 def log_unknown_card(data):
149 import json
150 try:
151 from flask import request
152 source = request.url
153 except: source = "unknown"
154 with open("/tmp/innertube.err", "a") as f:
155 f.write(f"\n/***** {source} *****/\n")
156 json.dump(data, f, indent=2)
157
158 def parse_result_items(items):
159 # TODO: use .get() for most non-essential attributes
160 """
161 parses youtube search response into an easier to use format.
162 """
163 results = []
164 extras = []
165 for item in items:
166 key = next(iter(item.keys()), None)
167 content = item[key]
168 if key == 'videoRenderer':
169 results.append({'type': 'VIDEO', 'content': {
170 'video_id': content['videoId'],
171 'title': content['title']|G.text,
172 'author': content|G('longBylineText','shortBylineText')|G.text,
173 'channel_id': content|G('ownerText')|G('runs')|G(0) \
174 |G('navigationEndpoint')|G('browseEndpoint')|G('browseId'),
175 'length': content|G('lengthText')|G.text, # "44:07", "1:41:50"
176 'views': content|G('viewCountText')|G.text|A.int or 0, # "1,234 {views|watching}", absent on 0 views
177 'published': content|G('publishedTimeText')|G('simpleText')|A(age),
178 'live': content|G('badges')|Select('metadataBadgeRenderer')|G('style')=='BADGE_STYLE_TYPE_LIVE_NOW',
179 }})
180 elif key == 'playlistRenderer':
181 results.append({'type': 'PLAYLIST', 'content': {
182 'playlist_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('playlistId') or \
183 content.get('playlistId'), # COURSE/"learning playlist"
184 'video_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('videoId') or \
185 videoid_from_thumbnail(content), # learning playlist
186 'title': content['title']['simpleText'],
187 # Note: learning playlists have no author/channel_id
188 'author': listget(content.get('longBylineText',{}).get('runs',[]),0,{}).get('text') or
189 listget(content.get('shortBylineText',{}).get('runs',[]),0,{}).get('text'),
190 'channel_id': listget(content.get('longBylineText',{}).get('runs',[]),0,{}) \
191 .get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId'), # OR .shortBylineText
192 'n_videos': toInt(content['videoCount']),
193 }})
194 elif key == 'radioRenderer': # "Mix" playlists
195 results.append({'type': 'PLAYLIST', 'content': {
196 'playlist_id': content['playlistId'],
197 'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
198 'title': content['title']['simpleText'],
199 'author': content['longBylineText']['simpleText'] or \
200 content['shortBylineText']['simpleText'] , # always "YouTube"
201 'channel_id': None,
202 'n_videos': content['videoCountShortText']['runs'][0]['text'] or \
203 content['videoCountText']['runs'][0]['text'],
204 # videoCountShortText: "50+"; videoCountText: "50+ videos"
205 }})
206 elif key == 'showRenderer': # normal playlist, specially displayed
207 results.append({'type': 'PLAYLIST', 'content': {
208 'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
209 'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
210 'title': content['title']['simpleText'],
211 'author': content['longBylineText']['runs'][0]['text'] or \
212 content['shortBylineText']['runs'][0]['text'],
213 'channel_id': None,
214 'n_videos': None,
215 }})
216 elif key == 'channelRenderer':
217 results.append({'type': 'CHANNEL', 'content': {
218 'channel_id': content['channelId'],
219 'title': content['title']['simpleText'],
220 'icons': mkthumbs(content['thumbnail']['thumbnails']),
221 'subscribers': content.get('subscriberCountText',{}).get('simpleText'), # "2.47K subscribers"
222 }})
223 elif key == 'shelfRenderer':
224 subkey = next(iter(content['content'].keys()), {}) #verticalListRenderer/horizontalMovieListRenderer
225 r, e = parse_result_items(content['content'][subkey]['items'])
226 results.extend(r)
227 extras.extend(e)
228 elif key in ['movieRenderer', 'gridMovieRenderer']: # movies to buy/rent
229 pass # gMR.{videoId,title.runs[].text,lengthText.simpleText}
230 elif key in ['carouselAdRenderer','searchPyvRenderer','promotedSparklesTextSearchRenderer']: # haha, no.
231 pass
232 elif key == 'horizontalCardListRenderer':
233 # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
234 pass
235 elif key == 'emergencyOneboxRenderer': # suicide prevention hotline
236 pass
237 elif key in ['clarificationRenderer', 'infoPanelContainerRenderer']: # COVID-19/conspiracy theory infos
238 pass
239 elif key == 'webAnswerRenderer': # "Result from the web"
240 pass
241 elif key == 'didYouMeanRenderer' or key == 'showingResultsForRenderer':
242 extras.append({
243 'type': 'spelling',
244 'query': content['correctedQueryEndpoint']['searchEndpoint']['query'], # non-misspelled query
245 'autocorrected': key == 'showingResultsForRenderer',
246 })
247 elif key == 'messageRenderer': # "No more results"
248 extras.append({
249 'type': 'message',
250 'message': content|G('title')|G('runs')|G(0)|G('text') or \
251 content|G('text')|G('runs')|G(0)|G('text'),
252 })
253 elif key == 'backgroundPromoRenderer': # e.g. "no results"
254 extras.append({
255 'type': content['icon']['iconType'],
256 'message': content['title']['runs'][0]['text'],
257 })
258 else:
259 log_unknown_card(item)
260 return results, extras
261
262 def parse_infocard(card):
263 """
264 parses a single infocard into a format that's easier to handle.
265 """
266 card = card['cardRenderer']
267 ctype = list(card['content'].keys())[0]
268 content = card['content'][ctype]
269 if ctype == "pollRenderer":
270 return {'type': "POLL", 'content': {
271 'question': content['question']['simpleText'],
272 'answers': [(a['text']['simpleText'],a['numVotes']) \
273 for a in content['choices']],
274 }}
275 elif ctype == "videoInfoCardContentRenderer":
276 is_live = content.get('badge',{}).get('liveBadgeRenderer') is not None
277 return {'type': "VIDEO", 'content': {
278 'video_id': content['action']['watchEndpoint']['videoId'],
279 'title': content['videoTitle']['simpleText'],
280 'author': delL(content['channelName']['simpleText']),
281 'length': content.get('lengthString',{}).get('simpleText') \
282 if not is_live else "LIVE", # "23:03"
283 'views': toInt(content.get('viewCountText',{}).get('simpleText')),
284 # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
285 }}
286 elif ctype == "playlistInfoCardContentRenderer":
287 return {'type': "PLAYLIST", 'content': {
288 'playlist_id': content['action']['watchEndpoint']['playlistId'],
289 'video_id': content['action']['watchEndpoint']['videoId'],
290 'title': content['playlistTitle']['simpleText'],
291 'author': delL(content['channelName']['simpleText']),
292 'n_videos': toInt(content['playlistVideoCount']['simpleText']),
293 }}
294 elif ctype == "simpleCardContentRenderer" and \
295 'urlEndpoint' in content['command']:
296 return {'type': "WEBSITE", 'content': {
297 'url': clean_url(content['command']['urlEndpoint']['url']),
298 'domain': content['displayDomain']['simpleText'],
299 'title': content['title']['simpleText'],
300 # XXX: no thumbnails for infocards
301 }}
302 elif ctype == "collaboratorInfoCardContentRenderer":
303 return {'type': "CHANNEL", 'content': {
304 'channel_id': content['endpoint']['browseEndpoint']['browseId'],
305 'title': content['channelName']['simpleText'],
306 'icons': mkthumbs(content['channelAvatar']['thumbnails']),
307 'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers"
308 }}
309 else:
310 log_unknown_card(card)
311 return None
312
313 def parse_endcard(card):
314 """
315 parses a single endcard into a format that's easier to handle.
316 """
317 card = card.get('endscreenElementRenderer', card) #only sometimes nested
318 ctype = card['style']
319 if ctype == "CHANNEL":
320 return {'type': ctype, 'content': {
321 'channel_id': card['endpoint']['browseEndpoint']['browseId'],
322 'title': card['title']['simpleText'],
323 'icons': mkthumbs(card['image']['thumbnails']),
324 }}
325 elif ctype == "VIDEO":
326 return {'type': ctype, 'content': {
327 'video_id': card['endpoint']['watchEndpoint']['videoId'],
328 'title': card['title']['simpleText'],
329 'length': card['videoDuration']['simpleText'], # '12:21'
330 'views': toInt(card['metadata']['simpleText']),
331 # XXX: no channel name
332 }}
333 elif ctype == "PLAYLIST":
334 return {'type': ctype, 'content': {
335 'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
336 'video_id': card['endpoint']['watchEndpoint']['videoId'],
337 'title': card['title']['simpleText'],
338 'author': delL(card['metadata']['simpleText']),
339 'n_videos': toInt(card['playlistLength']['simpleText']),
340 }}
341 elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE":
342 url = clean_url(card['endpoint']['urlEndpoint']['url'])
343 return {'type': "WEBSITE", 'content': {
344 'url': url,
345 'domain': urlparse(url).netloc,
346 'title': card['title']['simpleText'],
347 'icons': mkthumbs(card['image']['thumbnails']),
348 }}
349 else:
350 log_unknown_card(card)
351 return None
352
353 def videoid_from_thumbnail(content):
354 # learning playlist; example: PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5 (/user/enyay/playlists)
355 return re.match(r"https?://i.ytimg.com/vi/([-_0-9a-zA-Z]{11})|()",
356 listget(listget(content.get('thumbnails',[]),0,{}).get('thumbnails',[]),0,{}).get('url','')
357 ).group(1)
358
359 def parse_channel_items(items, channel_id, author):
360 result = []
361 extra = []
362 for item in items:
363 key = next(iter(item.keys()), None)
364 content = item[key]
365 if key in ["gridVideoRenderer", "videoRenderer", "videoCardRenderer"]:
366 # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
367 result.append({'type': 'VIDEO', 'content': {
368 'video_id': content['videoId'],
369 'title': content['title'].get('simpleText') or content['title'].get('runs',[{}])[0].get('text'),
370 'author': content.get('bylineText',{}).get('runs',[{}])[0].get('text') or author,
371 'channel_id': content.get('bylineText',{}).get('runs',[{}])[0] \
372 .get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId') or channel_id,
373 'length': (content.get('lengthText',{}).get('simpleText') or # topic channel
374 listfind(content.get('thumbnailOverlays',[]),'thumbnailOverlayTimeStatusRenderer')
375 .get('text',{}).get('simpleText')),
376 # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
377 'views': toInt(content.get('viewCountText',{}).get('simpleText')),
378 'published': age(content.get('publishedTimeText',{}).get('simpleText')),
379 }})
380 elif key == "gridPlaylistRenderer" or key == "playlistRenderer":
381 result.append({'type': 'PLAYLIST', 'content': {
382 'playlist_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('playlistId') or content.get('playlistId'),
383 'video_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('videoId',{}) or videoid_from_thumbnail(content),
384 'title': (content['title'].get('simpleText') or # playlistRenderer
385 content['title']['runs'][0]['text']), # gridPlaylistRenderer
386 'author': author,
387 'channel_id': channel_id,
388 'n_videos': toInt(content.get('videoCount') or # playlistRenderer
389 content.get('videoCountShortText',{}).get('simpleText') or # grid(1)
390 content.get('videoCountText',{}).get('runs',[{}])[0].get('text')), # grid(2)
391 }})
392 elif key == "showRenderer":
393 result.append({'type': 'PLAYLIST', 'content': {
394 'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
395 'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
396 'title': content['title']['simpleText'],
397 'author': author,
398 'channel_id': channel_id,
399 'n_videos': None,
400 }})
401 elif key in ["itemSectionRenderer", "gridRenderer", "horizontalCardListRenderer"]:
402 newkey = {
403 "itemSectionRenderer": 'contents',
404 "gridRenderer": 'items',
405 "horizontalCardListRenderer": 'cards',
406 }.get(key)
407 r, e = parse_channel_items(content[newkey], channel_id, author)
408 result.extend(r)
409 extra.extend(e)
410 elif key == "shelfRenderer":
411 r, e = parse_channel_items([content['content']], channel_id, author)
412 result.extend(r)
413 extra.extend(e)
414 elif key == "messageRenderer":
415 # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
416 pass
417 elif key == "gameCardRenderer":
418 pass
419 else:
420 log_unknown_card(item)
421
422 return result, extra
423
424 def parse_playlist(item):
425 key = next(iter(item.keys()), None)
426 content = item[key]
427 if key == "playlistVideoRenderer":
428 if not content.get('isPlayable', False):
429 return None # private or deleted video
430
431 return {'type': 'VIDEO', 'content': {
432 'video_id': content['videoId'],
433 'title': (content['title'].get('simpleText') or # playable videos
434 content['title'].get('runs',[{}])[0].get('text')), # "[Private video]"
435 'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
436 'index': content['navigationEndpoint']['watchEndpoint'].get('index',0), #or int(content['index']['simpleText']) (absent on course intros; e.g. PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5)
437 # rest is missing from unplayable videos:
438 'author': content.get('shortBylineText',{}).get('runs',[{}])[0].get('text'),
439 'channel_id':content.get('shortBylineText',{}).get('runs',[{}])[0].get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId'),
440 'length': (content.get("lengthText",{}).get("simpleText") or # "8:51"
441 int(content.get("lengthSeconds", 0))), # "531"
442 'starttime': content['navigationEndpoint']['watchEndpoint'].get('startTimeSeconds'),
443 }}
444 else:
445 raise Exception(item) # XXX TODO
Imprint / Impressum