]> git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
fix "learning playlist" and playlists in general, more search result types
[subscriptionfeed.git] / app / common / innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
2
3 from urllib.parse import parse_qs, urlparse
4
5 def findall(obj, key):
6 """
7 given a list of dicts, where one dict contains a given key, return said key.
8 """
9 if obj is None: return []
10 return [ obj[key] for obj in obj if key in obj.keys() ]
11 def listget(obj, index, fallback=None):
12 if obj is None: return fallback
13 return next(iter(obj[index:]), fallback)
14 flatten = lambda l: [item for sublist in l for item in sublist] # https://stackoverflow.com/a/952952
15 first = lambda l: next(iter(l),{})
16 listfind = lambda obj,key: first(findall(obj,key))
17
18 def prepare_searchresults(yt_results):
19 contents = listfind(yt_results, 'response') \
20 .get('contents',{})\
21 .get('twoColumnSearchResultsRenderer',{})\
22 .get('primaryContents',{})\
23 .get('sectionListRenderer',{})\
24 .get('contents',[])
25 contents = flatten([c.get('contents',[]) for c in findall(contents, 'itemSectionRenderer')])
26
27 return parse_result_items(contents)
28
29 def prepare_infocards(metadata):
30 cards = metadata.get('cards',{}).get('cardCollectionRenderer',{}).get('cards',[])
31 return list(filter(None, map(parse_infocard, cards)))
32
33 def prepare_endcards(metadata):
34 endsc = metadata.get('endscreen',{}).get('endscreenRenderer',{}).get('elements',[])
35 return list(filter(None, map(parse_endcard, endsc)))
36
37 def prepare_channel(result, channel_id):
38 response = listfind(result,'response')
39
40 if 'alerts' in response: # possibly got an error back
41 from flask import current_app
42 current_app.logger.error([(alert['alertRenderer']['type'],alert['alertRenderer']['text']['simpleText']) for alert in response['alerts']])
43 return None,None,[],[],False
44
45 meta1 = response.get('metadata',{}).get('channelMetadataRenderer',{})
46 meta2 = response.get('microformat',{}).get('microformatDataRenderer',{})
47 title = meta1.get('title', meta2.get('title'))
48 descr = meta1.get('description', meta2.get('description')) # meta2.description is capped at 160chars
49 thumb = mkthumbs(meta2.get('thumbnail',meta1.get('avatar',{})).get('thumbnails',{})) # .avatar ~ 900px
50
51 contents = response.get('continuationContents')
52 if not contents: # overran end of list
53 return title, descr, thumb, [], False
54
55 unparsed = contents.get('gridContinuation',{}).get('items') or \
56 contents.get('sectionListContinuation',{}).get('contents') or []
57 items, extra = parse_channel_items(unparsed, channel_id, title)
58 has_more = 'continuations' in (contents.get('gridContinuation') or
59 contents.get('sectionListContinuation') or {})
60
61 return title, descr, thumb, items, has_more
62
63 def prepare_playlist(result):
64 contents = listfind(result,'response')['continuationContents']
65 unparsed = contents['playlistVideoListContinuation'].get('contents',[])
66 has_more = 'continuations' in contents.get('playlistVideoListContinuation')
67
68 return list(filter(None, map(parse_playlist, unparsed))), has_more
69
70 def mkthumbs(thumbs):
71 output = {str(e['height']): e['url'] for e in thumbs}
72 largest=next(iter(sorted(output.keys(),reverse=True,key=int)),None)
73 return {**output, 'largest': largest}
74
75 def clean_url(url):
76 # externals URLs are redirected through youtube.com/redirect, but we
77 # may encounter internal URLs, too
78 return parse_qs(urlparse(url).query).get('q',[url])[0]
79
80 def toInt(s, fallback=0):
81 if s is None:
82 return fallback
83 try:
84 return int(''.join(filter(str.isdigit, s)))
85 except ValueError:
86 return fallback
87
88 # Remove left-/rightmost word from string:
89 delL = lambda s: s.partition(' ')[2]
90
91 def age(s):
92 if s is None: # missing from autogen'd music, some livestreams
93 return None
94 # Some livestreams have "Streamed 7 hours ago"
95 s = s.replace("Streamed ","")
96 # Now, everything should be in the form "1 year ago"
97 value, unit, _ = s.split(" ")
98 suffix = dict(
99 month='mn',
100 months='mn',
101 ).get(unit, unit[0]) # first letter otherwise (e.g. year(s) => y)
102
103 return f"{value}{suffix}"
104
105 def log_unknown_card(data):
106 import json
107 try:
108 from flask import request
109 source = request.url
110 except: source = "unknown"
111 with open("/tmp/innertube.err", "a") as f:
112 f.write(f"\n/***** {source} *****/\n")
113 json.dump(data, f, indent=2)
114
115 def parse_result_items(items):
116 # TODO: use .get() for most non-essential attributes
117 """
118 parses youtube search response into an easier to use format.
119 """
120 results = []
121 extras = []
122 for item in items:
123 key = next(iter(item.keys()), None)
124 content = item[key]
125 if key == 'videoRenderer':
126 is_live = listfind(content.get('badges',[]), 'metadataBadgeRenderer').get('style') == 'BADGE_STYLE_TYPE_LIVE_NOW'
127 results.append({'type': 'VIDEO', 'content': {
128 'video_id': content['videoId'],
129 'title': content['title']['runs'][0]['text'],
130 'author': content['longBylineText']['runs'][0]['text'] or \
131 content['shortBylineText']['runs'][0]['text'],
132 'channel_id': content['ownerText']['runs'][0] \
133 ['navigationEndpoint']['browseEndpoint']['browseId'],
134 'length': content.get('lengthText',{}).get('simpleText') \
135 if not is_live else 'LIVE', # "44:07", "1:41:50"
136 'views': toInt(content.get('viewCountText',{}).get('simpleText') or # "123,456 views"
137 listget(content.get('viewCountText',{}).get('runs'),0,{}).get('text')), # "1,234 watching"
138 'published': age(content.get('publishedTimeText',{}).get('simpleText')),
139 }})
140 elif key == 'playlistRenderer':
141 results.append({'type': 'PLAYLIST', 'content': {
142 'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
143 'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
144 'title': content['title']['simpleText'],
145 'author': content['longBylineText']['runs'][0]['text'] or
146 content['shortBylineText']['runs'][0]['text'],
147 'channel_id': content['longBylineText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId'], # OR .shortBylineText
148 'n_videos': toInt(content['videoCount']),
149 }})
150 elif key == 'radioRenderer': # "Mix" playlists
151 results.append({'type': 'PLAYLIST', 'content': {
152 'playlist_id': content['playlistId'],
153 'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
154 'title': content['title']['simpleText'],
155 'author': content['longBylineText']['simpleText'] or \
156 content['shortBylineText']['simpleText'] , # always "YouTube"
157 'channel_id': None,
158 'n_videos': content['videoCountShortText']['runs'][0]['text'] or \
159 content['videoCountText']['runs'][0]['text'],
160 # videoCountShortText: "50+"; videoCountText: "50+ videos"
161 }})
162 elif key == 'channelRenderer':
163 results.append({'type': 'CHANNEL', 'content': {
164 'channel_id': content['channelId'],
165 'title': content['title']['simpleText'],
166 'icons': mkthumbs(content['thumbnail']['thumbnails']),
167 'subscribers': content.get('subscriberCountText',{}).get('simpleText'), # "2.47K subscribers"
168 }})
169 elif key == 'shelfRenderer':
170 r, e = parse_result_items(content['content']['verticalListRenderer']['items'])
171 results.extend(r)
172 extras.extend(e)
173 elif key == 'movieRenderer': # movies to buy/rent
174 pass
175 elif key in ['carouselAdRenderer','searchPyvRenderer','promotedSparklesTextSearchRenderer']: # haha, no.
176 pass
177 elif key == 'horizontalCardListRenderer':
178 # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
179 pass
180 elif key == 'emergencyOneboxRenderer': # suicide prevention hotline
181 pass
182 elif key == 'clarificationRenderer': # COVID-19 infos
183 pass
184 elif key == 'didYouMeanRenderer' or key == 'showingResultsForRenderer':
185 extras.append({
186 'type': 'spelling',
187 'query': content['correctedQueryEndpoint']['searchEndpoint']['query'], # non-misspelled query
188 'autocorrected': key == 'showingResultsForRenderer',
189 })
190 elif key == 'backgroundPromoRenderer': # e.g. "no results"
191 extras.append({
192 'type': content['icon']['iconType'],
193 'message': content['title']['runs'][0]['text'],
194 })
195 else:
196 log_unknown_card(item)
197 return results, extras
198
199 def parse_infocard(card):
200 """
201 parses a single infocard into a format that's easier to handle.
202 """
203 card = card['cardRenderer']
204 ctype = list(card['content'].keys())[0]
205 content = card['content'][ctype]
206 if ctype == "pollRenderer":
207 return {'type': "POLL", 'content': {
208 'question': content['question']['simpleText'],
209 'answers': [(a['text']['simpleText'],a['numVotes']) \
210 for a in content['choices']],
211 }}
212 elif ctype == "videoInfoCardContentRenderer":
213 is_live = content.get('badge',{}).get('liveBadgeRenderer') is not None
214 return {'type': "VIDEO", 'content': {
215 'video_id': content['action']['watchEndpoint']['videoId'],
216 'title': content['videoTitle']['simpleText'],
217 'author': delL(content['channelName']['simpleText']),
218 'length': content.get('lengthString',{}).get('simpleText') \
219 if not is_live else "LIVE", # "23:03"
220 'views': toInt(content.get('viewCountText',{}).get('simpleText')),
221 # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
222 }}
223 elif ctype == "playlistInfoCardContentRenderer":
224 return {'type': "PLAYLIST", 'content': {
225 'playlist_id': content['action']['watchEndpoint']['playlistId'],
226 'video_id': content['action']['watchEndpoint']['videoId'],
227 'title': content['playlistTitle']['simpleText'],
228 'author': delL(content['channelName']['simpleText']),
229 'n_videos': toInt(content['playlistVideoCount']['simpleText']),
230 }}
231 elif ctype == "simpleCardContentRenderer" and \
232 'urlEndpoint' in content['command']:
233 return {'type': "WEBSITE", 'content': {
234 'url': clean_url(content['command']['urlEndpoint']['url']),
235 'domain': content['displayDomain']['simpleText'],
236 'title': content['title']['simpleText'],
237 # XXX: no thumbnails for infocards
238 }}
239 elif ctype == "collaboratorInfoCardContentRenderer":
240 return {'type': "CHANNEL", 'content': {
241 'channel_id': content['endpoint']['browseEndpoint']['browseId'],
242 'title': content['channelName']['simpleText'],
243 'icons': mkthumbs(content['channelAvatar']['thumbnails']),
244 'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers"
245 }}
246 else:
247 log_unknown_card(card)
248 return None
249
250 def parse_endcard(card):
251 """
252 parses a single endcard into a format that's easier to handle.
253 """
254 card = card.get('endscreenElementRenderer', card) #only sometimes nested
255 ctype = card['style']
256 if ctype == "CHANNEL":
257 return {'type': ctype, 'content': {
258 'channel_id': card['endpoint']['browseEndpoint']['browseId'],
259 'title': card['title']['simpleText'],
260 'icons': mkthumbs(card['image']['thumbnails']),
261 }}
262 elif ctype == "VIDEO":
263 return {'type': ctype, 'content': {
264 'video_id': card['endpoint']['watchEndpoint']['videoId'],
265 'title': card['title']['simpleText'],
266 'length': card['videoDuration']['simpleText'], # '12:21'
267 'views': toInt(card['metadata']['simpleText']),
268 # XXX: no channel name
269 }}
270 elif ctype == "PLAYLIST":
271 return {'type': ctype, 'content': {
272 'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
273 'video_id': card['endpoint']['watchEndpoint']['videoId'],
274 'title': card['title']['simpleText'],
275 'author': delL(card['metadata']['simpleText']),
276 'n_videos': toInt(card['playlistLength']['simpleText']),
277 }}
278 elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE":
279 url = clean_url(card['endpoint']['urlEndpoint']['url'])
280 return {'type': "WEBSITE", 'content': {
281 'url': url,
282 'domain': urlparse(url).netloc,
283 'title': card['title']['simpleText'],
284 'icons': mkthumbs(card['image']['thumbnails']),
285 }}
286 else:
287 log_unknown_card(card)
288 return None
289
290 def parse_channel_items(items, channel_id, author):
291 result = []
292 extra = []
293 for item in items:
294 key = next(iter(item.keys()), None)
295 content = item[key]
296 if key in ["gridVideoRenderer", "videoRenderer", "videoCardRenderer"]:
297 # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
298 result.append({'type': 'VIDEO', 'content': {
299 'video_id': content['videoId'],
300 'title': content['title'].get('simpleText') or content['title'].get('runs',[{}])[0].get('text'),
301 'author': content.get('bylineText',{}).get('runs',[{}])[0].get('text') or author,
302 'channel_id': content.get('bylineText',{}).get('runs',[{}])[0] \
303 .get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId') or channel_id,
304 'length': (content.get('lengthText',{}).get('simpleText') or # topic channel
305 listfind(content.get('thumbnailOverlays',[]),'thumbnailOverlayTimeStatusRenderer')
306 .get('text',{}).get('simpleText')),
307 # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
308 'views': toInt(content.get('viewCountText',{}).get('simpleText')),
309 'published': age(content.get('publishedTimeText',{}).get('simpleText')),
310 }})
311 elif key == "gridPlaylistRenderer" or key == "playlistRenderer":
312 result.append({'type': 'PLAYLIST', 'content': {
313 'playlist_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('playlistId') or content.get('playlistId'),
314 'video_id': content['navigationEndpoint'].get('watchEndpoint',{}).get('videoId',{}),
315 'title': (content['title'].get('simpleText') or # playlistRenderer
316 content['title']['runs'][0]['text']), # gridPlaylistRenderer
317 'author': author,
318 'channel_id': channel_id,
319 'n_videos': toInt(content.get('videoCount') or # playlistRenderer
320 content.get('videoCountShortText',{}).get('simpleText') or # grid(1)
321 content.get('videoCountText',{}).get('runs',[{}])[0].get('text')), # grid(2)
322 }})
323 elif key in ["itemSectionRenderer", "gridRenderer", "horizontalCardListRenderer"]:
324 newkey = {
325 "itemSectionRenderer": 'contents',
326 "gridRenderer": 'items',
327 "horizontalCardListRenderer": 'cards',
328 }.get(key)
329 r, e = parse_channel_items(content[newkey], channel_id, author)
330 result.extend(r)
331 extra.extend(e)
332 elif key == "shelfRenderer":
333 r, e = parse_channel_items([content['content']], channel_id, author)
334 result.extend(r)
335 extra.extend(e)
336 elif key == "messageRenderer":
337 # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
338 pass
339 elif key == "gameCardRenderer":
340 pass
341 else:
342 log_unknown_card(item)
343
344 return result, extra
345
346 def parse_playlist(item):
347 key = next(iter(item.keys()), None)
348 content = item[key]
349 if key == "playlistVideoRenderer":
350 if not content.get('isPlayable', False):
351 return None # private or deleted video
352
353 return {'type': 'VIDEO', 'content': {
354 'video_id': content['videoId'],
355 'title': (content['title'].get('simpleText') or # playable videos
356 content['title'].get('runs',[{}])[0].get('text')), # "[Private video]"
357 'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
358 'index': content['navigationEndpoint']['watchEndpoint'].get('index',0), #or int(content['index']['simpleText']) (absent on course intros; e.g. PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5)
359 # rest is missing from unplayable videos:
360 'author': content.get('shortBylineText',{}).get('runs',[{}])[0].get('text'),
361 'channel_id':content.get('shortBylineText',{}).get('runs',[{}])[0].get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId'),
362 'length': (content.get("lengthText",{}).get("simpleText") or # "8:51"
363 int(content.get("lengthSeconds", 0))), # "531"
364 'starttime': content['navigationEndpoint']['watchEndpoint'].get('startTimeSeconds'),
365 }}
366 else:
367 raise Exception(item) # XXX TODO
Imprint / Impressum