]> git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
implement 'pipe' framework and test on VIDEO searchresults
[subscriptionfeed.git] / app / common / innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
2
3 from urllib.parse import parse_qs, urlparse
4 import re
5
6 def findall(obj, key):
7 """
8 given a list of dicts, where one dict contains a given key, return said key.
9 """
10 if obj is None: return []
11 return [ obj[key] for obj in obj if key in obj.keys() ]
12 def listget(obj, index, fallback=None):
13 if obj is None: return fallback
14 return next(iter(obj[index:]), fallback)
15 flatten = lambda l: [item for sublist in l for item in sublist] # https://stackoverflow.com/a/952952
16 first = lambda l: next(iter(l),{})
17 listfind = lambda obj,key: first(findall(obj,key))
18
19 class G:
20 """
21 null-coalescing version of dict.get() that also works on lists.
22
23 the | operator is overloaded to achieve similar looking code to jq(1) filters.
24 """
25 def __init__(self, key):
26 self.key = key
27 def __ror__(self, other):
28 try: return other[self.key]
29 except: return None
30 class _Text:
31 """ parses youtube's .runs[].text and .simpleText variants """
32 def __ror__(self, other): # Note: only returning runs[0], not concat'ing all!
33 return other|G('simpleText') or other|G('runs')|G(0)|G('text')
34 text = _Text()
35 class Select:
36 """ |Select('foo') returns the first foo in list, |Select(all='foo') returns all foos. """
37 def __init__(self, key=None, *, all=None):
38 self.key = key or all
39 self.all = all
40 def __ror__(self, other):
41 try: items = [ other[self.key] for other in other if self.key in other.keys() ]
42 except: items = []
43 return items if self.all else items|G(0)
44 class A:
45 """ apply """
46 def __init__(self, f, *args):
47 self.f = f
48 self.args = args
49 def __ror__(self, other):
50 return self.f(other, *self.args)
51 class _Int:
52 def __ror__(self, other):
53 try: return int(''.join(filter(str.isdigit, other)))
54 except: return None
55 int = _Int()
56
57
58 def prepare_searchresults(yt_results):
59 contents = listfind(yt_results, 'response') \
60 .get('contents',{})\
61 .get('twoColumnSearchResultsRenderer',{})\
62 .get('primaryContents',{})\
63 .get('sectionListRenderer',{})\
64 .get('contents',[])
65 contents = flatten([c.get('contents',[]) for c in contents|Select(all='itemSectionRenderer')])
66
67 return parse_result_items(contents)
68
69 def prepare_infocards(metadata):
70 cards = metadata.get('cards',{}).get('cardCollectionRenderer',{}).get('cards',[])
71 return list(filter(None, map(parse_infocard, cards)))
72
73 def prepare_endcards(metadata):
74 endsc = metadata.get('endscreen',{}).get('endscreenRenderer',{}).get('elements',[])
75 return list(filter(None, map(parse_endcard, endsc)))
76
77 def prepare_channel(result, channel_id):
78 response = listfind(result,'response')
79
80 if 'alerts' in response: # possibly got an error back
81 from flask import current_app
82 current_app.logger.error([(alert['alertRenderer']['type'],alert['alertRenderer']['text']['simpleText']) for alert in response['alerts']])
83 return None,None,[],[],False
84
85 meta1 = response.get('metadata',{}).get('channelMetadataRenderer',{})
86 meta2 = response.get('microformat',{}).get('microformatDataRenderer',{})
87 title = meta1.get('title', meta2.get('title'))
88 descr = meta1.get('description', meta2.get('description')) # meta2.description is capped at 160chars
89 thumb = mkthumbs(meta2.get('thumbnail',meta1.get('avatar',{})).get('thumbnails',{})) # .avatar ~ 900px
90
91 contents = response.get('continuationContents')
92 if not contents: # overran end of list
93 return title, descr, thumb, [], False
94
95 unparsed = contents.get('gridContinuation',{}).get('items') or \
96 contents.get('sectionListContinuation',{}).get('contents') or []
97 items, extra = parse_channel_items(unparsed, channel_id, title)
98 has_more = 'continuations' in (contents.get('gridContinuation') or
99 contents.get('sectionListContinuation') or {})
100
101 return title, descr, thumb, items, has_more
102
103 def prepare_playlist(result):
104 contents = listfind(result,'response')['continuationContents']
105 unparsed = contents['playlistVideoListContinuation'].get('contents',[])
106 has_more = 'continuations' in contents.get('playlistVideoListContinuation')
107
108 return list(filter(None, map(parse_playlist, unparsed))), has_more
109
110 def mkthumbs(thumbs):
111 output = {str(e['height']): e['url'] for e in thumbs}
112 largest=next(iter(sorted(output.keys(),reverse=True,key=int)),None)
113 return {**output, 'largest': largest}
114
115 def clean_url(url):
116 # externals URLs are redirected through youtube.com/redirect, but we
117 # may encounter internal URLs, too
118 return parse_qs(urlparse(url).query).get('q',[url])[0]
119
120 def toInt(s, fallback=0):
121 if s is None:
122 return fallback
123 try:
124 return int(''.join(filter(str.isdigit, s)))
125 except ValueError:
126 return fallback
127
128 # Remove left-/rightmost word from string:
129 delL = lambda s: s.partition(' ')[2]
130
131 def age(s):
132 if s is None: # missing from autogen'd music, some livestreams
133 return None
134 # Some livestreams have "Streamed 7 hours ago"
135 s = s.replace("Streamed ","")
136 # Now, everything should be in the form "1 year ago"
137 value, unit, _ = s.split(" ")
138 suffix = dict(
139 month='mn',
140 months='mn',
141 ).get(unit, unit[0]) # first letter otherwise (e.g. year(s) => y)
142
143 return f"{value}{suffix}"
144
145 def log_unknown_card(data):
146 import json
147 try:
148 from flask import request
149 source = request.url
150 except: source = "unknown"
151 with open("/tmp/innertube.err", "a") as f:
152 f.write(f"\n/***** {source} *****/\n")
153 json.dump(data, f, indent=2)
154
155 def parse_result_items(items):
156 # TODO: use .get() for most non-essential attributes
157 """
158 parses youtube search response into an easier to use format.
159 """
160 results = []
161 extras = []
162 for item in items:
163 key = next(iter(item.keys()), None)
164 content = item[key]
165 if key == 'videoRenderer':
166 results.append({'type': 'VIDEO', 'content': {
167 'video_id': content['videoId'],
168 'title': content['title']|G.text,
169 'author': content|G('longBylineText')|G.text or \
170 content|G('shortBylineText')|G.text,
171 'channel_id': content|G('ownerText')|G('runs')|G(0) \
172 |G('navigationEndpoint')|G('browseEndpoint')|G('browseId'),
173 'length': content|G('lengthText')|G.text, # "44:07", "1:41:50"
174 'views': content|G('viewCountText')|G.text|A.int or 0, # "1,234 {views|watching}", absent on 0 views
175 'published': content|G('publishedTimeText')|G('simpleText')|A(age),
176 'live': content|G('badges')|Select('metadataBadgeRenderer')|G('style')=='BADGE_STYLE_TYPE_LIVE_NOW',
177 }})
178 elif key == 'playlistRenderer':
179 results.append({'type': 'PLAYLIST', 'content': {
180 'playlist_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('playlistId') or \
181 content.get('playlistId'), # COURSE/"learning playlist"
182 'video_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('videoId') or \
183 videoid_from_thumbnail(content), # learning playlist
184 'title': content['title']['simpleText'],
185 # Note: learning playlists have no author/channel_id
186 'author': listget(content.get('longBylineText',{}).get('runs',[]),0,{}).get('text') or
187 listget(content.get('shortBylineText',{}).get('runs',[]),0,{}).get('text'),
188 'channel_id': listget(content.get('longBylineText',{}).get('runs',[]),0,{}) \
189 .get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId'), # OR .shortBylineText
190 'n_videos': toInt(content['videoCount']),
191 }})
192 elif key == 'radioRenderer': # "Mix" playlists
193 results.append({'type': 'PLAYLIST', 'content': {
194 'playlist_id': content['playlistId'],
195 'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
196 'title': content['title']['simpleText'],
197 'author': content['longBylineText']['simpleText'] or \
198 content['shortBylineText']['simpleText'] , # always "YouTube"
199 'channel_id': None,
200 'n_videos': content['videoCountShortText']['runs'][0]['text'] or \
201 content['videoCountText']['runs'][0]['text'],
202 # videoCountShortText: "50+"; videoCountText: "50+ videos"
203 }})
204 elif key == 'showRenderer': # normal playlist, specially displayed
205 results.append({'type': 'PLAYLIST', 'content': {
206 'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
207 'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
208 'title': content['title']['simpleText'],
209 'author': content['longBylineText']['runs'][0]['text'] or \
210 content['shortBylineText']['runs'][0]['text'],
211 'channel_id': None,
212 'n_videos': None,
213 }})
214 elif key == 'channelRenderer':
215 results.append({'type': 'CHANNEL', 'content': {
216 'channel_id': content['channelId'],
217 'title': content['title']['simpleText'],
218 'icons': mkthumbs(content['thumbnail']['thumbnails']),
219 'subscribers': content.get('subscriberCountText',{}).get('simpleText'), # "2.47K subscribers"
220 }})
221 elif key == 'shelfRenderer':
222 subkey = next(iter(content['content'].keys()), {}) #verticalListRenderer/horizontalMovieListRenderer
223 r, e = parse_result_items(content['content'][subkey]['items'])
224 results.extend(r)
225 extras.extend(e)
226 elif key in ['movieRenderer', 'gridMovieRenderer']: # movies to buy/rent
227 pass # gMR.{videoId,title.runs[].text,lengthText.simpleText}
228 elif key in ['carouselAdRenderer','searchPyvRenderer','promotedSparklesTextSearchRenderer']: # haha, no.
229 pass
230 elif key == 'horizontalCardListRenderer':
231 # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
232 pass
233 elif key == 'emergencyOneboxRenderer': # suicide prevention hotline
234 pass
235 elif key in ['clarificationRenderer', 'infoPanelContainerRenderer']: # COVID-19/conspiracy theory infos
236 pass
237 elif key == 'webAnswerRenderer': # "Result from the web"
238 pass
239 elif key == 'didYouMeanRenderer' or key == 'showingResultsForRenderer':
240 extras.append({
241 'type': 'spelling',
242 'query': content['correctedQueryEndpoint']['searchEndpoint']['query'], # non-misspelled query
243 'autocorrected': key == 'showingResultsForRenderer',
244 })
245 elif key == 'messageRenderer': # "No more results"
246 extras.append({
247 'type': 'message',
248 'message': content|G('title')|G('runs')|G(0)|G('text') or \
249 content|G('text')|G('runs')|G(0)|G('text'),
250 })
251 elif key == 'backgroundPromoRenderer': # e.g. "no results"
252 extras.append({
253 'type': content['icon']['iconType'],
254 'message': content['title']['runs'][0]['text'],
255 })
256 else:
257 log_unknown_card(item)
258 return results, extras
259
260 def parse_infocard(card):
261 """
262 parses a single infocard into a format that's easier to handle.
263 """
264 card = card['cardRenderer']
265 ctype = list(card['content'].keys())[0]
266 content = card['content'][ctype]
267 if ctype == "pollRenderer":
268 return {'type': "POLL", 'content': {
269 'question': content['question']['simpleText'],
270 'answers': [(a['text']['simpleText'],a['numVotes']) \
271 for a in content['choices']],
272 }}
273 elif ctype == "videoInfoCardContentRenderer":
274 is_live = content.get('badge',{}).get('liveBadgeRenderer') is not None
275 return {'type': "VIDEO", 'content': {
276 'video_id': content['action']['watchEndpoint']['videoId'],
277 'title': content['videoTitle']['simpleText'],
278 'author': delL(content['channelName']['simpleText']),
279 'length': content.get('lengthString',{}).get('simpleText') \
280 if not is_live else "LIVE", # "23:03"
281 'views': toInt(content.get('viewCountText',{}).get('simpleText')),
282 # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
283 }}
284 elif ctype == "playlistInfoCardContentRenderer":
285 return {'type': "PLAYLIST", 'content': {
286 'playlist_id': content['action']['watchEndpoint']['playlistId'],
287 'video_id': content['action']['watchEndpoint']['videoId'],
288 'title': content['playlistTitle']['simpleText'],
289 'author': delL(content['channelName']['simpleText']),
290 'n_videos': toInt(content['playlistVideoCount']['simpleText']),
291 }}
292 elif ctype == "simpleCardContentRenderer" and \
293 'urlEndpoint' in content['command']:
294 return {'type': "WEBSITE", 'content': {
295 'url': clean_url(content['command']['urlEndpoint']['url']),
296 'domain': content['displayDomain']['simpleText'],
297 'title': content['title']['simpleText'],
298 # XXX: no thumbnails for infocards
299 }}
300 elif ctype == "collaboratorInfoCardContentRenderer":
301 return {'type': "CHANNEL", 'content': {
302 'channel_id': content['endpoint']['browseEndpoint']['browseId'],
303 'title': content['channelName']['simpleText'],
304 'icons': mkthumbs(content['channelAvatar']['thumbnails']),
305 'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers"
306 }}
307 else:
308 log_unknown_card(card)
309 return None
310
311 def parse_endcard(card):
312 """
313 parses a single endcard into a format that's easier to handle.
314 """
315 card = card.get('endscreenElementRenderer', card) #only sometimes nested
316 ctype = card['style']
317 if ctype == "CHANNEL":
318 return {'type': ctype, 'content': {
319 'channel_id': card['endpoint']['browseEndpoint']['browseId'],
320 'title': card['title']['simpleText'],
321 'icons': mkthumbs(card['image']['thumbnails']),
322 }}
323 elif ctype == "VIDEO":
324 return {'type': ctype, 'content': {
325 'video_id': card['endpoint']['watchEndpoint']['videoId'],
326 'title': card['title']['simpleText'],
327 'length': card['videoDuration']['simpleText'], # '12:21'
328 'views': toInt(card['metadata']['simpleText']),
329 # XXX: no channel name
330 }}
331 elif ctype == "PLAYLIST":
332 return {'type': ctype, 'content': {
333 'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
334 'video_id': card['endpoint']['watchEndpoint']['videoId'],
335 'title': card['title']['simpleText'],
336 'author': delL(card['metadata']['simpleText']),
337 'n_videos': toInt(card['playlistLength']['simpleText']),
338 }}
339 elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE":
340 url = clean_url(card['endpoint']['urlEndpoint']['url'])
341 return {'type': "WEBSITE", 'content': {
342 'url': url,
343 'domain': urlparse(url).netloc,
344 'title': card['title']['simpleText'],
345 'icons': mkthumbs(card['image']['thumbnails']),
346 }}
347 else:
348 log_unknown_card(card)
349 return None
350
351 def videoid_from_thumbnail(content):
352 # learning playlist; example: PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5 (/user/enyay/playlists)
353 return re.match(r"https?://i.ytimg.com/vi/([-_0-9a-zA-Z]{11})|()",
354 listget(listget(content.get('thumbnails',[]),0,{}).get('thumbnails',[]),0,{}).get('url','')
355 ).group(1)
356
357 def parse_channel_items(items, channel_id, author):
358 result = []
359 extra = []
360 for item in items:
361 key = next(iter(item.keys()), None)
362 content = item[key]
363 if key in ["gridVideoRenderer", "videoRenderer", "videoCardRenderer"]:
364 # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
365 result.append({'type': 'VIDEO', 'content': {
366 'video_id': content['videoId'],
367 'title': content['title'].get('simpleText') or content['title'].get('runs',[{}])[0].get('text'),
368 'author': content.get('bylineText',{}).get('runs',[{}])[0].get('text') or author,
369 'channel_id': content.get('bylineText',{}).get('runs',[{}])[0] \
370 .get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId') or channel_id,
371 'length': (content.get('lengthText',{}).get('simpleText') or # topic channel
372 listfind(content.get('thumbnailOverlays',[]),'thumbnailOverlayTimeStatusRenderer')
373 .get('text',{}).get('simpleText')),
374 # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
375 'views': toInt(content.get('viewCountText',{}).get('simpleText')),
376 'published': age(content.get('publishedTimeText',{}).get('simpleText')),
377 }})
378 elif key == "gridPlaylistRenderer" or key == "playlistRenderer":
379 result.append({'type': 'PLAYLIST', 'content': {
380 'playlist_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('playlistId') or content.get('playlistId'),
381 'video_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('videoId',{}) or videoid_from_thumbnail(content),
382 'title': (content['title'].get('simpleText') or # playlistRenderer
383 content['title']['runs'][0]['text']), # gridPlaylistRenderer
384 'author': author,
385 'channel_id': channel_id,
386 'n_videos': toInt(content.get('videoCount') or # playlistRenderer
387 content.get('videoCountShortText',{}).get('simpleText') or # grid(1)
388 content.get('videoCountText',{}).get('runs',[{}])[0].get('text')), # grid(2)
389 }})
390 elif key == "showRenderer":
391 result.append({'type': 'PLAYLIST', 'content': {
392 'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
393 'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
394 'title': content['title']['simpleText'],
395 'author': author,
396 'channel_id': channel_id,
397 'n_videos': None,
398 }})
399 elif key in ["itemSectionRenderer", "gridRenderer", "horizontalCardListRenderer"]:
400 newkey = {
401 "itemSectionRenderer": 'contents',
402 "gridRenderer": 'items',
403 "horizontalCardListRenderer": 'cards',
404 }.get(key)
405 r, e = parse_channel_items(content[newkey], channel_id, author)
406 result.extend(r)
407 extra.extend(e)
408 elif key == "shelfRenderer":
409 r, e = parse_channel_items([content['content']], channel_id, author)
410 result.extend(r)
411 extra.extend(e)
412 elif key == "messageRenderer":
413 # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
414 pass
415 elif key == "gameCardRenderer":
416 pass
417 else:
418 log_unknown_card(item)
419
420 return result, extra
421
422 def parse_playlist(item):
423 key = next(iter(item.keys()), None)
424 content = item[key]
425 if key == "playlistVideoRenderer":
426 if not content.get('isPlayable', False):
427 return None # private or deleted video
428
429 return {'type': 'VIDEO', 'content': {
430 'video_id': content['videoId'],
431 'title': (content['title'].get('simpleText') or # playable videos
432 content['title'].get('runs',[{}])[0].get('text')), # "[Private video]"
433 'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
434 'index': content['navigationEndpoint']['watchEndpoint'].get('index',0), #or int(content['index']['simpleText']) (absent on course intros; e.g. PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5)
435 # rest is missing from unplayable videos:
436 'author': content.get('shortBylineText',{}).get('runs',[{}])[0].get('text'),
437 'channel_id':content.get('shortBylineText',{}).get('runs',[{}])[0].get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId'),
438 'length': (content.get("lengthText",{}).get("simpleText") or # "8:51"
439 int(content.get("lengthSeconds", 0))), # "531"
440 'starttime': content['navigationEndpoint']['watchEndpoint'].get('startTimeSeconds'),
441 }}
442 else:
443 raise Exception(item) # XXX TODO
Imprint / Impressum