]>
git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
3 from urllib
. parse
import parse_qs
, urlparse
7 given a list of dicts, return a list of the values stored under the given key in each dict that contains it.
9 if obj
is None : return []
10 return [ obj
[ key
] for obj
in obj
if key
in obj
. keys () ]
def listget(obj, index, fallback=None):
    """
    Safe positional lookup: return the element of `obj` at `index`, or
    `fallback` when `obj` is None or has no element at that position.
    """
    if obj is None:
        return fallback
    # Slicing never raises on out-of-range indices; an empty slice simply
    # falls through to the fallback.
    for element in obj[index:]:
        return element
    return fallback
def flatten(l):
    """
    Flatten one level of nesting: given an iterable of iterables, return a
    single list with all inner items in order.
    """
    # https://stackoverflow.com/a/952952
    return [item for sublist in l for item in sublist]
def first(l):
    """
    Return the first item of the iterable `l`, or an empty dict if `l`
    yields nothing (so that callers can chain .get() on the result).
    """
    return next(iter(l), {})
def listfind(obj, key):
    """
    Return the first result of findall(obj, key), or an empty dict when
    findall yields nothing.
    """
    return first(findall(obj, key))
18 def prepare_searchresults ( yt_results
):
19 contents
= listfind ( yt_results
, 'response' ) \
21 . get ( 'twoColumnSearchResultsRenderer' ,{}) \
22 . get ( 'primaryContents' ,{}) \
23 . get ( 'sectionListRenderer' ,{}) \
25 contents
= flatten ([ c
. get ( 'contents' ,[]) for c
in findall ( contents
, 'itemSectionRenderer' )])
27 return parse_result_items ( contents
)
def prepare_infocards(metadata):
    """
    Collect the entries under cards.cardCollectionRenderer.cards of
    `metadata`, run each through parse_infocard and return the results as
    a list, leaving out any card for which the parser returned a falsy value.
    """
    collection = metadata.get('cards',{}).get('cardCollectionRenderer',{})
    parsed = (parse_infocard(card) for card in collection.get('cards',[]))
    return [card for card in parsed if card]
def prepare_endcards(metadata):
    """
    Collect the entries under endscreen.endscreenRenderer.elements of
    `metadata`, run each through parse_endcard and return the results as
    a list, leaving out any card for which the parser returned a falsy value.
    """
    renderer = metadata.get('endscreen',{}).get('endscreenRenderer',{})
    parsed = (parse_endcard(element) for element in renderer.get('elements',[]))
    return [card for card in parsed if card]
def prepare_channel(result, channel_id):
    """
    Turn a channel continuation response into
    (title, description, thumbnails, items, has_more).

    If the response carries 'alerts', they are logged through the flask
    application logger and (None, None, [], [], False) is returned. If
    there is no 'continuationContents', the channel metadata is returned
    with an empty item list and has_more=False.
    """
    response = listfind(result, 'response')

    # possibly got an error back
    if 'alerts' in response:
        from flask import current_app
        problems = []
        for alert in response['alerts']:
            renderer = alert['alertRenderer']
            problems.append((renderer['type'], renderer['text']['simpleText']))
        current_app.logger.error(problems)
        return None, None, [], [], False

    channel_meta = response.get('metadata',{}).get('channelMetadataRenderer',{})
    microformat = response.get('microformat',{}).get('microformatDataRenderer',{})
    title = channel_meta.get('title', microformat.get('title'))
    # microformat's description is capped at 160chars, hence only the fallback
    descr = channel_meta.get('description', microformat.get('description'))
    # .avatar ~ 900px
    images = microformat.get('thumbnail', channel_meta.get('avatar',{}))
    thumb = mkthumbs(images.get('thumbnails',{}))

    contents = response.get('continuationContents')
    if not contents:
        # overran end of list
        return title, descr, thumb, [], False

    # items live in either a grid or a section list continuation
    grid_items = contents.get('gridContinuation',{}).get('items')
    if grid_items:
        unparsed = grid_items
    else:
        unparsed = contents.get('sectionListContinuation',{}).get('contents') or []
    items, _extra = parse_channel_items(unparsed, channel_id, title)

    continuation = (
        contents.get('gridContinuation')
        or contents.get('sectionListContinuation')
        or {}
    )
    has_more = 'continuations' in continuation

    return title, descr, thumb, items, has_more
def prepare_playlist(result):
    """
    Turn a playlist continuation response into (videos, has_more).

    `videos` is the list of entries parsed by parse_playlist, with falsy
    results left out; `has_more` tells whether the continuation carries a
    'continuations' key. 'continuationContents' and
    'playlistVideoListContinuation' are accessed by subscript, so a
    response lacking either raises KeyError.
    """
    contents = listfind(result, 'response')['continuationContents']
    continuation = contents['playlistVideoListContinuation']
    unparsed = continuation.get('contents',[])
    has_more = 'continuations' in continuation

    videos = [video for video in map(parse_playlist, unparsed) if video]
    return videos, has_more
71 output
= { str ( e
[ 'height' ]): e
[ 'url' ] for e
in thumbs
}
72 largest
= next ( iter ( sorted ( output
. keys (), reverse
= True , key
= int )), None )
73 return {** output
, 'largest' : largest
}
76 # external URLs are redirected through youtube.com/redirect, but we
77 # may encounter internal URLs, too
78 return parse_qs ( urlparse ( url
). query
). get ( 'q' ,[ url
])[ 0 ]
80 def toInt ( s
, fallback
= 0 ):
84 return int ( '' . join ( filter ( str . isdigit
, s
)))
88 # Remove left-/rightmost word from string:
def delL(s):
    """
    Drop the leftmost word of `s`: return everything after the first
    space, or an empty string when `s` contains no space.
    """
    return s.partition(' ')[2]
92 if s
is None : # missing from autogen'd music, some livestreams
94 # Some livestreams have "Streamed 7 hours ago"
95 s
= s
. replace ( "Streamed " , "" )
96 # Now, everything should be in the form "1 year ago"
97 value
, unit
, _
= s
. split ( " " )
101 ). get ( unit
, unit
[ 0 ]) # first letter otherwise (e.g. year(s) => y)
103 return f
" {value}{suffix} "
105 def log_unknown_card ( data
):
108 from flask
import request
110 except : source
= "unknown"
111 with
open ( "/tmp/innertube.err" , "a" ) as f
:
112 f
. write ( f
" \n /***** {source} *****/ \n " )
113 json
. dump ( data
, f
, indent
= 2 )
115 def parse_result_items ( items
):
116 # TODO: use .get() for most non-essential attributes
118 parses youtube search response into an easier to use format.
123 key
= next ( iter ( item
. keys ()), None )
125 if key
== 'videoRenderer' :
126 is_live
= listfind ( content
. get ( 'badges' ,[]), 'metadataBadgeRenderer' ). get ( 'style' ) == 'BADGE_STYLE_TYPE_LIVE_NOW'
127 results
. append ({ 'type' : 'VIDEO' , 'content' : {
128 'video_id' : content
[ 'videoId' ],
129 'title' : content
[ 'title' ][ 'runs' ][ 0 ][ 'text' ],
130 'author' : content
[ 'longBylineText' ][ 'runs' ][ 0 ][ 'text' ] or \
131 content
[ 'shortBylineText' ][ 'runs' ][ 0 ][ 'text' ],
132 'channel_id' : content
[ 'ownerText' ][ 'runs' ][ 0 ] \
133 [ 'navigationEndpoint' ][ 'browseEndpoint' ][ 'browseId' ],
134 'length' : content
. get ( 'lengthText' ,{}). get ( 'simpleText' ) \
135 if not is_live
else 'LIVE' , # "44:07", "1:41:50"
136 'views' : toInt ( content
. get ( 'viewCountText' ,{}). get ( 'simpleText' ) or # "123,456 views"
137 listget ( content
. get ( 'viewCountText' ,{}). get ( 'runs' ), 0 ,{}). get ( 'text' )), # "1,234 watching"
138 'published' : age ( content
. get ( 'publishedTimeText' ,{}). get ( 'simpleText' )),
140 elif key
== 'playlistRenderer' :
141 results
. append ({ 'type' : 'PLAYLIST' , 'content' : {
142 'playlist_id' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ][ 'playlistId' ],
143 'video_id' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ][ 'videoId' ],
144 'title' : content
[ 'title' ][ 'simpleText' ],
145 'author' : content
[ 'longBylineText' ][ 'runs' ][ 0 ][ 'text' ] or
146 content
[ 'shortBylineText' ][ 'runs' ][ 0 ][ 'text' ],
147 'channel_id' : content
[ 'longBylineText' ][ 'runs' ][ 0 ][ 'navigationEndpoint' ][ 'browseEndpoint' ][ 'browseId' ], # OR .shortBylineText
148 'n_videos' : toInt ( content
[ 'videoCount' ]),
150 elif key
== 'radioRenderer' : # "Mix" playlists
151 results
. append ({ 'type' : 'PLAYLIST' , 'content' : {
152 'playlist_id' : content
[ 'playlistId' ],
153 'video_id' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ][ 'videoId' ],
154 'title' : content
[ 'title' ][ 'simpleText' ],
155 'author' : content
[ 'longBylineText' ][ 'simpleText' ] or \
156 content
[ 'shortBylineText' ][ 'simpleText' ] , # always "YouTube"
158 'n_videos' : content
[ 'videoCountShortText' ][ 'runs' ][ 0 ][ 'text' ] or \
159 content
[ 'videoCountText' ][ 'runs' ][ 0 ][ 'text' ],
160 # videoCountShortText: "50+"; videoCountText: "50+ videos"
162 elif key
== 'channelRenderer' :
163 results
. append ({ 'type' : 'CHANNEL' , 'content' : {
164 'channel_id' : content
[ 'channelId' ],
165 'title' : content
[ 'title' ][ 'simpleText' ],
166 'icons' : mkthumbs ( content
[ 'thumbnail' ][ 'thumbnails' ]),
167 'subscribers' : content
. get ( 'subscriberCountText' ,{}). get ( 'simpleText' ), # "2.47K subscribers"
169 elif key
== 'shelfRenderer' :
170 r
, e
= parse_result_items ( content
[ 'content' ][ 'verticalListRenderer' ][ 'items' ])
173 elif key
== 'movieRenderer' : # movies to buy/rent
175 elif key
in [ 'carouselAdRenderer' , 'searchPyvRenderer' , 'promotedSparklesTextSearchRenderer' ]: # haha, no.
177 elif key
== 'horizontalCardListRenderer' :
178 # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
180 elif key
== 'emergencyOneboxRenderer' : # suicide prevention hotline
182 elif key
== 'clarificationRenderer' : # COVID-19 infos
184 elif key
== 'didYouMeanRenderer' or key
== 'showingResultsForRenderer' :
187 'query' : content
[ 'correctedQueryEndpoint' ][ 'searchEndpoint' ][ 'query' ], # non-misspelled query
188 'autocorrected' : key
== 'showingResultsForRenderer' ,
190 elif key
== 'backgroundPromoRenderer' : # e.g. "no results"
192 'type' : content
[ 'icon' ][ 'iconType' ],
193 'message' : content
[ 'title' ][ 'runs' ][ 0 ][ 'text' ],
196 log_unknown_card ( item
)
197 return results
, extras
199 def parse_infocard ( card
):
201 parses a single infocard into a format that's easier to handle.
203 card
= card
[ 'cardRenderer' ]
204 ctype
= list ( card
[ 'content' ]. keys ())[ 0 ]
205 content
= card
[ 'content' ][ ctype
]
206 if ctype
== "pollRenderer" :
207 return { 'type' : "POLL" , 'content' : {
208 'question' : content
[ 'question' ][ 'simpleText' ],
209 'answers' : [( a
[ 'text' ][ 'simpleText' ], a
[ 'numVotes' ]) \
210 for a
in content
[ 'choices' ]],
212 elif ctype
== "videoInfoCardContentRenderer" :
213 is_live
= content
. get ( 'badge' ,{}). get ( 'liveBadgeRenderer' ) is not None
214 return { 'type' : "VIDEO" , 'content' : {
215 'video_id' : content
[ 'action' ][ 'watchEndpoint' ][ 'videoId' ],
216 'title' : content
[ 'videoTitle' ][ 'simpleText' ],
217 'author' : delL ( content
[ 'channelName' ][ 'simpleText' ]),
218 'length' : content
. get ( 'lengthString' ,{}). get ( 'simpleText' ) \
219 if not is_live
else "LIVE" , # "23:03"
220 'views' : toInt ( content
. get ( 'viewCountText' ,{}). get ( 'simpleText' )),
221 # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
223 elif ctype
== "playlistInfoCardContentRenderer" :
224 return { 'type' : "PLAYLIST" , 'content' : {
225 'playlist_id' : content
[ 'action' ][ 'watchEndpoint' ][ 'playlistId' ],
226 'video_id' : content
[ 'action' ][ 'watchEndpoint' ][ 'videoId' ],
227 'title' : content
[ 'playlistTitle' ][ 'simpleText' ],
228 'author' : delL ( content
[ 'channelName' ][ 'simpleText' ]),
229 'n_videos' : toInt ( content
[ 'playlistVideoCount' ][ 'simpleText' ]),
231 elif ctype
== "simpleCardContentRenderer" and \
232 'urlEndpoint' in content
[ 'command' ]:
233 return { 'type' : "WEBSITE" , 'content' : {
234 'url' : clean_url ( content
[ 'command' ][ 'urlEndpoint' ][ 'url' ]),
235 'domain' : content
[ 'displayDomain' ][ 'simpleText' ],
236 'title' : content
[ 'title' ][ 'simpleText' ],
237 # XXX: no thumbnails for infocards
239 elif ctype
== "collaboratorInfoCardContentRenderer" :
240 return { 'type' : "CHANNEL" , 'content' : {
241 'channel_id' : content
[ 'endpoint' ][ 'browseEndpoint' ][ 'browseId' ],
242 'title' : content
[ 'channelName' ][ 'simpleText' ],
243 'icons' : mkthumbs ( content
[ 'channelAvatar' ][ 'thumbnails' ]),
244 'subscribers' : content
. get ( 'subscriberCountText' ,{}). get ( 'simpleText' , '' ), # "545K subscribers"
247 log_unknown_card ( card
)
250 def parse_endcard ( card
):
252 parses a single endcard into a format that's easier to handle.
254 card
= card
. get ( 'endscreenElementRenderer' , card
) #only sometimes nested
255 ctype
= card
[ 'style' ]
256 if ctype
== "CHANNEL" :
257 return { 'type' : ctype
, 'content' : {
258 'channel_id' : card
[ 'endpoint' ][ 'browseEndpoint' ][ 'browseId' ],
259 'title' : card
[ 'title' ][ 'simpleText' ],
260 'icons' : mkthumbs ( card
[ 'image' ][ 'thumbnails' ]),
262 elif ctype
== "VIDEO" :
263 return { 'type' : ctype
, 'content' : {
264 'video_id' : card
[ 'endpoint' ][ 'watchEndpoint' ][ 'videoId' ],
265 'title' : card
[ 'title' ][ 'simpleText' ],
266 'length' : card
[ 'videoDuration' ][ 'simpleText' ], # '12:21'
267 'views' : toInt ( card
[ 'metadata' ][ 'simpleText' ]),
268 # XXX: no channel name
270 elif ctype
== "PLAYLIST" :
271 return { 'type' : ctype
, 'content' : {
272 'playlist_id' : card
[ 'endpoint' ][ 'watchEndpoint' ][ 'playlistId' ],
273 'video_id' : card
[ 'endpoint' ][ 'watchEndpoint' ][ 'videoId' ],
274 'title' : card
[ 'title' ][ 'simpleText' ],
275 'author' : delL ( card
[ 'metadata' ][ 'simpleText' ]),
276 'n_videos' : toInt ( card
[ 'playlistLength' ][ 'simpleText' ]),
278 elif ctype
== "WEBSITE" or ctype
== "CREATOR_MERCHANDISE" :
279 url
= clean_url ( card
[ 'endpoint' ][ 'urlEndpoint' ][ 'url' ])
280 return { 'type' : "WEBSITE" , 'content' : {
282 'domain' : urlparse ( url
). netloc
,
283 'title' : card
[ 'title' ][ 'simpleText' ],
284 'icons' : mkthumbs ( card
[ 'image' ][ 'thumbnails' ]),
287 log_unknown_card ( card
)
290 def parse_channel_items ( items
, channel_id
, author
):
294 key
= next ( iter ( item
. keys ()), None )
296 if key
in [ "gridVideoRenderer" , "videoRenderer" , "videoCardRenderer" ]:
297 # only videoCardRenderer (topic channels) has author and channel, others fall back to supplied ones.
298 result
. append ({ 'type' : 'VIDEO' , 'content' : {
299 'video_id' : content
[ 'videoId' ],
300 'title' : content
[ 'title' ]. get ( 'simpleText' ) or content
[ 'title' ]. get ( 'runs' ,[{}])[ 0 ]. get ( 'text' ),
301 'author' : content
. get ( 'bylineText' ,{}). get ( 'runs' ,[{}])[ 0 ]. get ( 'text' ) or author
,
302 'channel_id' : content
. get ( 'bylineText' ,{}). get ( 'runs' ,[{}])[ 0 ] \
303 . get ( 'navigationEndpoint' ,{}). get ( 'browseEndpoint' ,{}). get ( 'browseId' ) or channel_id
,
304 'length' : ( content
. get ( 'lengthText' ,{}). get ( 'simpleText' ) or # topic channel
305 listfind ( content
. get ( 'thumbnailOverlays' ,[]), 'thumbnailOverlayTimeStatusRenderer' )
306 . get ( 'text' ,{}). get ( 'simpleText' )),
307 # topic channel: .metadataText.simpleText = "22M views \u00b7 2 months ago"
308 'views' : toInt ( content
. get ( 'viewCountText' ,{}). get ( 'simpleText' )),
309 'published' : age ( content
. get ( 'publishedTimeText' ,{}). get ( 'simpleText' )),
311 elif key
== "gridPlaylistRenderer" or key
== "playlistRenderer" :
312 result
. append ({ 'type' : 'PLAYLIST' , 'content' : {
313 'playlist_id' : content
[ 'navigationEndpoint' ]. get ( 'watchEndpoint' ,{}). get ( 'playlistId' ) or content
. get ( 'playlistId' ),
314 'video_id' : content
[ 'navigationEndpoint' ]. get ( 'watchEndpoint' ,{}). get ( 'videoId' ,{}),
315 'title' : ( content
[ 'title' ]. get ( 'simpleText' ) or # playlistRenderer
316 content
[ 'title' ][ 'runs' ][ 0 ][ 'text' ]), # gridPlaylistRenderer
318 'channel_id' : channel_id
,
319 'n_videos' : toInt ( content
. get ( 'videoCount' ) or # playlistRenderer
320 content
. get ( 'videoCountShortText' ,{}). get ( 'simpleText' ) or # grid(1)
321 content
. get ( 'videoCountText' ,{}). get ( 'runs' ,[{}])[ 0 ]. get ( 'text' )), # grid(2)
323 elif key
in [ "itemSectionRenderer" , "gridRenderer" , "horizontalCardListRenderer" ]:
325 "itemSectionRenderer" : 'contents' ,
326 "gridRenderer" : 'items' ,
327 "horizontalCardListRenderer" : 'cards' ,
329 r
, e
= parse_channel_items ( content
[ newkey
], channel_id
, author
)
332 elif key
== "shelfRenderer" :
333 r
, e
= parse_channel_items ([ content
[ 'content' ]], channel_id
, author
)
336 elif key
== "messageRenderer" :
337 # e.g. {'messageRenderer': {'text': {'runs': [{'text': 'This channel has no playlists.'}]}}}
339 elif key
== "gameCardRenderer" :
342 log_unknown_card ( item
)
346 def parse_playlist ( item
):
347 key
= next ( iter ( item
. keys ()), None )
349 if key
== "playlistVideoRenderer" :
350 if not content
. get ( 'isPlayable' , False ):
351 return None # private or deleted video
353 return { 'type' : 'VIDEO' , 'content' : {
354 'video_id' : content
[ 'videoId' ],
355 'title' : ( content
[ 'title' ]. get ( 'simpleText' ) or # playable videos
356 content
[ 'title' ]. get ( 'runs' ,[{}])[ 0 ]. get ( 'text' )), # "[Private video]"
357 'playlist_id' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ][ 'playlistId' ],
358 'index' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ]. get ( 'index' , 0 ), #or int(content['index']['simpleText']) (absent on course intros; e.g. PL96C35uN7xGJu6skU4TBYrIWxggkZBrF5)
359 # rest is missing from unplayable videos:
360 'author' : content
. get ( 'shortBylineText' ,{}). get ( 'runs' ,[{}])[ 0 ]. get ( 'text' ),
361 'channel_id' : content
. get ( 'shortBylineText' ,{}). get ( 'runs' ,[{}])[ 0 ]. get ( 'navigationEndpoint' ,{}). get ( 'browseEndpoint' ,{}). get ( 'browseId' ),
362 'length' : ( content
. get ( "lengthText" ,{}). get ( "simpleText" ) or # "8:51"
363 int ( content
. get ( "lengthSeconds" , 0 ))), # "531"
364 'starttime' : content
[ 'navigationEndpoint' ][ 'watchEndpoint' ]. get ( 'startTimeSeconds' ),
367 raise Exception ( item
) # XXX TODO