]> git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
dangerous.channel/playlist: parse extra messages
[subscriptionfeed.git] / app / common / innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
2
3 from urllib.parse import parse_qs, urlparse
4
def findall(obj, key):
    """
    Collect the values stored under `key` across a list of dicts.

    Dicts that lack `key` are skipped. The result keeps the order of `obj`
    and is an empty list when no dict contains the key.
    """
    matches = []
    for entry in obj:
        if key in entry.keys():
            matches.append(entry[key])
    return matches
def listget(obj, index, fallback=None):
    """
    Return the element of `obj` at `index`, or `fallback` if there is none.

    The lookup is done by slicing (`obj[index:]`), so an index past the end
    yields `fallback` instead of raising IndexError. A missing list
    (`obj is None`, e.g. the result of `dict.get()` on an absent key) is
    treated like an empty one.
    """
    if obj is None:
        return fallback
    return next(iter(obj[index:]), fallback)
def flatten(l):
    """Concatenate a list of lists into one flat list."""
    # https://stackoverflow.com/a/952952
    flat = []
    for sublist in l:
        flat.extend(sublist)
    return flat


def first(l):
    """Return the first element of `l`, or an empty dict if `l` is empty."""
    for item in l:
        return item
    return {}


def listfind(obj, key):
    """Return the first value stored under `key` in a list of dicts, or {} if none has it."""
    return first(findall(obj, key))
15
def prepare_searchresults(yt_results):
    """
    Extract and parse the result items of a search response.

    Descends response.contents.twoColumnSearchResultsRenderer
    .primaryContents.sectionListRenderer.contents, gathers the contents of
    every itemSectionRenderer found there and hands them to
    parse_result_items(). Missing levels are treated as empty.
    """
    node = listfind(yt_results, 'response')
    for level in ('contents', 'twoColumnSearchResultsRenderer',
                  'primaryContents', 'sectionListRenderer'):
        node = node.get(level, {})
    sections = node.get('contents', [])

    items = []
    for section in findall(sections, 'itemSectionRenderer'):
        items.extend(section.get('contents', []))

    return parse_result_items(items)
26
def prepare_infocards(metadata):
    """
    Parse all infocards found under metadata.cards.cardCollectionRenderer.cards.

    Cards for which parse_infocard() returns a falsy value (e.g. unknown
    card types) are dropped from the returned list.
    """
    collection = metadata.get('cards', {}).get('cardCollectionRenderer', {})
    parsed = map(parse_infocard, collection.get('cards', []))
    return [card for card in parsed if card]
30
def prepare_endcards(metadata):
    """
    Parse all endscreen elements found under metadata.endscreen.endscreenRenderer.elements.

    Elements for which parse_endcard() returns a falsy value (e.g. unknown
    styles) are dropped from the returned list.
    """
    endscreen = metadata.get('endscreen', {}).get('endscreenRenderer', {})
    parsed = map(parse_endcard, endscreen.get('elements', []))
    return [card for card in parsed if card]
34
def prepare_channel(result, channel_id):
    """
    Extract channel metadata and one page of channel items from a browse result.

    Returns a tuple (title, descr, thumb, items):
      - title/descr come from channelMetadataRenderer, falling back to
        microformatDataRenderer;
      - thumb is the mkthumbs() dict built from the microformat thumbnail,
        falling back to the channel avatar;
      - items is the parsed list from parse_channel_items(), or [] when the
        response has no 'continuationContents'.

    If neither a gridContinuation nor a sectionListContinuation can be
    parsed, the raw result is logged and the exception of the second
    attempt is re-raised.
    """
    response = listfind(result, 'response')

    meta1 = response.get('metadata', {}).get('channelMetadataRenderer', {})
    meta2 = response.get('microformat', {}).get('microformatDataRenderer', {})
    title = meta1.get('title', meta2.get('title'))
    descr = meta1.get('description', meta2.get('description'))  # meta2.description is capped at 160chars
    thumb = mkthumbs(meta2.get('thumbnail', meta1.get('avatar', {})).get('thumbnails', {}))  # .avatar ~ 900px

    if 'continuationContents' not in response:
        # if absent, we reach end of list
        from flask import current_app
        current_app.logger.error(result)
        return title, descr, thumb, []

    contents = response['continuationContents']
    try:  # TODO: cleanup
        items, extra = parse_channel_items(contents['gridContinuation']['items'], channel_id, title)
    except Exception:
        # Not a grid page (or it failed to parse): try the section list layout.
        # `except Exception` rather than a bare except, so KeyboardInterrupt
        # and SystemExit are not swallowed by the fallback.
        try:
            items, extra = parse_channel_items(contents['sectionListContinuation']['contents'], channel_id, title)
        except Exception:
            from flask import current_app
            current_app.logger.error(result)
            raise  # bare raise keeps the original traceback
    # TODO: show extra to user

    return title, descr, thumb, items
62
def prepare_playlist(result):
    """
    Parse one continuation page of a playlist into a list of video dicts.

    Reads response.continuationContents.playlistVideoListContinuation.contents
    and runs every entry through parse_playlist(); entries that parse to a
    falsy value (unplayable videos) are left out.
    """
    response = listfind(result, 'response')
    continuation = response['continuationContents']['playlistVideoListContinuation']
    entries = continuation.get('contents', [])  # no .contents if overran end of playlist
    return [video for video in map(parse_playlist, entries) if video]
67
def mkthumbs(thumbs):
    """
    Index a list of thumbnail dicts by their height.

    Returns a dict mapping str(height) -> url for every thumbnail, plus the
    key 'largest', which holds the (string) height key of the tallest
    thumbnail, or None when `thumbs` is empty.
    """
    by_height = {}
    for thumb in thumbs:
        by_height[str(thumb['height'])] = thumb['url']
    tallest = max(by_height, key=int, default=None)
    by_height['largest'] = tallest
    return by_height
72
def clean_url(url):
    """
    Return the target of a redirect-style URL.

    External URLs are redirected through youtube.com/redirect and carry the
    real destination in their `q` query parameter; internal URLs may be
    encountered too. If a `q` parameter is present its first value is
    returned, otherwise `url` is returned unchanged.
    """
    params = parse_qs(urlparse(url).query)
    if 'q' not in params:
        return url
    return params['q'][0]
77
def toInt(s, fallback=0):
    """
    Build an integer from all digit characters in `s`.

    Non-digit characters (thousands separators, words) are ignored, so
    "123,456 views" becomes 123456. Returns `fallback` when `s` is None or
    when the digits found cannot be converted (e.g. there are none).
    """
    if s is None:
        return fallback
    digits = ''.join([ch for ch in s if str.isdigit(ch)])
    try:
        return int(digits)
    except ValueError:
        return fallback
85
# Remove leftmost word from string:
def delL(s):
    """Return `s` without its first space-delimited word ('' if there is no space)."""
    _, _, remainder = s.partition(' ')
    return remainder
88
def age(s):
    """
    Abbreviate a relative-time string such as "1 year ago" to "1y".

    Only the last three words are considered (value, unit, trailing word),
    so strings with leading words like "Streamed 7 hours ago" are handled
    as well. The unit is shortened to its first letter, except month(s),
    which become "mn" so they do not collide with minutes.

    Returns None when `s` is None, and returns `s` unchanged when it has
    fewer than three words and therefore cannot be interpreted.
    """
    if s is None:  # missing from autogen'd music, some livestreams
        return None
    words = s.split()
    if len(words) < 3:
        # Not in the "<value> <unit> ago" form; pass the text through
        # rather than failing on tuple unpacking.
        return s
    # Some livestreams have "Streamed 7 hours ago": ignore anything before
    # the last three words.
    value, unit = words[-3], words[-2]
    suffix = dict(
        month='mn',
        months='mn',
    ).get(unit, unit[0])  # first letter otherwise (e.g. year(s) => y)

    return f"{value}{suffix}"
102
def log_unknown_card(data):
    """
    Append `data` as indented JSON to /tmp/innertube.err for later inspection.

    Each entry is preceded by a header line naming the URL of the current
    flask request, or "unknown" if that URL cannot be determined.
    """
    import json
    try:
        from flask import request
        source = request.url
    except Exception:  # e.g. flask unavailable or no active request
        source = "unknown"
    with open("/tmp/innertube.err", "a") as f:
        f.write(f"\n/***** {source} *****/\n")
        json.dump(data, f, indent=2)
112
def _first_run(node):
    """Return the first entry of node['runs'], or {} if node or its runs are missing or empty."""
    runs = (node or {}).get('runs') or [{}]
    return runs[0]


def parse_result_items(items):
    """
    parses youtube search response into an easier to use format.

    Returns a tuple (results, extras):
      - results: list of {'type': 'VIDEO'|'PLAYLIST'|'CHANNEL', 'content': {...}}
      - extras: list of side information, currently only
        {'type': 'spelling', 'query': ..., 'autocorrected': bool}

    Shelves are expanded recursively; ads, movies and similar renderers are
    ignored; renderers not known here are passed to log_unknown_card().
    Empty item dicts are skipped.
    """
    # TODO: use .get() for most non-essential attributes
    results = []
    extras = []
    for item in items:
        key = next(iter(item.keys()), None)
        if key is None:  # empty dict: nothing to parse
            continue
        content = item[key]
        if key == 'videoRenderer':
            is_live = listfind(content.get('badges', []), 'metadataBadgeRenderer').get('style') == 'BADGE_STYLE_TYPE_LIVE_NOW'
            view_text = content.get('viewCountText', {})
            results.append({'type': 'VIDEO', 'content': {
                'video_id': content['videoId'],
                'title': content['title']['runs'][0]['text'],
                # fall back to the short byline if the long one is missing or empty
                'author': (_first_run(content.get('longBylineText')).get('text') or
                           _first_run(content.get('shortBylineText')).get('text')),
                'channel_id': content['ownerText']['runs'][0]
                    ['navigationEndpoint']['browseEndpoint']['browseId'],
                'length': ('LIVE' if is_live else
                           content.get('lengthText', {}).get('simpleText')),  # "44:07", "1:41:50"
                # either form may be absent; toInt() maps None to 0
                'views': toInt(view_text.get('simpleText') or  # "123,456 views"
                               _first_run(view_text).get('text')),  # "1,234 watching"
                'published': age(content.get('publishedTimeText', {}).get('simpleText')),
            }})
        elif key == 'playlistRenderer':
            watch = content['navigationEndpoint']['watchEndpoint']
            long_run = _first_run(content.get('longBylineText'))
            short_run = _first_run(content.get('shortBylineText'))
            owner_run = long_run or short_run  # OR .shortBylineText
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': watch['playlistId'],
                'video_id': watch['videoId'],
                'title': content['title']['simpleText'],
                'author': long_run.get('text') or short_run.get('text'),
                'channel_id': owner_run['navigationEndpoint']['browseEndpoint']['browseId'],
                'n_videos': toInt(content['videoCount']),
            }})
        elif key == 'radioRenderer':  # "Mix" playlists
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['playlistId'],
                'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
                'title': content['title']['simpleText'],
                'author': (content.get('longBylineText', {}).get('simpleText') or
                           content.get('shortBylineText', {}).get('simpleText')),  # always "YouTube"
                'channel_id': None,
                # videoCountShortText: "50+"; videoCountText: "50+ videos"
                'n_videos': (_first_run(content.get('videoCountShortText')).get('text') or
                             _first_run(content.get('videoCountText')).get('text')),
            }})
        elif key == 'channelRenderer':
            results.append({'type': 'CHANNEL', 'content': {
                'channel_id': content['channelId'],
                'title': content['title']['simpleText'],
                'icons': mkthumbs(content['thumbnail']['thumbnails']),
                'subscribers': content.get('subscriberCountText', {}).get('simpleText'),  # "2.47K subscribers"
            }})
        elif key == 'shelfRenderer':
            r, e = parse_result_items(content['content']['verticalListRenderer']['items'])
            results.extend(r)
            extras.extend(e)
        elif key == 'movieRenderer':  # movies to buy/rent
            pass
        elif key == 'carouselAdRenderer' or key == 'searchPyvRenderer':  # haha, no.
            pass
        elif key == 'horizontalCardListRenderer':
            # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
            pass
        elif key == 'emergencyOneboxRenderer':  # suicide prevention hotline
            pass
        elif key == 'didYouMeanRenderer' or key == 'showingResultsForRenderer':
            extras.append({
                'type': 'spelling',
                'query': content['correctedQueryEndpoint']['searchEndpoint']['query'],  # non-misspelled query
                'autocorrected': key == 'showingResultsForRenderer',
            })
        else:
            log_unknown_card(item)
    return results, extras
189
def parse_infocard(card):
    """
    Convert one infocard into a {'type': ..., 'content': {...}} dict.

    The card type is taken from the first key of cardRenderer.content.
    Produces POLL, VIDEO, PLAYLIST, WEBSITE or CHANNEL entries; any other
    card is handed to log_unknown_card() and None is returned.
    """
    renderer = card['cardRenderer']
    kind = list(renderer['content'].keys())[0]
    body = renderer['content'][kind]

    if kind == "pollRenderer":
        question = body['question']['simpleText']
        answers = [(choice['text']['simpleText'], choice['numVotes'])
                   for choice in body['choices']]
        return {'type': "POLL", 'content': {
            'question': question,
            'answers': answers,
        }}

    if kind == "videoInfoCardContentRenderer":
        is_live = body.get('badge', {}).get('liveBadgeRenderer') is not None
        return {'type': "VIDEO", 'content': {
            'video_id': body['action']['watchEndpoint']['videoId'],
            'title': body['videoTitle']['simpleText'],
            'author': delL(body['channelName']['simpleText']),
            'length': ("LIVE" if is_live else
                       body.get('lengthString', {}).get('simpleText')),  # "23:03"
            'views': toInt(body.get('viewCountText', {}).get('simpleText')),
            # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
        }}

    if kind == "playlistInfoCardContentRenderer":
        return {'type': "PLAYLIST", 'content': {
            'playlist_id': body['action']['watchEndpoint']['playlistId'],
            'video_id': body['action']['watchEndpoint']['videoId'],
            'title': body['playlistTitle']['simpleText'],
            'author': delL(body['channelName']['simpleText']),
            'n_videos': toInt(body['playlistVideoCount']['simpleText']),
        }}

    # simple cards are only understood when they link to a URL
    if kind == "simpleCardContentRenderer" and 'urlEndpoint' in body['command']:
        return {'type': "WEBSITE", 'content': {
            'url': clean_url(body['command']['urlEndpoint']['url']),
            'domain': body['displayDomain']['simpleText'],
            'title': body['title']['simpleText'],
            # XXX: no thumbnails for infocards
        }}

    if kind == "collaboratorInfoCardContentRenderer":
        return {'type': "CHANNEL", 'content': {
            'channel_id': body['endpoint']['browseEndpoint']['browseId'],
            'title': body['channelName']['simpleText'],
            'icons': mkthumbs(body['channelAvatar']['thumbnails']),
            'subscribers': body.get('subscriberCountText', {}).get('simpleText', ''),  # "545K subscribers"
        }}

    log_unknown_card(renderer)
    return None
240
def parse_endcard(card):
    """
    Convert one endscreen element into a {'type': ..., 'content': {...}} dict.

    The element's 'style' selects the output: CHANNEL, VIDEO and PLAYLIST
    keep their style as type, WEBSITE and CREATOR_MERCHANDISE both become
    WEBSITE. Any other style is handed to log_unknown_card() and None is
    returned.
    """
    element = card.get('endscreenElementRenderer', card)  # only sometimes nested
    style = element['style']

    if style == "CHANNEL":
        return {'type': style, 'content': {
            'channel_id': element['endpoint']['browseEndpoint']['browseId'],
            'title': element['title']['simpleText'],
            'icons': mkthumbs(element['image']['thumbnails']),
        }}

    if style == "VIDEO":
        return {'type': style, 'content': {
            'video_id': element['endpoint']['watchEndpoint']['videoId'],
            'title': element['title']['simpleText'],
            'length': element['videoDuration']['simpleText'],  # '12:21'
            'views': toInt(element['metadata']['simpleText']),
            # XXX: no channel name
        }}

    if style == "PLAYLIST":
        return {'type': style, 'content': {
            'playlist_id': element['endpoint']['watchEndpoint']['playlistId'],
            'video_id': element['endpoint']['watchEndpoint']['videoId'],
            'title': element['title']['simpleText'],
            'author': delL(element['metadata']['simpleText']),
            'n_videos': toInt(element['playlistLength']['simpleText']),
        }}

    if style in ("WEBSITE", "CREATOR_MERCHANDISE"):
        target = clean_url(element['endpoint']['urlEndpoint']['url'])
        return {'type': "WEBSITE", 'content': {
            'url': target,
            'domain': urlparse(target).netloc,
            'title': element['title']['simpleText'],
            'icons': mkthumbs(element['image']['thumbnails']),
        }}

    log_unknown_card(element)
    return None
280
def parse_channel_items(items, channel_id, author):
    """
    Parse the items of a channel page (videos or playlists tab).

    `channel_id` and `author` are copied into every produced entry.
    Returns a tuple (result, extra):
      - result: list of {'type': 'VIDEO'|'PLAYLIST', 'content': {...}}
      - extra: list reserved for additional information; nothing is added
        to it here yet.

    itemSectionRenderers are expanded recursively, messageRenderers are
    ignored, empty item dicts are skipped and anything else is passed to
    log_unknown_card().
    """
    result = []
    extra = []
    for item in items:
        key = next(iter(item.keys()), None)
        if key is None:  # empty dict: nothing to parse
            continue
        content = item[key]
        if key == "gridVideoRenderer" or key == "videoRenderer":
            title = content['title']
            result.append({'type': 'VIDEO', 'content': {
                'video_id': content['videoId'],
                # accept both title forms, like the playlist branch below
                'title': (title.get('simpleText') or
                          title['runs'][0]['text']),
                'author': author,
                'channel_id': channel_id,
                'length': listfind(content.get('thumbnailOverlays', []), 'thumbnailOverlayTimeStatusRenderer').get('text', {}).get('simpleText'),
                'views': toInt(content.get('viewCountText', {}).get('simpleText')),
                'published': age(content.get('publishedTimeText', {}).get('simpleText')),
            }})
        elif key == "gridPlaylistRenderer" or key == "playlistRenderer":
            result.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
                'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
                'title': (content['title'].get('simpleText') or  # playlistRenderer
                          content['title']['runs'][0]['text']),  # gridPlaylistRenderer
                'author': author,
                'channel_id': channel_id,
                'n_videos': toInt(content.get('videoCount') or  # playlistRenderer
                                  content.get('videoCountShortText', {}).get('simpleText') or  # grid(1)
                                  content.get('videoCountText', {}).get('runs', [{}])[0].get('text')),  # grid(2)
            }})
        elif key == "itemSectionRenderer":
            r, e = parse_channel_items(content['contents'], channel_id, author)
            result.extend(r)
            extra.extend(e)
        elif key == "messageRenderer":
            # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
            pass
        else:
            log_unknown_card(item)

    return result, extra
320
def parse_playlist(item):
    """
    Convert one playlist entry into a {'type': 'VIDEO', 'content': {...}} dict.

    Returns None for entries that are not playable (private or deleted
    videos). Raises Exception(item) for anything that is not a
    playlistVideoRenderer.
    """
    kind = next(iter(item.keys()), None)
    video = item[kind]
    if kind != "playlistVideoRenderer":
        raise Exception(item)  # XXX TODO

    if not video.get('isPlayable', False):
        return None  # private or deleted video

    video_id = video['videoId']
    title_node = video['title']
    title = (title_node.get('simpleText') or  # playable videos
             title_node.get('runs', [{}])[0].get('text'))  # "[Private video]"
    watch = video['navigationEndpoint']['watchEndpoint']
    playlist_id = watch['playlistId']
    index = watch['index']  # or int(video['index']['simpleText'])

    # rest is missing from unplayable videos:
    byline = video.get('shortBylineText', {})
    author = byline.get('runs', [{}])[0].get('text')
    channel_id = (byline.get('runs', [{}])[0]
                  .get('navigationEndpoint', {})
                  .get('browseEndpoint', {})
                  .get('browseId'))
    length = (video.get("lengthText", {}).get("simpleText") or  # "8:51"
              int(video.get("lengthSeconds", 0)))  # "531"
    starttime = watch.get('startTimeSeconds')

    return {'type': 'VIDEO', 'content': {
        'video_id': video_id,
        'title': title,
        'playlist_id': playlist_id,
        'index': index,
        'author': author,
        'channel_id': channel_id,
        'length': length,
        'starttime': starttime,
    }}
Imprint / Impressum