]>
git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
3 from urllib
.parse
import parse_qs
, urlparse
7 given a list of dicts, where one dict contains a given key, return said key.
9 if obj
is None: return []
10 return [ obj
[key
] for obj
in obj
if key
in obj
.keys() ]
11 def listget(obj
, index
, fallback
=None):
12 if obj
is None: return fallback
13 return next(iter(obj
[index
:]), fallback
)
14 flatten
= lambda l
: [item
for sublist
in l
for item
in sublist
] # https://stackoverflow.com/a/952952
15 first
= lambda l
: next(iter(l
),{})
16 listfind
= lambda obj
,key
: first(findall(obj
,key
))
18 def prepare_searchresults(yt_results
):
19 contents
= listfind(yt_results
, 'response') \
21 .get('twoColumnSearchResultsRenderer',{})\
22 .get('primaryContents',{})\
23 .get('sectionListRenderer',{})\
25 contents
= flatten([c
.get('contents',[]) for c
in findall(contents
, 'itemSectionRenderer')])
27 return parse_result_items(contents
)
29 def prepare_infocards(metadata
):
30 cards
= metadata
.get('cards',{}).get('cardCollectionRenderer',{}).get('cards',[])
31 return list(filter(None, map(parse_infocard
, cards
)))
33 def prepare_endcards(metadata
):
34 endsc
= metadata
.get('endscreen',{}).get('endscreenRenderer',{}).get('elements',[])
35 return list(filter(None, map(parse_endcard
, endsc
)))
37 def prepare_channel(result
, channel_id
):
38 response
= listfind(result
,'response')
40 if 'alerts' in response
: # possibly got an error back
41 from flask
import current_app
42 current_app
.logger
.error([(alert
['alertRenderer']['type'],alert
['alertRenderer']['text']['simpleText']) for alert
in response
['alerts']])
43 return None,None,[],[],False
45 meta1
= response
.get('metadata',{}).get('channelMetadataRenderer',{})
46 meta2
= response
.get('microformat',{}).get('microformatDataRenderer',{})
47 title
= meta1
.get('title', meta2
.get('title'))
48 descr
= meta1
.get('description', meta2
.get('description')) # meta2.description is capped at 160chars
49 thumb
= mkthumbs(meta2
.get('thumbnail',meta1
.get('avatar',{})).get('thumbnails',{})) # .avatar ~ 900px
51 contents
= response
.get('continuationContents')
52 if not contents
: # overran end of list
53 return title
, descr
, thumb
, [], False
55 unparsed
= contents
.get('gridContinuation',{}).get('items') or \
56 contents
.get('sectionListContinuation',{}).get('contents') or []
57 items
, extra
= parse_channel_items(unparsed
, channel_id
, title
)
58 has_more
= 'continuations' in (contents
.get('gridContinuation') or
59 contents
.get('sectionListContinuation') or {})
61 return title
, descr
, thumb
, items
, has_more
63 def prepare_playlist(result
):
64 contents
= listfind(result
,'response')['continuationContents']
65 unparsed
= contents
['playlistVideoListContinuation'].get('contents',[])
66 has_more
= 'continuations' in contents
.get('playlistVideoListContinuation')
68 return list(filter(None, map(parse_playlist
, unparsed
))), has_more
71 output
= {str(e
['height']): e
['url'] for e
in thumbs
}
72 largest
=next(iter(sorted(output
.keys(),reverse
=True,key
=int)),None)
73 return {**output
, 'largest': largest
}
76 # externals URLs are redirected through youtube.com/redirect, but we
77 # may encounter internal URLs, too
78 return parse_qs(urlparse(url
).query
).get('q',[url
])[0]
80 def toInt(s
, fallback
=0):
84 return int(''.join(filter(str.isdigit
, s
)))
88 # Remove left-/rightmost word from string:
89 delL
= lambda s
: s
.partition(' ')[2]
92 if s
is None: # missing from autogen'd music, some livestreams
94 # Some livestreams have "Streamed 7 hours ago"
95 s
= s
.replace("Streamed ","")
96 # Now, everything should be in the form "1 year ago"
97 value
, unit
, _
= s
.split(" ")
101 ).get(unit
, unit
[0]) # first letter otherwise (e.g. year(s) => y)
103 return f
"{value}{suffix}"
105 def log_unknown_card(data
):
108 from flask
import request
110 except: source
= "unknown"
111 with
open("/tmp/innertube.err", "a") as f
:
112 f
.write(f
"\n/***** {source} *****/\n")
113 json
.dump(data
, f
, indent
=2)
115 def parse_result_items(items
):
116 # TODO: use .get() for most non-essential attributes
118 parses youtube search response into an easier to use format.
123 key
= next(iter(item
.keys()), None)
125 if key
== 'videoRenderer':
126 is_live
= listfind(content
.get('badges',[]), 'metadataBadgeRenderer').get('style') == 'BADGE_STYLE_TYPE_LIVE_NOW'
127 results
.append({'type': 'VIDEO', 'content': {
128 'video_id': content
['videoId'],
129 'title': content
['title']['runs'][0]['text'],
130 'author': content
['longBylineText']['runs'][0]['text'] or \
131 content
['shortBylineText']['runs'][0]['text'],
132 'channel_id': content
['ownerText']['runs'][0] \
133 ['navigationEndpoint']['browseEndpoint']['browseId'],
134 'length': content
.get('lengthText',{}).get('simpleText') \
135 if not is_live
else 'LIVE', # "44:07", "1:41:50"
136 'views': toInt(content
.get('viewCountText',{}).get('simpleText') or # "123,456 views"
137 listget(content
.get('viewCountText',{}).get('runs'),0,{}).get('text')), # "1,234 watching"
138 'published': age(content
.get('publishedTimeText',{}).get('simpleText')),
140 elif key
== 'playlistRenderer':
141 results
.append({'type': 'PLAYLIST', 'content': {
142 'playlist_id': content
['navigationEndpoint']['watchEndpoint']['playlistId'],
143 'video_id': content
['navigationEndpoint']['watchEndpoint']['videoId'],
144 'title': content
['title']['simpleText'],
145 'author': content
['longBylineText']['runs'][0]['text'] or
146 content
['shortBylineText']['runs'][0]['text'],
147 'channel_id': content
['longBylineText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId'], # OR .shortBylineText
148 'n_videos': toInt(content
['videoCount']),
150 elif key
== 'radioRenderer': # "Mix" playlists
151 results
.append({'type': 'PLAYLIST', 'content': {
152 'playlist_id': content
['playlistId'],
153 'video_id': content
['navigationEndpoint']['watchEndpoint']['videoId'],
154 'title': content
['title']['simpleText'],
155 'author': content
['longBylineText']['simpleText'] or \
156 content
['shortBylineText']['simpleText'] , # always "YouTube"
158 'n_videos': content
['videoCountShortText']['runs'][0]['text'] or \
159 content
['videoCountText']['runs'][0]['text'],
160 # videoCountShortText: "50+"; videoCountText: "50+ videos"
162 elif key
== 'channelRenderer':
163 results
.append({'type': 'CHANNEL', 'content': {
164 'channel_id': content
['channelId'],
165 'title': content
['title']['simpleText'],
166 'icons': mkthumbs(content
['thumbnail']['thumbnails']),
167 'subscribers': content
.get('subscriberCountText',{}).get('simpleText'), # "2.47K subscribers"
169 elif key
== 'shelfRenderer':
170 r
, e
= parse_result_items(content
['content']['verticalListRenderer']['items'])
173 elif key
== 'movieRenderer': # movies to buy/rent
175 elif key
in ['carouselAdRenderer','searchPyvRenderer','promotedSparklesTextSearchRenderer']: # haha, no.
177 elif key
== 'horizontalCardListRenderer':
178 # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
180 elif key
== 'emergencyOneboxRenderer': # suicide prevention hotline
182 elif key
== 'didYouMeanRenderer' or key
== 'showingResultsForRenderer':
185 'query': content
['correctedQueryEndpoint']['searchEndpoint']['query'], # non-misspelled query
186 'autocorrected': key
== 'showingResultsForRenderer',
189 log_unknown_card(item
)
190 return results
, extras
192 def parse_infocard(card
):
194 parses a single infocard into a format that's easier to handle.
196 card
= card
['cardRenderer']
197 ctype
= list(card
['content'].keys())[0]
198 content
= card
['content'][ctype
]
199 if ctype
== "pollRenderer":
200 return {'type': "POLL", 'content': {
201 'question': content
['question']['simpleText'],
202 'answers': [(a
['text']['simpleText'],a
['numVotes']) \
203 for a
in content
['choices']],
205 elif ctype
== "videoInfoCardContentRenderer":
206 is_live
= content
.get('badge',{}).get('liveBadgeRenderer') is not None
207 return {'type': "VIDEO", 'content': {
208 'video_id': content
['action']['watchEndpoint']['videoId'],
209 'title': content
['videoTitle']['simpleText'],
210 'author': delL(content
['channelName']['simpleText']),
211 'length': content
.get('lengthString',{}).get('simpleText') \
212 if not is_live
else "LIVE", # "23:03"
213 'views': toInt(content
.get('viewCountText',{}).get('simpleText')),
214 # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
216 elif ctype
== "playlistInfoCardContentRenderer":
217 return {'type': "PLAYLIST", 'content': {
218 'playlist_id': content
['action']['watchEndpoint']['playlistId'],
219 'video_id': content
['action']['watchEndpoint']['videoId'],
220 'title': content
['playlistTitle']['simpleText'],
221 'author': delL(content
['channelName']['simpleText']),
222 'n_videos': toInt(content
['playlistVideoCount']['simpleText']),
224 elif ctype
== "simpleCardContentRenderer" and \
225 'urlEndpoint' in content
['command']:
226 return {'type': "WEBSITE", 'content': {
227 'url': clean_url(content
['command']['urlEndpoint']['url']),
228 'domain': content
['displayDomain']['simpleText'],
229 'title': content
['title']['simpleText'],
230 # XXX: no thumbnails for infocards
232 elif ctype
== "collaboratorInfoCardContentRenderer":
233 return {'type': "CHANNEL", 'content': {
234 'channel_id': content
['endpoint']['browseEndpoint']['browseId'],
235 'title': content
['channelName']['simpleText'],
236 'icons': mkthumbs(content
['channelAvatar']['thumbnails']),
237 'subscribers': content
.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers"
240 log_unknown_card(card
)
243 def parse_endcard(card
):
245 parses a single endcard into a format that's easier to handle.
247 card
= card
.get('endscreenElementRenderer', card
) #only sometimes nested
248 ctype
= card
['style']
249 if ctype
== "CHANNEL":
250 return {'type': ctype
, 'content': {
251 'channel_id': card
['endpoint']['browseEndpoint']['browseId'],
252 'title': card
['title']['simpleText'],
253 'icons': mkthumbs(card
['image']['thumbnails']),
255 elif ctype
== "VIDEO":
256 return {'type': ctype
, 'content': {
257 'video_id': card
['endpoint']['watchEndpoint']['videoId'],
258 'title': card
['title']['simpleText'],
259 'length': card
['videoDuration']['simpleText'], # '12:21'
260 'views': toInt(card
['metadata']['simpleText']),
261 # XXX: no channel name
263 elif ctype
== "PLAYLIST":
264 return {'type': ctype
, 'content': {
265 'playlist_id': card
['endpoint']['watchEndpoint']['playlistId'],
266 'video_id': card
['endpoint']['watchEndpoint']['videoId'],
267 'title': card
['title']['simpleText'],
268 'author': delL(card
['metadata']['simpleText']),
269 'n_videos': toInt(card
['playlistLength']['simpleText']),
271 elif ctype
== "WEBSITE" or ctype
== "CREATOR_MERCHANDISE":
272 url
= clean_url(card
['endpoint']['urlEndpoint']['url'])
273 return {'type': "WEBSITE", 'content': {
275 'domain': urlparse(url
).netloc
,
276 'title': card
['title']['simpleText'],
277 'icons': mkthumbs(card
['image']['thumbnails']),
280 log_unknown_card(card
)
283 def parse_channel_items(items
, channel_id
, author
):
287 key
= next(iter(item
.keys()), None)
289 if key
in ["gridVideoRenderer", "videoRenderer", "videoCardRenderer"]:
290 # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
291 result
.append({'type': 'VIDEO', 'content': {
292 'video_id': content
['videoId'],
293 'title': content
['title'].get('simpleText') or content
['title'].get('runs',[{}])[0].get('text'),
294 'author': content
.get('bylineText',{}).get('runs',[{}])[0].get('text') or author
,
295 'channel_id': content
.get('bylineText',{}).get('runs',[{}])[0] \
296 .get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId') or channel_id
,
297 'length': (content
.get('lengthText',{}).get('simpleText') or # topic channel
298 listfind(content
.get('thumbnailOverlays',[]),'thumbnailOverlayTimeStatusRenderer')
299 .get('text',{}).get('simpleText')),
300 # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
301 'views': toInt(content
.get('viewCountText',{}).get('simpleText')),
302 'published': age(content
.get('publishedTimeText',{}).get('simpleText')),
304 elif key
== "gridPlaylistRenderer" or key
== "playlistRenderer":
305 result
.append({'type': 'PLAYLIST', 'content': {
306 'playlist_id': content
['navigationEndpoint']['watchEndpoint']['playlistId'],
307 'video_id': content
['navigationEndpoint']['watchEndpoint']['videoId'],
308 'title': (content
['title'].get('simpleText') or # playlistRenderer
309 content
['title']['runs'][0]['text']), # gridPlaylistRenderer
311 'channel_id': channel_id
,
312 'n_videos': toInt(content
.get('videoCount') or # playlistRenderer
313 content
.get('videoCountShortText',{}).get('simpleText') or # grid(1)
314 content
.get('videoCountText',{}).get('runs',[{}])[0].get('text')), # grid(2)
316 elif key
in ["itemSectionRenderer", "gridRenderer", "horizontalCardListRenderer"]:
318 "itemSectionRenderer": 'contents',
319 "gridRenderer": 'items',
320 "horizontalCardListRenderer": 'cards',
322 r
, e
= parse_channel_items(content
[newkey
], channel_id
, author
)
325 elif key
== "shelfRenderer":
326 r
, e
= parse_channel_items([content
['content']], channel_id
, author
)
329 elif key
== "messageRenderer":
330 # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
332 elif key
== "gameCardRenderer":
335 log_unknown_card(item
)
339 def parse_playlist(item
):
340 key
= next(iter(item
.keys()), None)
342 if key
== "playlistVideoRenderer":
343 if not content
.get('isPlayable', False):
344 return None # private or deleted video
346 return {'type': 'VIDEO', 'content': {
347 'video_id': content
['videoId'],
348 'title': (content
['title'].get('simpleText') or # playable videos
349 content
['title'].get('runs',[{}])[0].get('text')), # "[Private video]"
350 'playlist_id': content
['navigationEndpoint']['watchEndpoint']['playlistId'],
351 'index': content
['navigationEndpoint']['watchEndpoint']['index'], #or int(content['index']['simpleText'])
352 # rest is missing from unplayable videos:
353 'author': content
.get('shortBylineText',{}).get('runs',[{}])[0].get('text'),
354 'channel_id':content
.get('shortBylineText',{}).get('runs',[{}])[0].get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId'),
355 'length': (content
.get("lengthText",{}).get("simpleText") or # "8:51"
356 int(content
.get("lengthSeconds", 0))), # "531"
357 'starttime': content
['navigationEndpoint']['watchEndpoint'].get('startTimeSeconds'),
360 raise Exception(item
) # XXX TODO