]> git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
more innertube variants
[subscriptionfeed.git] / app / common / innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
2
3 from urllib.parse import parse_qs, urlparse
4
def findall(obj, key):
    """
    given a list of dicts, collect the value stored under `key` from every
    dict that contains it (in list order). a `None` list yields [].
    """
    if obj is None:
        return []
    values = []
    for entry in obj:
        if key in entry.keys():
            values.append(entry[key])
    return values
def listget(obj, index, fallback=None):
    """
    returns the first element of obj[index:], i.e. obj[index] for a
    non-negative index that is in range. when the slice is empty (index past
    the end) or `obj` is None, `fallback` is returned instead.
    """
    # callers feed this the result of dict.get(), which is None for a
    # missing key; slicing None would raise TypeError.
    if obj is None:
        return fallback
    return next(iter(obj[index:]), fallback)
def flatten(l):
    """
    concatenates the sub-lists of `l` into one flat list (one level deep).
    """
    # https://stackoverflow.com/a/952952
    return [item for sublist in l for item in sublist]

def first(l):
    """
    returns the first element of `l`, or an empty dict if `l` is empty, so
    that .get() can be chained onto the result.
    """
    return next(iter(l), {})

def listfind(obj, key):
    """
    given a list of dicts, returns the value of `key` from the first dict
    that contains it, or an empty dict if none does.
    """
    return first(findall(obj, key))
16
def prepare_searchresults(yt_results):
    """
    digs the result sections out of a search response and hands the items of
    all itemSectionRenderers to parse_result_items(), whose return value is
    passed through unchanged.
    """
    node = listfind(yt_results, 'response')
    # walk down the nested renderers; a missing level yields an empty dict
    for step in ('contents', 'twoColumnSearchResultsRenderer',
                 'primaryContents', 'sectionListRenderer'):
        node = node.get(step, {})
    sections = node.get('contents', [])

    items = []
    for section in findall(sections, 'itemSectionRenderer'):
        items.extend(section.get('contents', []))

    return parse_result_items(items)
27
def prepare_infocards(metadata):
    """
    parses all infocards found under .cards.cardCollectionRenderer.cards;
    cards that parse_infocard() could not handle (None) are dropped.
    """
    collection = metadata.get('cards', {}).get('cardCollectionRenderer', {})
    parsed = (parse_infocard(card) for card in collection.get('cards', []))
    return [card for card in parsed if card]
31
def prepare_endcards(metadata):
    """
    parses all endscreen elements found under
    .endscreen.endscreenRenderer.elements; elements that parse_endcard()
    could not handle (None) are dropped.
    """
    renderer = metadata.get('endscreen', {}).get('endscreenRenderer', {})
    parsed = (parse_endcard(element) for element in renderer.get('elements', []))
    return [card for card in parsed if card]
35
def prepare_channel(result, channel_id):
    """
    extracts channel metadata and one page of channel items from a
    continuation response.

    returns a 5-tuple (title, description, thumbnails, items, has_more).
    if the response carries 'alerts', they are logged as an error and
    (None, None, [], [], False) is returned.
    """
    response = listfind(result, 'response')

    if 'alerts' in response:  # possibly got an error back
        from flask import current_app
        summary = []
        for alert in response['alerts']:
            renderer = alert['alertRenderer']
            summary.append((renderer['type'], renderer['text']['simpleText']))
        current_app.logger.error(summary)
        return None, None, [], [], False

    channel_meta = response.get('metadata', {}).get('channelMetadataRenderer', {})
    microformat = response.get('microformat', {}).get('microformatDataRenderer', {})
    title = channel_meta.get('title', microformat.get('title'))
    # microformat's description is capped at 160chars, so it is only the fallback
    descr = channel_meta.get('description', microformat.get('description'))
    # .avatar ~ 900px
    images = microformat.get('thumbnail', channel_meta.get('avatar', {}))
    thumb = mkthumbs(images.get('thumbnails', {}))

    contents = response.get('continuationContents')
    if not contents:  # overran end of list
        return title, descr, thumb, [], False

    # items live in either a grid or a section list continuation
    unparsed = contents.get('gridContinuation', {}).get('items')
    if not unparsed:
        unparsed = contents.get('sectionListContinuation', {}).get('contents')
    if not unparsed:
        unparsed = []
    items, _ = parse_channel_items(unparsed, channel_id, title)

    continuation = contents.get('gridContinuation')
    if not continuation:
        continuation = contents.get('sectionListContinuation')
    if not continuation:
        continuation = {}
    has_more = 'continuations' in continuation

    return title, descr, thumb, items, has_more
61
def prepare_playlist(result):
    """
    parses one page of a playlist continuation response.

    returns (videos, has_more); entries for which parse_playlist() returns
    None are left out. raises KeyError if the response has no
    continuationContents.playlistVideoListContinuation.
    """
    contents = listfind(result, 'response')['continuationContents']
    listing = contents['playlistVideoListContinuation']
    unparsed = listing.get('contents', [])
    has_more = 'continuations' in listing

    videos = [video for video in map(parse_playlist, unparsed) if video]
    return videos, has_more
68
def mkthumbs(thumbs):
    """
    turns a list of thumbnail dicts (each having 'height' and 'url') into a
    dict mapping the height (as string) to the url. the additional key
    'largest' holds the greatest height key (not the url), or None if there
    were no thumbnails.
    """
    by_height = {}
    for thumb in thumbs:
        height = str(thumb['height'])
        by_height[height] = thumb['url']
    # compare heights numerically, not as strings ('90' < '360')
    largest = max(by_height, key=int, default=None)
    by_height['largest'] = largest
    return by_height
73
def clean_url(url):
    """
    returns the first value of the `q` query parameter of `url`, or `url`
    itself if there is no such parameter.
    """
    # externals URLs are redirected through youtube.com/redirect, but we
    # may encounter internal URLs, too
    query = parse_qs(urlparse(url).query)
    targets = query.get('q', [url])
    return targets[0]
78
def toInt(s, fallback=0):
    """
    strips everything but digits from `s` and converts the rest to int
    (e.g. "123,456 views" => 123456). returns `fallback` if `s` is None or
    the remaining digits cannot be converted.
    """
    if s is None:
        return fallback
    digits = ''.join(char for char in s if str.isdigit(char))
    try:
        return int(digits)
    except ValueError:
        return fallback
86
def delL(s):
    """
    removes the leftmost word from a string, i.e. everything up to and
    including the first space. a string without any space yields "".
    """
    return s.partition(' ')[2]
89
def age(s):
    """
    shortens a relative date like "1 year ago" to "1y" ("3 months ago"
    becomes "3mn" so it can be told apart from minutes).

    returns None if `s` is None. text that doesn't fit the
    "<value> <unit> ago" pattern is returned as-is (minus a "Streamed "
    prefix) instead of raising.
    """
    if s is None:  # missing from autogen'd music, some livestreams
        return None
    # Some livestreams have "Streamed 7 hours ago"
    s = s.replace("Streamed ", "")
    words = s.split(" ")
    # tolerate any other leading words as long as the string still ends in
    # "<number> <unit> ago"
    if len(words) > 3 and words[-1] == "ago" and words[-3].isdigit():
        words = words[-3:]
    if len(words) != 3:
        return s  # unknown format: pass it through rather than crash
    # Now, everything should be in the form "1 year ago"
    value, unit, _ = words
    suffix = dict(
        month='mn',
        months='mn',
    ).get(unit, unit[:1])  # first letter otherwise (e.g. year(s) => y)

    return f"{value}{suffix}"
103
def log_unknown_card(data):
    """
    appends `data` as indented JSON to /tmp/innertube.err, preceded by a
    header line naming the URL of the current flask request (or "unknown"
    if that cannot be determined).
    """
    import json
    try:
        from flask import request
        source = request.url
    except Exception:
        # flask missing or no active request; don't swallow
        # KeyboardInterrupt/SystemExit like a bare except would.
        source = "unknown"
    with open("/tmp/innertube.err", "a", encoding="utf-8") as f:
        f.write(f"\n/***** {source} *****/\n")
        json.dump(data, f, indent=2)
113
def parse_result_items(items):
    """
    parses youtube search response into an easier to use format.

    `items` is a list of single-key dicts ({'<type>Renderer': {...}}).
    returns a tuple (results, extras): `results` holds dicts of the form
    {'type': 'VIDEO'|'PLAYLIST'|'CHANNEL', 'content': {...}}, `extras`
    holds spelling suggestions/corrections. shelves are unpacked
    recursively; unknown renderers are passed to log_unknown_card().
    """
    # TODO: use .get() for most non-essential attributes
    results = []
    extras = []
    for item in items:
        key = next(iter(item.keys()), None)
        content = item[key]
        if key == 'videoRenderer':
            is_live = listfind(content.get('badges', []), 'metadataBadgeRenderer') \
                .get('style') == 'BADGE_STYLE_TYPE_LIVE_NOW'
            view_count = content.get('viewCountText', {})
            results.append({'type': 'VIDEO', 'content': {
                'video_id': content['videoId'],
                'title': content['title']['runs'][0]['text'],
                'author': content['longBylineText']['runs'][0]['text'] or
                          content['shortBylineText']['runs'][0]['text'],
                'channel_id': content['ownerText']['runs'][0]
                    ['navigationEndpoint']['browseEndpoint']['browseId'],
                'length': content.get('lengthText', {}).get('simpleText')
                          if not is_live else 'LIVE',  # "44:07", "1:41:50"
                # 'runs' (like viewCountText itself) may be absent; fall back
                # to an empty list so listget() is never handed None.
                'views': toInt(view_count.get('simpleText') or  # "123,456 views"
                               listget(view_count.get('runs') or [], 0, {}).get('text')),  # "1,234 watching"
                'published': age(content.get('publishedTimeText', {}).get('simpleText')),
            }})
        elif key == 'playlistRenderer':
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
                'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
                'title': content['title']['simpleText'],
                'author': content['longBylineText']['runs'][0]['text'] or
                          content['shortBylineText']['runs'][0]['text'],
                'channel_id': content['longBylineText']['runs'][0]
                    ['navigationEndpoint']['browseEndpoint']['browseId'],  # OR .shortBylineText
                'n_videos': toInt(content['videoCount']),
            }})
        elif key == 'radioRenderer':  # "Mix" playlists
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['playlistId'],
                'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
                'title': content['title']['simpleText'],
                'author': content['longBylineText']['simpleText'] or
                          content['shortBylineText']['simpleText'],  # always "YouTube"
                'channel_id': None,
                # videoCountShortText: "50+"; videoCountText: "50+ videos"
                'n_videos': content['videoCountShortText']['runs'][0]['text'] or
                            content['videoCountText']['runs'][0]['text'],
            }})
        elif key == 'channelRenderer':
            results.append({'type': 'CHANNEL', 'content': {
                'channel_id': content['channelId'],
                'title': content['title']['simpleText'],
                'icons': mkthumbs(content['thumbnail']['thumbnails']),
                'subscribers': content.get('subscriberCountText', {}).get('simpleText'),  # "2.47K subscribers"
            }})
        elif key == 'shelfRenderer':
            # a shelf wraps further result items; merge them into ours
            r, e = parse_result_items(content['content']['verticalListRenderer']['items'])
            results.extend(r)
            extras.extend(e)
        elif key == 'movieRenderer':  # movies to buy/rent
            pass
        elif key == 'carouselAdRenderer' or key == 'searchPyvRenderer':  # haha, no.
            pass
        elif key == 'horizontalCardListRenderer':
            # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
            pass
        elif key == 'emergencyOneboxRenderer':  # suicide prevention hotline
            pass
        elif key == 'didYouMeanRenderer' or key == 'showingResultsForRenderer':
            extras.append({
                'type': 'spelling',
                'query': content['correctedQueryEndpoint']['searchEndpoint']['query'],  # non-misspelled query
                'autocorrected': key == 'showingResultsForRenderer',
            })
        else:
            log_unknown_card(item)
    return results, extras
190
def parse_infocard(card):
    """
    converts one infocard into {'type': ..., 'content': {...}}, where type is
    one of POLL, VIDEO, PLAYLIST, WEBSITE or CHANNEL. card types we don't
    know are handed to log_unknown_card() and yield None.
    """
    card = card['cardRenderer']
    ctype = list(card['content'])[0]
    content = card['content'][ctype]

    if ctype == "pollRenderer":
        question = content['question']['simpleText']
        answers = []
        for choice in content['choices']:
            answers.append((choice['text']['simpleText'], choice['numVotes']))
        return {'type': "POLL", 'content': {
            'question': question,
            'answers': answers,
        }}

    if ctype == "videoInfoCardContentRenderer":
        is_live = content.get('badge', {}).get('liveBadgeRenderer') is not None
        video = {
            'video_id': content['action']['watchEndpoint']['videoId'],
            'title': content['videoTitle']['simpleText'],
            'author': delL(content['channelName']['simpleText']),
        }
        if is_live:
            video['length'] = "LIVE"
        else:
            video['length'] = content.get('lengthString', {}).get('simpleText')  # "23:03"
        # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
        video['views'] = toInt(content.get('viewCountText', {}).get('simpleText'))
        return {'type': "VIDEO", 'content': video}

    if ctype == "playlistInfoCardContentRenderer":
        watch = content['action']['watchEndpoint']
        return {'type': "PLAYLIST", 'content': {
            'playlist_id': watch['playlistId'],
            'video_id': watch['videoId'],
            'title': content['playlistTitle']['simpleText'],
            'author': delL(content['channelName']['simpleText']),
            'n_videos': toInt(content['playlistVideoCount']['simpleText']),
        }}

    # simple cards that don't link to a URL fall through to the unknown case
    if ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content['command']:
        return {'type': "WEBSITE", 'content': {
            'url': clean_url(content['command']['urlEndpoint']['url']),
            'domain': content['displayDomain']['simpleText'],
            'title': content['title']['simpleText'],
            # XXX: no thumbnails for infocards
        }}

    if ctype == "collaboratorInfoCardContentRenderer":
        return {'type': "CHANNEL", 'content': {
            'channel_id': content['endpoint']['browseEndpoint']['browseId'],
            'title': content['channelName']['simpleText'],
            'icons': mkthumbs(content['channelAvatar']['thumbnails']),
            'subscribers': content.get('subscriberCountText', {}).get('simpleText', ''),  # "545K subscribers"
        }}

    log_unknown_card(card)
    return None
241
def parse_endcard(card):
    """
    converts one endscreen element into {'type': ..., 'content': {...}},
    where type is one of CHANNEL, VIDEO, PLAYLIST or WEBSITE. elements of an
    unknown style are handed to log_unknown_card() and yield None.
    """
    card = card.get('endscreenElementRenderer', card)  # only sometimes nested
    ctype = card['style']

    if ctype == "CHANNEL":
        channel = {
            'channel_id': card['endpoint']['browseEndpoint']['browseId'],
            'title': card['title']['simpleText'],
            'icons': mkthumbs(card['image']['thumbnails']),
        }
        return {'type': ctype, 'content': channel}

    if ctype == "VIDEO":
        video = {
            'video_id': card['endpoint']['watchEndpoint']['videoId'],
            'title': card['title']['simpleText'],
            'length': card['videoDuration']['simpleText'],  # '12:21'
            'views': toInt(card['metadata']['simpleText']),
            # XXX: no channel name
        }
        return {'type': ctype, 'content': video}

    if ctype == "PLAYLIST":
        watch = card['endpoint']['watchEndpoint']
        playlist = {
            'playlist_id': watch['playlistId'],
            'video_id': watch['videoId'],
            'title': card['title']['simpleText'],
            'author': delL(card['metadata']['simpleText']),
            'n_videos': toInt(card['playlistLength']['simpleText']),
        }
        return {'type': ctype, 'content': playlist}

    if ctype in ("WEBSITE", "CREATOR_MERCHANDISE"):
        # merchandise links are presented as plain websites
        url = clean_url(card['endpoint']['urlEndpoint']['url'])
        website = {
            'url': url,
            'domain': urlparse(url).netloc,
            'title': card['title']['simpleText'],
            'icons': mkthumbs(card['image']['thumbnails']),
        }
        return {'type': "WEBSITE", 'content': website}

    log_unknown_card(card)
    return None
281
def parse_channel_items(items, channel_id, author):
    """
    parses the items of a channel page into VIDEO and PLAYLIST entries.

    `channel_id` and `author` are used wherever an item doesn't name its own
    channel. container renderers are unpacked recursively; unknown renderers
    are handed to log_unknown_card(). returns a tuple (result, extra).
    """
    video_kinds = ("gridVideoRenderer", "videoRenderer", "videoCardRenderer")
    playlist_kinds = ("gridPlaylistRenderer", "playlistRenderer")
    # container renderer => name of the attribute holding its child items
    containers = {
        "itemSectionRenderer": 'contents',
        "gridRenderer": 'items',
        "horizontalCardListRenderer": 'cards',
    }
    # messageRenderer: e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
    ignored = ("messageRenderer", "gameCardRenderer")

    result = []
    extra = []
    for item in items:
        key = next(iter(item.keys()), None)
        content = item[key]

        if key in video_kinds:
            # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
            video = {
                'video_id': content['videoId'],
                'title': content['title']['simpleText'],
            }
            byline = content.get('bylineText', {}).get('runs', [{}])[0]
            video['author'] = byline.get('text') or author
            video['channel_id'] = byline.get('navigationEndpoint', {}) \
                .get('browseEndpoint', {}).get('browseId') or channel_id
            length = content.get('lengthText', {}).get('simpleText')  # topic channel
            if not length:
                overlay = listfind(content.get('thumbnailOverlays', []),
                                   'thumbnailOverlayTimeStatusRenderer')
                length = overlay.get('text', {}).get('simpleText')
            video['length'] = length
            # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
            video['views'] = toInt(content.get('viewCountText', {}).get('simpleText'))
            video['published'] = age(content.get('publishedTimeText', {}).get('simpleText'))
            result.append({'type': 'VIDEO', 'content': video})
        elif key in playlist_kinds:
            watch = content['navigationEndpoint']['watchEndpoint']
            playlist = {
                'playlist_id': watch['playlistId'],
                'video_id': watch['videoId'],
                'title': (content['title'].get('simpleText') or  # playlistRenderer
                          content['title']['runs'][0]['text']),  # gridPlaylistRenderer
                'author': author,
                'channel_id': channel_id,
                'n_videos': toInt(
                    content.get('videoCount') or  # playlistRenderer
                    content.get('videoCountShortText', {}).get('simpleText') or  # grid(1)
                    content.get('videoCountText', {}).get('runs', [{}])[0].get('text')),  # grid(2)
            }
            result.append({'type': 'PLAYLIST', 'content': playlist})
        elif key in containers:
            children = content[containers[key]]
            nested, nested_extra = parse_channel_items(children, channel_id, author)
            result.extend(nested)
            extra.extend(nested_extra)
        elif key == "shelfRenderer":
            # a shelf holds a single renderer, which is parsed like an item
            nested, nested_extra = parse_channel_items([content['content']], channel_id, author)
            result.extend(nested)
            extra.extend(nested_extra)
        elif key in ignored:
            pass
        else:
            log_unknown_card(item)

    return result, extra
337
def parse_playlist(item):
    """
    parses a single playlist entry into {'type': 'VIDEO', 'content': {...}}.

    returns None for videos that are not playable and for item types we
    don't know; the latter are handed to log_unknown_card() first, like the
    other parsers do, so one odd item doesn't abort the whole playlist.
    """
    key = next(iter(item.keys()), None)
    if key != "playlistVideoRenderer":
        log_unknown_card(item)
        return None

    content = item[key]
    if not content.get('isPlayable', False):
        return None  # private or deleted video

    watch = content['navigationEndpoint']['watchEndpoint']
    byline = content.get('shortBylineText', {}).get('runs', [{}])[0]
    return {'type': 'VIDEO', 'content': {
        'video_id': content['videoId'],
        'title': (content['title'].get('simpleText') or  # playable videos
                  content['title'].get('runs', [{}])[0].get('text')),  # "[Private video]"
        'playlist_id': watch['playlistId'],
        'index': watch['index'],  # or int(content['index']['simpleText'])
        # rest is missing from unplayable videos:
        'author': byline.get('text'),
        'channel_id': byline.get('navigationEndpoint', {}).get('browseEndpoint', {}).get('browseId'),
        'length': (content.get("lengthText", {}).get("simpleText") or  # "8:51"
                   int(content.get("lengthSeconds", 0))),  # "531"
        'starttime': watch.get('startTimeSeconds'),
    }}
Imprint / Impressum