]>
git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
# Functions that deal with parsing data from YouTube's internal API ("innertube").
3 from urllib
. parse
import parse_qs
, urlparse
8 given a list of dicts, where one dict contains a given key, return said key.
10 if obj
is None : return []
11 return [ obj
[ key
] for obj
in obj
if key
in obj
. keys () ]
def listget(obj, index, fallback=None):
    """Return the first element of obj[index:], or fallback.

    fallback is returned when obj is None or when the slice starting at
    index is empty (e.g. index is past the end of the list).
    """
    if obj is None:
        return fallback
    for element in obj[index:]:
        # the first element of the slice is the one we are after
        return element
    return fallback
def flatten(l):
    """Concatenate the items of every sub-iterable in l into one flat list.

    Only one level of nesting is removed.
    See https://stackoverflow.com/a/952952
    """
    flat = []
    for sublist in l:
        flat.extend(sublist)
    return flat
def first(l):
    """Return the first item obtained by iterating l, or {} if there is none."""
    for item in l:
        return item
    return {}
def listfind(obj, key):
    """Return the first match that findall(obj, key) produces, or {} if none."""
    matches = findall(obj, key)
    return first(matches)
21 null-coalescing version of dict.get() that also works on lists.
23 the | operator is overloaded to achieve similar looking code to jq(1) filters.
24 the first found key is used: dict(foo=1)|G('bar','foo') returns 1.
26 def __init__ ( self
, * keys
):
28 def __ror__ ( self
, other
):
30 try : return other
[ key
]
34 """ parses youtube's .runs[].text and .simpleText variants """
35 def __ror__ ( self
, other
): # Note: only returning runs[0], not concat'ing all!
36 return other|
G ( 'simpleText' ) or other|
G ( 'runs' ) |
G ( 0 ) |
G ( 'text' )
39 """ |Select('foo') returns the first foo in list, |Select(all='foo') returns all foos. """
40 def __init__ ( self
, key
= None , *, all
= None ):
43 def __ror__ ( self
, other
):
44 try : items
= [ other
[ self
. key
] for other
in other
if self
. key
in other
. keys () ]
46 return items
if self
. all
else items|
G ( 0 )
49 def __init__ ( self
, f
, * args
):
52 def __ror__ ( self
, other
):
53 return self
. f ( other
, * self
. args
)
55 def __ror__ ( self
, other
):
56 try : return int ( '' . join ( filter ( str . isdigit
, other
)))
61 def prepare_searchresults ( yt_results
):
62 contents
= listfind ( yt_results
, 'response' ) \
64 . get ( 'twoColumnSearchResultsRenderer' ,{}) \
65 . get ( 'primaryContents' ,{}) \
66 . get ( 'sectionListRenderer' ,{}) \
68 contents
= flatten ([ c
. get ( 'contents' ,[]) for c
in contents|
Select ( all
= 'itemSectionRenderer' )])
70 return parse_result_items ( contents
)
def prepare_infocards(metadata):
    """Parse the info cards contained in metadata.

    Reads metadata['cards']['cardCollectionRenderer']['cards'] (each level
    may be absent), passes every entry to parse_infocard() and keeps only
    the truthy results.
    """
    collection = metadata.get('cards', {}).get('cardCollectionRenderer', {})
    raw_cards = collection.get('cards', [])
    return [parsed for parsed in map(parse_infocard, raw_cards) if parsed]
def prepare_endcards(metadata):
    """Parse the end screen elements contained in metadata.

    Reads metadata['endscreen']['endscreenRenderer']['elements'] (each level
    may be absent), passes every entry to parse_endcard() and keeps only
    the truthy results.
    """
    renderer = metadata.get('endscreen', {}).get('endscreenRenderer', {})
    elements = renderer.get('elements', [])
    return [parsed for parsed in map(parse_endcard, elements) if parsed]
def prepare_channel(result, channel_id):
    """Extract channel metadata and one page of items from a channel result.

    The 'response' entry of result is looked up with listfind(). Returns a
    5-tuple (title, descr, thumb, items, has_more):

    - when the response contains 'alerts', they are logged via Flask's
      current_app logger and (None, None, [], [], False) is returned;
    - when there are no 'continuationContents', the metadata is returned
      together with an empty item list and has_more=False;
    - otherwise the items come from parse_channel_items() and has_more
      tells whether the continuation holds a 'continuations' key.
    """
    response = listfind(result, 'response')

    if 'alerts' in response:  # possibly got an error back
        from flask import current_app
        logged = []
        for alert in response['alerts']:
            logged.append((alert['alertRenderer']['type'],
                           alert['alertRenderer']['text']['simpleText']))
        current_app.logger.error(logged)
        return None, None, [], [], False

    channel_meta = response.get('metadata', {}).get('channelMetadataRenderer', {})
    microformat = response.get('microformat', {}).get('microformatDataRenderer', {})

    title = channel_meta.get('title', microformat.get('title'))
    # microformat's description is capped at 160chars
    descr = channel_meta.get('description', microformat.get('description'))
    # .avatar ~ 900px
    image = microformat.get('thumbnail', channel_meta.get('avatar', {}))
    thumb = mkthumbs(image.get('thumbnails', {}))

    contents = response.get('continuationContents')
    if not contents:  # overran end of list
        return title, descr, thumb, [], False

    # items live in either a grid or a section list continuation
    unparsed = contents.get('gridContinuation', {}).get('items')
    if not unparsed:
        unparsed = contents.get('sectionListContinuation', {}).get('contents') or []
    items, _extra = parse_channel_items(unparsed, channel_id, title)

    continuation = (contents.get('gridContinuation')
                    or contents.get('sectionListContinuation')
                    or {})
    has_more = 'continuations' in continuation

    return title, descr, thumb, items, has_more
def prepare_playlist(result):
    """Parse one page of a playlist continuation.

    Reads response['continuationContents']['playlistVideoListContinuation']
    from result (both keys are required) and returns a tuple
    (videos, has_more): videos holds the truthy results of parse_playlist()
    for each entry under 'contents', has_more tells whether the
    continuation holds a 'continuations' key.
    """
    contents = listfind(result, 'response')['continuationContents']
    continuation = contents['playlistVideoListContinuation']
    unparsed = continuation.get('contents', [])
    has_more = 'continuations' in continuation

    videos = [entry for entry in map(parse_playlist, unparsed) if entry]
    return videos, has_more
def mkthumbs(thumbs):
    """Index thumbnails by height and record the tallest one.

    Every entry of thumbs must provide 'height' and 'url'. The result maps
    str(height) to the url, plus a 'largest' entry holding the key with the
    greatest integer value (None when thumbs is empty).
    """
    by_height = {}
    for entry in thumbs:
        height_key = str(entry['height'])
        by_height[height_key] = entry['url']
    # compare keys numerically, so that e.g. '1080' beats '720'
    tallest = max(by_height, key=int, default=None)
    return dict(by_height, largest=tallest)
119 # externals URLs are redirected through youtube.com/redirect, but we
120 # may encounter internal URLs, too
121 return parse_qs ( urlparse ( url
). query
). get ( 'q' ,[ url
])[ 0 ]
123 def toInt ( s
, fallback
= 0 ):
127 return int ( '' . join ( filter ( str . isdigit
, s
)))
131 # Remove left-/rightmost word from string:
def delL(s):
    """Return s without its leftmost word (everything after the first space).

    Yields an empty string when s contains no space.
    """
    _head, _sep, remainder = s.partition(' ')
    return remainder
135 if s
is None : # missing from autogen'd music, some livestreams
137 # Some livestreams have "Streamed 7 hours ago"
138 s
= s
. replace ( "Streamed " , "" )
139 # Now, everything should be in the form "1 year ago"
140 value
, unit
, _
= s
. split ( " " )
144 ). get ( unit
, unit
[ 0 ]) # first letter otherwise (e.g. year(s) => y)
146 return f
" {value}{suffix} "
148 def log_unknown_card ( data
):
151 from flask
import request
153 except : source
= "unknown"
154 with
open ( "/tmp/innertube.err" , "a" ) as f
:
155 f
. write ( f
" \n /***** {source} *****/ \n " )
156 json
. dump ( data
, f
, indent
= 2 )
158 def parse_result_items ( items
):
159 # TODO: use .get() for most non-essential attributes
161 parses youtube search response into an easier to use format.
166 key
= next ( iter ( item
. keys ()), None )
168 if key
== 'videoRenderer' :
169 results
. append ({ 'type' : 'VIDEO' , 'content' : {
170 'video_id' : content
[ 'videoId' ],
171 'title' : content
[ 'title' ] |G
. text
,
172 'author' : content|
G ( 'longBylineText' , 'shortBylineText' ) |G
. text
,
173 'channel_id' : content|
G ( 'ownerText' ) |
G ( 'runs' ) |
G ( 0 ) \
174 |
G ( 'navigationEndpoint' ) |
G ( 'browseEndpoint' ) |
G ( 'browseId' ),
175 'length' : content|
G ( 'lengthText' ) |G
. text
, # "44:07", "1:41:50"
176 'views' : content|
G ( 'viewCountText' ) |G
. text|A
. int or 0 , # "1,234 {views|watching}", absent on 0 views
177 'published' : content|
G ( 'publishedTimeText' ) |
G ( 'simpleText' ) |
A ( age
),
178 'live' : content|
G ( 'badges' ) |
Select ( 'metadataBadgeRenderer' ) |
G ( 'style' )== 'BADGE_STYLE_TYPE_LIVE_NOW' ,
180 elif key
== 'playlistRenderer' :
181 results
. append ({ 'type' : 'PLAYLIST' , 'content' : {
182 'playlist_id' : content
[ 'navigationEndpoint' ]. get ( 'watchEndpoint' ,{}). get ( 'playlistId' ) or \
183 content
. get ( 'playlistId' ), # COURSE/"learning playlist"
184 'video_id' : content
[ 'navigationEndpoint' ]. get ( 'watchEndpoint' ,{}). get ( 'videoId' ) or \
185 videoid_from_thumbnail ( content
), # learning playlist
186 'title' : content
[ 'title' ][ 'simpleText' ],
187 # Note: learning playlists have no author/channel_id
188 'author' : listget ( content
. get ( 'longBylineText' ,{}). get ( 'runs' ,[]), 0 ,{}). get ( 'text' ) or
189 listget ( content
. get ( 'shortBylineText' ,{}). get ( 'runs' ,[]), 0 ,{}). get ( 'text' ),
190 'channel_id' : listget ( content
. get ( 'longBylineText' ,{}). get ( 'runs' ,[]), 0 ,{}) \
191 . get ( 'navigationEndpoint' ,{}). get ( 'browseEndpoint' ,{}). get ( 'browseId' ), # OR .shortBylineText
192 'n_videos' : toInt ( content
[ 'videoCount' ]),
194 elif key
== 'radioRenderer' : # "Mix" playlists
195 results
. append ({ 'type' : 'PLAYLIST' , 'content' : {
196 'playlist_id' : content
[ 'playlistId' ],
197 'video_id' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ][ 'videoId' ],
198 'title' : content
[ 'title' ][ 'simpleText' ],
199 'author' : content
[ 'longBylineText' ][ 'simpleText' ] or \
200 content
[ 'shortBylineText' ][ 'simpleText' ] , # always "YouTube"
202 'n_videos' : content
[ 'videoCountShortText' ][ 'runs' ][ 0 ][ 'text' ] or \
203 content
[ 'videoCountText' ][ 'runs' ][ 0 ][ 'text' ],
204 # videoCountShortText: "50+"; videoCountText: "50+ videos"
206 elif key
== 'showRenderer' : # normal playlist, specially displayed
207 results
. append ({ 'type' : 'PLAYLIST' , 'content' : {
208 'playlist_id' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ][ 'playlistId' ],
209 'video_id' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ][ 'videoId' ],
210 'title' : content
[ 'title' ][ 'simpleText' ],
211 'author' : content
[ 'longBylineText' ][ 'runs' ][ 0 ][ 'text' ] or \
212 content
[ 'shortBylineText' ][ 'runs' ][ 0 ][ 'text' ],
216 elif key
== 'channelRenderer' :
217 results
. append ({ 'type' : 'CHANNEL' , 'content' : {
218 'channel_id' : content
[ 'channelId' ],
219 'title' : content
[ 'title' ][ 'simpleText' ],
220 'icons' : mkthumbs ( content
[ 'thumbnail' ][ 'thumbnails' ]),
221 'subscribers' : content
. get ( 'subscriberCountText' ,{}). get ( 'simpleText' ), # "2.47K subscribers"
223 elif key
== 'shelfRenderer' :
224 subkey
= next ( iter ( content
[ 'content' ]. keys ()), {}) #verticalListRenderer/horizontalMovieListRenderer
225 r
, e
= parse_result_items ( content
[ 'content' ][ subkey
][ 'items' ])
228 elif key
in [ 'movieRenderer' , 'gridMovieRenderer' ]: # movies to buy/rent
229 pass # gMR.{videoId,title.runs[].text,lengthText.simpleText}
230 elif key
in [ 'carouselAdRenderer' , 'searchPyvRenderer' , 'promotedSparklesTextSearchRenderer' ]: # haha, no.
232 elif key
== 'horizontalCardListRenderer' :
233 # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
235 elif key
== 'emergencyOneboxRenderer' : # suicide prevention hotline
237 elif key
in [ 'clarificationRenderer' , 'infoPanelContainerRenderer' ]: # COVID-19/conspiracy theory infos
239 elif key
== 'webAnswerRenderer' : # "Result from the web"
241 elif key
== 'didYouMeanRenderer' or key
== 'showingResultsForRenderer' :
244 'query' : content
[ 'correctedQueryEndpoint' ][ 'searchEndpoint' ][ 'query' ], # non-misspelled query
245 'autocorrected' : key
== 'showingResultsForRenderer' ,
247 elif key
== 'messageRenderer' : # "No more results"
250 'message' : content|
G ( 'title' ) |
G ( 'runs' ) |
G ( 0 ) |
G ( 'text' ) or \
251 content|
G ( 'text' ) |
G ( 'runs' ) |
G ( 0 ) |
G ( 'text' ),
253 elif key
== 'backgroundPromoRenderer' : # e.g. "no results"
255 'type' : content
[ 'icon' ][ 'iconType' ],
256 'message' : content
[ 'title' ][ 'runs' ][ 0 ][ 'text' ],
259 log_unknown_card ( item
)
260 return results
, extras
262 def parse_infocard ( card
):
264 parses a single infocard into a format that's easier to handle.
266 card
= card
[ 'cardRenderer' ]
267 ctype
= list ( card
[ 'content' ]. keys ())[ 0 ]
268 content
= card
[ 'content' ][ ctype
]
269 if ctype
== "pollRenderer" :
270 return { 'type' : "POLL" , 'content' : {
271 'question' : content
[ 'question' ][ 'simpleText' ],
272 'answers' : [( a
[ 'text' ][ 'simpleText' ], a
[ 'numVotes' ]) \
273 for a
in content
[ 'choices' ]],
275 elif ctype
== "videoInfoCardContentRenderer" :
276 is_live
= content
. get ( 'badge' ,{}). get ( 'liveBadgeRenderer' ) is not None
277 return { 'type' : "VIDEO" , 'content' : {
278 'video_id' : content
[ 'action' ][ 'watchEndpoint' ][ 'videoId' ],
279 'title' : content
[ 'videoTitle' ][ 'simpleText' ],
280 'author' : delL ( content
[ 'channelName' ][ 'simpleText' ]),
281 'length' : content
. get ( 'lengthString' ,{}). get ( 'simpleText' ) \
282 if not is_live
else "LIVE" , # "23:03"
283 'views' : toInt ( content
. get ( 'viewCountText' ,{}). get ( 'simpleText' )),
284 # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
286 elif ctype
== "playlistInfoCardContentRenderer" :
287 return { 'type' : "PLAYLIST" , 'content' : {
288 'playlist_id' : content
[ 'action' ][ 'watchEndpoint' ][ 'playlistId' ],
289 'video_id' : content
[ 'action' ][ 'watchEndpoint' ][ 'videoId' ],
290 'title' : content
[ 'playlistTitle' ][ 'simpleText' ],
291 'author' : delL ( content
[ 'channelName' ][ 'simpleText' ]),
292 'n_videos' : toInt ( content
[ 'playlistVideoCount' ][ 'simpleText' ]),
294 elif ctype
== "simpleCardContentRenderer" and \
295 'urlEndpoint' in content
[ 'command' ]:
296 return { 'type' : "WEBSITE" , 'content' : {
297 'url' : clean_url ( content
[ 'command' ][ 'urlEndpoint' ][ 'url' ]),
298 'domain' : content
[ 'displayDomain' ][ 'simpleText' ],
299 'title' : content
[ 'title' ][ 'simpleText' ],
300 # XXX: no thumbnails for infocards
302 elif ctype
== "collaboratorInfoCardContentRenderer" :
303 return { 'type' : "CHANNEL" , 'content' : {
304 'channel_id' : content
[ 'endpoint' ][ 'browseEndpoint' ][ 'browseId' ],
305 'title' : content
[ 'channelName' ][ 'simpleText' ],
306 'icons' : mkthumbs ( content
[ 'channelAvatar' ][ 'thumbnails' ]),
307 'subscribers' : content
. get ( 'subscriberCountText' ,{}). get ( 'simpleText' , '' ), # "545K subscribers"
310 log_unknown_card ( card
)
313 def parse_endcard ( card
):
315 parses a single endcard into a format that's easier to handle.
317 card
= card
. get ( 'endscreenElementRenderer' , card
) #only sometimes nested
318 ctype
= card
[ 'style' ]
319 if ctype
== "CHANNEL" :
320 return { 'type' : ctype
, 'content' : {
321 'channel_id' : card
[ 'endpoint' ][ 'browseEndpoint' ][ 'browseId' ],
322 'title' : card
[ 'title' ][ 'simpleText' ],
323 'icons' : mkthumbs ( card
[ 'image' ][ 'thumbnails' ]),
325 elif ctype
== "VIDEO" :
326 return { 'type' : ctype
, 'content' : {
327 'video_id' : card
[ 'endpoint' ][ 'watchEndpoint' ][ 'videoId' ],
328 'title' : card
[ 'title' ][ 'simpleText' ],
329 'length' : card
[ 'videoDuration' ][ 'simpleText' ], # '12:21'
330 'views' : toInt ( card
[ 'metadata' ][ 'simpleText' ]),
331 # XXX: no channel name
333 elif ctype
== "PLAYLIST" :
334 return { 'type' : ctype
, 'content' : {
335 'playlist_id' : card
[ 'endpoint' ][ 'watchEndpoint' ][ 'playlistId' ],
336 'video_id' : card
[ 'endpoint' ][ 'watchEndpoint' ][ 'videoId' ],
337 'title' : card
[ 'title' ][ 'simpleText' ],
338 'author' : delL ( card
[ 'metadata' ][ 'simpleText' ]),
339 'n_videos' : toInt ( card
[ 'playlistLength' ][ 'simpleText' ]),
341 elif ctype
== "WEBSITE" or ctype
== "CREATOR_MERCHANDISE" :
342 url
= clean_url ( card
[ 'endpoint' ][ 'urlEndpoint' ][ 'url' ])
343 return { 'type' : "WEBSITE" , 'content' : {
345 'domain' : urlparse ( url
). netloc
,
346 'title' : card
[ 'title' ][ 'simpleText' ],
347 'icons' : mkthumbs ( card
[ 'image' ][ 'thumbnails' ]),
350 log_unknown_card ( card
)
353 def videoid_from_thumbnail ( content
):
354 # learning playlist; example: PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5 (/user/enyay/playlists)
355 return re
. match ( r
"https?://i.ytimg.com/vi/([-_0-9a-zA-Z] {11} )|()" ,
356 listget ( listget ( content
. get ( 'thumbnails' ,[]), 0 ,{}). get ( 'thumbnails' ,[]), 0 ,{}). get ( 'url' , '' )
359 def parse_channel_items ( items
, channel_id
, author
):
363 key
= next ( iter ( item
. keys ()), None )
365 if key
in [ "gridVideoRenderer" , "videoRenderer" , "videoCardRenderer" ]:
366 # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
367 result
. append ({ 'type' : 'VIDEO' , 'content' : {
368 'video_id' : content
[ 'videoId' ],
369 'title' : content
[ 'title' ]. get ( 'simpleText' ) or content
[ 'title' ]. get ( 'runs' ,[{}])[ 0 ]. get ( 'text' ),
370 'author' : content
. get ( 'bylineText' ,{}). get ( 'runs' ,[{}])[ 0 ]. get ( 'text' ) or author
,
371 'channel_id' : content
. get ( 'bylineText' ,{}). get ( 'runs' ,[{}])[ 0 ] \
372 . get ( 'navigationEndpoint' ,{}). get ( 'browseEndpoint' ,{}). get ( 'browseId' ) or channel_id
,
373 'length' : ( content
. get ( 'lengthText' ,{}). get ( 'simpleText' ) or # topic channel
374 listfind ( content
. get ( 'thumbnailOverlays' ,[]), 'thumbnailOverlayTimeStatusRenderer' )
375 . get ( 'text' ,{}). get ( 'simpleText' )),
376 # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
377 'views' : toInt ( content
. get ( 'viewCountText' ,{}). get ( 'simpleText' )),
378 'published' : age ( content
. get ( 'publishedTimeText' ,{}). get ( 'simpleText' )),
380 elif key
== "gridPlaylistRenderer" or key
== "playlistRenderer" :
381 result
. append ({ 'type' : 'PLAYLIST' , 'content' : {
382 'playlist_id' : content
[ 'navigationEndpoint' ]. get ( 'watchEndpoint' ,{}). get ( 'playlistId' ) or content
. get ( 'playlistId' ),
383 'video_id' : content
[ 'navigationEndpoint' ]. get ( 'watchEndpoint' ,{}). get ( 'videoId' ,{}) or videoid_from_thumbnail ( content
),
384 'title' : ( content
[ 'title' ]. get ( 'simpleText' ) or # playlistRenderer
385 content
[ 'title' ][ 'runs' ][ 0 ][ 'text' ]), # gridPlaylistRenderer
387 'channel_id' : channel_id
,
388 'n_videos' : toInt ( content
. get ( 'videoCount' ) or # playlistRenderer
389 content
. get ( 'videoCountShortText' ,{}). get ( 'simpleText' ) or # grid(1)
390 content
. get ( 'videoCountText' ,{}). get ( 'runs' ,[{}])[ 0 ]. get ( 'text' )), # grid(2)
392 elif key
== "showRenderer" :
393 result
. append ({ 'type' : 'PLAYLIST' , 'content' : {
394 'playlist_id' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ][ 'playlistId' ],
395 'video_id' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ][ 'videoId' ],
396 'title' : content
[ 'title' ][ 'simpleText' ],
398 'channel_id' : channel_id
,
401 elif key
in [ "itemSectionRenderer" , "gridRenderer" , "horizontalCardListRenderer" ]:
403 "itemSectionRenderer" : 'contents' ,
404 "gridRenderer" : 'items' ,
405 "horizontalCardListRenderer" : 'cards' ,
407 r
, e
= parse_channel_items ( content
[ newkey
], channel_id
, author
)
410 elif key
== "shelfRenderer" :
411 r
, e
= parse_channel_items ([ content
[ 'content' ]], channel_id
, author
)
414 elif key
== "messageRenderer" :
415 # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
417 elif key
== "gameCardRenderer" :
420 log_unknown_card ( item
)
424 def parse_playlist ( item
):
425 key
= next ( iter ( item
. keys ()), None )
427 if key
== "playlistVideoRenderer" :
428 if not content
. get ( 'isPlayable' , False ):
429 return None # private or deleted video
431 return { 'type' : 'VIDEO' , 'content' : {
432 'video_id' : content
[ 'videoId' ],
433 'title' : ( content
[ 'title' ]. get ( 'simpleText' ) or # playable videos
434 content
[ 'title' ]. get ( 'runs' ,[{}])[ 0 ]. get ( 'text' )), # "[Private video]"
435 'playlist_id' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ][ 'playlistId' ],
436 'index' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ]. get ( 'index' , 0 ), #or int(content['index']['simpleText']) (absent on course intros; e.g. PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5)
437 # rest is missing from unplayable videos:
438 'author' : content
. get ( 'shortBylineText' ,{}). get ( 'runs' ,[{}])[ 0 ]. get ( 'text' ),
439 'channel_id' : content
. get ( 'shortBylineText' ,{}). get ( 'runs' ,[{}])[ 0 ]. get ( 'navigationEndpoint' ,{}). get ( 'browseEndpoint' ,{}). get ( 'browseId' ),
440 'length' : ( content
. get ( "lengthText" ,{}). get ( "simpleText" ) or # "8:51"
441 int ( content
. get ( "lengthSeconds" , 0 ))), # "531"
442 'starttime' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ]. get ( 'startTimeSeconds' ),
445 raise Exception ( item
) # XXX TODO