]>
git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
3 from urllib
.parse
import parse_qs
, urlparse
7 given a list of dicts, return the value stored under the given key in every dict that contains it.
9 return [ obj
[key
] for obj
in obj
if key
in obj
.keys() ]
def listget(obj, index, fallback=None):
    """
    return the first element of obj[index:], or fallback if there is none.

    obj: a sliceable sequence, or None.
    index: position of the wanted element.
    fallback: value returned when obj is None or the slice is empty.
    """
    # callers pass the result of dict.get() without a default (e.g.
    # .get('runs')), so a missing key arrives here as None; slicing None
    # would raise TypeError instead of yielding the fallback.
    if obj is None:
        return fallback
    # slicing (rather than indexing) never raises on an out-of-range index
    return next(iter(obj[index:]), fallback)
def flatten(l):
    """
    concatenate the items of every sublist of l into one list (one level deep).
    """
    # https://stackoverflow.com/a/952952
    return [item for sublist in l for item in sublist]
def first(l):
    """
    return the first item of the iterable l, or an empty dict if it has none.
    """
    # the empty-dict fallback lets callers chain .get() on the result
    return next(iter(l), {})
def listfind(obj, key):
    """
    return the first match findall() yields for key in obj, or an empty dict
    if there is no match.
    """
    return first(findall(obj, key))
16 def prepare_searchresults(yt_results
):
17 contents
= listfind(yt_results
, 'response') \
19 .get('twoColumnSearchResultsRenderer',{})\
20 .get('primaryContents',{})\
21 .get('sectionListRenderer',{})\
23 contents
= flatten([c
.get('contents',[]) for c
in findall(contents
, 'itemSectionRenderer')])
25 return parse_result_items(contents
)
def prepare_infocards(metadata):
    """
    run parse_infocard() over metadata.cards.cardCollectionRenderer.cards and
    return the truthy results as a list. missing levels count as no cards.
    """
    collection = metadata.get('cards', {}).get('cardCollectionRenderer', {})
    parsed = []
    for raw_card in collection.get('cards', []):
        entry = parse_infocard(raw_card)
        if entry:  # drop falsy results
            parsed.append(entry)
    return parsed
def prepare_endcards(metadata):
    """
    run parse_endcard() over metadata.endscreen.endscreenRenderer.elements and
    return the truthy results as a list. missing levels count as no elements.
    """
    renderer = metadata.get('endscreen', {}).get('endscreenRenderer', {})
    parsed = []
    for element in renderer.get('elements', []):
        entry = parse_endcard(element)
        if entry:  # drop falsy results
            parsed.append(entry)
    return parsed
35 def prepare_channel(result
, channel_id
):
36 response
= listfind(result
,'response')
38 meta1
= response
.get('metadata',{}).get('channelMetadataRenderer',{})
39 meta2
= response
.get('microformat',{}).get('microformatDataRenderer',{})
40 title
= meta1
.get('title', meta2
.get('title'))
41 descr
= meta1
.get('description', meta2
.get('description')) # meta2.description is capped at 160chars
42 thumb
= mkthumbs(meta2
.get('thumbnail',meta1
.get('avatar',{})).get('thumbnails',{})) # .avatar ~ 900px
44 if 'continuationContents' in response
.keys():
45 contents
= response
['continuationContents']
47 items
= parse_channel_items(contents
['gridContinuation']['items'], channel_id
, title
)
50 items
= parse_channel_items(contents
['sectionListContinuation']['contents'], channel_id
, title
)
52 from flask
import current_app
53 current_app
.logger
.error(result
)
55 else: # if absent, we reach end of list
58 return title
, descr
, thumb
, items
def prepare_playlist(result):
    """
    run parse_playlist() over the entries of the playlist continuation found
    in result's 'response' and return the truthy results as a list.

    raises KeyError if 'continuationContents' or
    'playlistVideoListContinuation' is absent.
    """
    response = listfind(result, 'response')
    continuation = response['continuationContents']['playlistVideoListContinuation']
    videos = []
    # no .contents if overran end of playlist
    for entry in continuation.get('contents', []):
        video = parse_playlist(entry)
        if video:  # parse_playlist yields None for private or deleted videos
            videos.append(video)
    return videos
66 output
= {str(e
['height']): e
['url'] for e
in thumbs
}
67 largest
=next(iter(sorted(output
.keys(),reverse
=True,key
=int)),None)
68 return {**output
, 'largest': largest
}
71 # external URLs are redirected through youtube.com/redirect, but we
72 # may encounter internal URLs, too
73 return parse_qs(urlparse(url
).query
).get('q',[url
])[0]
75 def toInt(s
, fallback
=0):
79 return int(''.join(filter(str.isdigit
, s
)))
83 # Remove left-/rightmost word from string:
84 delL
= lambda s
: s
.partition(' ')[2]
87 if s
is None: # missing from autogen'd music, some livestreams
89 # Some livestreams have "Streamed 7 hours ago"
90 s
= s
.replace("Streamed ","")
91 # Now, everything should be in the form "1 year ago"
92 value
, unit
, _
= s
.split(" ")
96 ).get(unit
, unit
[0]) # first letter otherwise (e.g. year(s) => y)
98 return f
"{value}{suffix}"
100 def log_unknown_card(data
):
103 from flask
import request
105 except: source
= "unknown"
106 with
open("/tmp/innertube.err", "a") as f
:
107 f
.write(f
"\n/***** {source} *****/\n")
108 json
.dump(data
, f
, indent
=2)
110 def parse_result_items(items
):
111 # TODO: use .get() for most non-essential attributes
113 parses youtube search response into an easier to use format.
118 key
= next(iter(item
.keys()), None)
120 if key
== 'videoRenderer':
121 is_live
= listfind(content
.get('badges',[]), 'metadataBadgeRenderer').get('style') == 'BADGE_STYLE_TYPE_LIVE_NOW'
122 results
.append({'type': 'VIDEO', 'content': {
123 'video_id': content
['videoId'],
124 'title': content
['title']['runs'][0]['text'],
125 'author': content
['longBylineText']['runs'][0]['text'] or \
126 content
['shortBylineText']['runs'][0]['text'],
127 'channel_id': content
['ownerText']['runs'][0] \
128 ['navigationEndpoint']['browseEndpoint']['browseId'],
129 'length': content
.get('lengthText',{}).get('simpleText') \
130 if not is_live
else 'LIVE', # "44:07", "1:41:50"
131 'views': toInt(content
.get('viewCountText',{}).get('simpleText') or # "123,456 views"
132 listget(content
.get('viewCountText',{}).get('runs'),0,{}).get('text')), # "1,234 watching"
133 'published': age(content
.get('publishedTimeText',{}).get('simpleText')),
135 elif key
== 'playlistRenderer':
136 results
.append({'type': 'PLAYLIST', 'content': {
137 'playlist_id': content
['navigationEndpoint']['watchEndpoint']['playlistId'],
138 'video_id': content
['navigationEndpoint']['watchEndpoint']['videoId'],
139 'title': content
['title']['simpleText'],
140 'author': content
['longBylineText']['runs'][0]['text'] or
141 content
['shortBylineText']['runs'][0]['text'],
142 'channel_id': content
['longBylineText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId'], # OR .shortBylineText
143 'n_videos': toInt(content
['videoCount']),
145 elif key
== 'radioRenderer': # "Mix" playlists
146 results
.append({'type': 'PLAYLIST', 'content': {
147 'playlist_id': content
['playlistId'],
148 'video_id': content
['navigationEndpoint']['watchEndpoint']['videoId'],
149 'title': content
['title']['simpleText'],
150 'author': content
['longBylineText']['simpleText'] or \
151 content
['shortBylineText']['simpleText'] , # always "YouTube"
153 'n_videos': content
['videoCountShortText']['runs'][0]['text'] or \
154 content
['videoCountText']['runs'][0]['text'],
155 # videoCountShortText: "50+"; videoCountText: "50+ videos"
157 elif key
== 'channelRenderer':
158 results
.append({'type': 'CHANNEL', 'content': {
159 'channel_id': content
['channelId'],
160 'title': content
['title']['simpleText'],
161 'icons': mkthumbs(content
['thumbnail']['thumbnails']),
162 'subscribers': content
.get('subscriberCountText',{}).get('simpleText'), # "2.47K subscribers"
164 elif key
== 'shelfRenderer':
165 r
, e
= parse_result_items(content
['content']['verticalListRenderer']['items'])
168 elif key
== 'movieRenderer': # movies to buy/rent
170 elif key
== 'carouselAdRenderer' or key
== 'searchPyvRenderer': # haha, no.
172 elif key
== 'horizontalCardListRenderer':
173 # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
175 elif key
== 'emergencyOneboxRenderer': # suicide prevention hotline
177 elif key
== 'didYouMeanRenderer' or key
== 'showingResultsForRenderer':
180 'query': content
['correctedQueryEndpoint']['searchEndpoint']['query'], # non-misspelled query
181 'autocorrected': key
== 'showingResultsForRenderer',
184 log_unknown_card(item
)
185 return results
, extras
187 def parse_infocard(card
):
189 parses a single infocard into a format that's easier to handle.
191 card
= card
['cardRenderer']
192 ctype
= list(card
['content'].keys())[0]
193 content
= card
['content'][ctype
]
194 if ctype
== "pollRenderer":
195 return {'type': "POLL", 'content': {
196 'question': content
['question']['simpleText'],
197 'answers': [(a
['text']['simpleText'],a
['numVotes']) \
198 for a
in content
['choices']],
200 elif ctype
== "videoInfoCardContentRenderer":
201 is_live
= content
.get('badge',{}).get('liveBadgeRenderer') is not None
202 return {'type': "VIDEO", 'content': {
203 'video_id': content
['action']['watchEndpoint']['videoId'],
204 'title': content
['videoTitle']['simpleText'],
205 'author': delL(content
['channelName']['simpleText']),
206 'length': content
.get('lengthString',{}).get('simpleText') \
207 if not is_live
else "LIVE", # "23:03"
208 'views': toInt(content
.get('viewCountText',{}).get('simpleText')),
209 # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
211 elif ctype
== "playlistInfoCardContentRenderer":
212 return {'type': "PLAYLIST", 'content': {
213 'playlist_id': content
['action']['watchEndpoint']['playlistId'],
214 'video_id': content
['action']['watchEndpoint']['videoId'],
215 'title': content
['playlistTitle']['simpleText'],
216 'author': delL(content
['channelName']['simpleText']),
217 'n_videos': toInt(content
['playlistVideoCount']['simpleText']),
219 elif ctype
== "simpleCardContentRenderer" and \
220 'urlEndpoint' in content
['command']:
221 return {'type': "WEBSITE", 'content': {
222 'url': clean_url(content
['command']['urlEndpoint']['url']),
223 'domain': content
['displayDomain']['simpleText'],
224 'title': content
['title']['simpleText'],
225 # XXX: no thumbnails for infocards
227 elif ctype
== "collaboratorInfoCardContentRenderer":
228 return {'type': "CHANNEL", 'content': {
229 'channel_id': content
['endpoint']['browseEndpoint']['browseId'],
230 'title': content
['channelName']['simpleText'],
231 'icons': mkthumbs(content
['channelAvatar']['thumbnails']),
232 'subscribers': content
.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers"
235 log_unknown_card(card
)
238 def parse_endcard(card
):
240 parses a single endcard into a format that's easier to handle.
242 card
= card
.get('endscreenElementRenderer', card
) #only sometimes nested
243 ctype
= card
['style']
244 if ctype
== "CHANNEL":
245 return {'type': ctype
, 'content': {
246 'channel_id': card
['endpoint']['browseEndpoint']['browseId'],
247 'title': card
['title']['simpleText'],
248 'icons': mkthumbs(card
['image']['thumbnails']),
250 elif ctype
== "VIDEO":
251 return {'type': ctype
, 'content': {
252 'video_id': card
['endpoint']['watchEndpoint']['videoId'],
253 'title': card
['title']['simpleText'],
254 'length': card
['videoDuration']['simpleText'], # '12:21'
255 'views': toInt(card
['metadata']['simpleText']),
256 # XXX: no channel name
258 elif ctype
== "PLAYLIST":
259 return {'type': ctype
, 'content': {
260 'playlist_id': card
['endpoint']['watchEndpoint']['playlistId'],
261 'video_id': card
['endpoint']['watchEndpoint']['videoId'],
262 'title': card
['title']['simpleText'],
263 'author': delL(card
['metadata']['simpleText']),
264 'n_videos': toInt(card
['playlistLength']['simpleText']),
266 elif ctype
== "WEBSITE" or ctype
== "CREATOR_MERCHANDISE":
267 url
= clean_url(card
['endpoint']['urlEndpoint']['url'])
268 return {'type': "WEBSITE", 'content': {
270 'domain': urlparse(url
).netloc
,
271 'title': card
['title']['simpleText'],
272 'icons': mkthumbs(card
['image']['thumbnails']),
275 log_unknown_card(card
)
278 def parse_channel_items(items
, channel_id
, author
):
281 key
= next(iter(item
.keys()), None)
283 if key
== "gridVideoRenderer" or key
== "videoRenderer":
284 result
.append({'type': 'VIDEO', 'content': {
285 'video_id': content
['videoId'],
286 'title': content
['title']['simpleText'],
288 'channel_id': channel_id
,
289 'length': listfind(content
.get('thumbnailOverlays',[]),'thumbnailOverlayTimeStatusRenderer').get('text',{}).get('simpleText'),
290 'views': toInt(content
.get('viewCountText',{}).get('simpleText')),
291 'published': age(content
.get('publishedTimeText',{}).get('simpleText')),
293 elif key
== "gridPlaylistRenderer" or key
== "playlistRenderer":
294 result
.append({'type': 'PLAYLIST', 'content': {
295 'playlist_id': content
['navigationEndpoint']['watchEndpoint']['playlistId'],
296 'video_id': content
['navigationEndpoint']['watchEndpoint']['videoId'],
297 'title': (content
['title'].get('simpleText') or # playlistRenderer
298 content
['title']['runs'][0]['text']), # gridPlaylistRenderer
300 'channel_id': channel_id
,
301 'n_videos': toInt(content
.get('videoCount') or # playlistRenderer
302 content
.get('videoCountShortText',{}).get('simpleText') or # grid(1)
303 content
.get('videoCountText',{}).get('runs',[{}])[0].get('text')), # grid(2)
305 elif key
== "itemSectionRenderer":
306 result
.extend(parse_channel_items(content
['contents'], channel_id
, author
))
308 raise Exception(item
) # XXX TODO
312 def parse_playlist(item
):
313 key
= next(iter(item
.keys()), None)
315 if key
== "playlistVideoRenderer":
316 if not content
.get('isPlayable', False):
317 return None # private or deleted video
319 return {'type': 'VIDEO', 'content': {
320 'video_id': content
['videoId'],
321 'title': (content
['title'].get('simpleText') or # playable videos
322 content
['title'].get('runs',[{}])[0].get('text')), # "[Private video]"
323 'playlist_id': content
['navigationEndpoint']['watchEndpoint']['playlistId'],
324 'index': content
['navigationEndpoint']['watchEndpoint']['index'], #or int(content['index']['simpleText'])
325 # rest is missing from unplayable videos:
326 'author': content
.get('shortBylineText',{}).get('runs',[{}])[0].get('text'),
327 'channel_id':content
.get('shortBylineText',{}).get('runs',[{}])[0].get('navigationEndpoint',{}).get('browseEndpoint',{}).get('browseId'),
328 'length': (content
.get("lengthText",{}).get("simpleText") or # "8:51"
329 int(content
.get("lengthSeconds", 0))), # "531"
330 'starttime': content
['navigationEndpoint']['watchEndpoint'].get('startTimeSeconds'),
333 raise Exception(item
) # XXX TODO