git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
search results: fix channel without subscriber count, add spelling info
[subscriptionfeed.git] / app / common / innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
2
3 from urllib.parse import parse_qs, urlparse
4
def findall(obj, key):
    """
    given a list of dicts, return the values stored under `key` in every
    dict that contains it (in list order).

    Entries that are not dicts are skipped, and a missing list (None) is
    treated like an empty one, so the result is always a list.
    """
    if obj is None:
        return []
    # a distinct loop variable avoids shadowing the input list
    return [entry[key] for entry in obj if isinstance(entry, dict) and key in entry]
def listget(obj, index, fallback=None):
    """
    return obj[index], or `fallback` if the sequence has no such element.

    A missing sequence (None) also yields `fallback`; this happens when the
    sequence was obtained through dict.get() without a default.
    """
    if obj is None:
        return fallback
    # slicing never raises on an out-of-range index, unlike plain indexing
    return next(iter(obj[index:]), fallback)
def flatten(l):
    """concatenate the items of every sub-iterable of `l` into one flat list."""
    # https://stackoverflow.com/a/952952
    merged = []
    for sublist in l:
        merged.extend(sublist)
    return merged
def first(l):
    """return the first item of the iterable `l`, or an empty dict if it has none."""
    for item in l:
        return item
    return {}
def listfind(obj, key):
    """return the first value found under `key` in a list of dicts, or {} if there is none."""
    matches = findall(obj, key)
    return first(matches)
15
def prepare_searchresults(yt_results):
    """
    digs the result items out of a search response and hands them to
    parse_result_items(), whose return value is passed through unchanged.

    Every level of the nesting is optional; a missing level simply leads
    to an empty item list.
    """
    node = listfind(yt_results, 'response')
    path = (
        'contents',
        'twoColumnSearchResultsRenderer',
        'primaryContents',
        'sectionListRenderer',
    )
    for step in path:
        node = node.get(step, {})
    sections = node.get('contents', [])

    # each itemSectionRenderer carries its own list of result items
    items = []
    for section in findall(sections, 'itemSectionRenderer'):
        items.extend(section.get('contents', []))

    return parse_result_items(items)
26
def prepare_infocards(metadata):
    """
    returns the parsed infocards found in `metadata` (a dict) as a list.

    parse_infocard() returns None for card types it does not understand;
    those are dropped here so the result only contains usable cards.
    """
    cards = metadata.get('cards',{}).get('cardCollectionRenderer',{}).get('cards',[])
    parsed = (parse_infocard(card) for card in cards if card is not None)
    # filter the parser's output, not just its input
    return [card for card in parsed if card is not None]
30
def prepare_endcards(metadata):
    """
    returns the parsed endscreen elements found in `metadata` (a dict) as a list.

    parse_endcard() returns None for element styles it does not understand;
    those are dropped here so the result only contains usable cards.
    """
    endsc = metadata.get('endscreen',{}).get('endscreenRenderer',{}).get('elements',[])
    parsed = (parse_endcard(card) for card in endsc if card is not None)
    # filter the parser's output, not just its input
    return [card for card in parsed if card is not None]
34
def mkthumbs(thumbs):
    """
    maps each thumbnail's height (as a string) to its url, and adds a
    'largest' entry naming the key with the greatest height (None if
    `thumbs` is empty).
    """
    by_height = {}
    for thumb in thumbs:
        by_height[str(thumb['height'])] = thumb['url']
    # keys are strings, so compare them numerically
    tallest = max(by_height, key=int, default=None)
    by_height['largest'] = tallest
    return by_height
39
def clean_url(url):
    """
    returns the value of the url's `q` query parameter if it has one,
    otherwise the url itself.
    """
    # external URLs are redirected through youtube.com/redirect, but we
    # may encounter internal URLs, too
    query = parse_qs(urlparse(url).query)
    targets = query.get('q')
    if targets is None:
        return url
    return targets[0]
44
def toInt(s, fallback=0):
    """
    builds an integer from all digit characters in `s`, ignoring
    everything else. Returns `fallback` if `s` is None or yields no
    parseable number.
    """
    if s is None:
        return fallback
    digits = ''.join(ch for ch in s if str.isdigit(ch))
    try:
        number = int(digits)
    except ValueError:
        return fallback
    return number
52
# Remove leftmost word from string:
def delL(s):
    """return everything after the first space in `s` ('' if there is none)."""
    _, _, remainder = s.partition(' ')
    return remainder
55
def age(s):
    """
    abbreviates a relative timestamp: "1 year ago" => "1y",
    "3 months ago" => "3mn", "Streamed 7 hours ago" => "7h".

    Returns None if `s` is None or empty. A string that does not end in
    "<value> <unit> ago" is returned unchanged instead of raising.
    """
    if not s: # missing from autogen'd music, some livestreams
        return None
    words = s.split()
    # Only the last three words matter, so a leading word such as
    # "Streamed" does not need to be stripped explicitly.
    if len(words) < 3 or words[-1] != "ago":
        return s
    value, unit = words[-3], words[-2]
    suffix = dict(
        month='mn',
        months='mn',
    ).get(unit, unit[0]) # first letter otherwise (e.g. year(s) => y)

    return f"{value}{suffix}"
69
def log_unknown_card(data):
    """
    appends `data` as indented JSON to /tmp/innertube.err, preceded by a
    header line naming the URL of the current flask request ("unknown" if
    that cannot be determined).
    """
    import json
    try:
        from flask import request
        source = request.url
    except Exception: # flask unavailable or no usable request; still log the data
        source = "unknown"
    with open("/tmp/innertube.err", "a") as f:
        f.write(f"\n/***** {source} *****/\n")
        json.dump(data, f, indent=2)
79
def parse_result_items(items):
    """
    parses youtube search response into an easier to use format.

    `items` is a list of single-key dicts: the key names the renderer type
    and the value holds its data. Returns a tuple (results, extras):
    `results` is a list of {'type': 'VIDEO'|'PLAYLIST'|'CHANNEL',
    'content': {...}} dicts, `extras` is a list of additional information
    (currently spelling corrections). Vertical shelves are unpacked
    recursively, some renderer types are deliberately ignored, and
    anything unrecognized is passed to log_unknown_card().
    """
    # TODO: use .get() for most non-essential attributes

    def text_of(node):
        # first run's text of a text object, else its simpleText, else None
        node = node or {}
        runs = node.get('runs') or [{}]
        return runs[0].get('text') or node.get('simpleText')

    def byline(content):
        # Long byline, falling back to the short one. Plain indexing would
        # raise on a missing long byline before the fallback is reached.
        return text_of(content.get('longBylineText')) or \
               text_of(content.get('shortBylineText'))

    results = []
    extras = []
    for item in items:
        key = next(iter(item.keys()), None)
        if key is None: # empty item: nothing to parse
            continue
        content = item[key]
        if key == 'videoRenderer':
            is_live = listfind(content.get('badges',[]), 'metadataBadgeRenderer').get('style') == 'BADGE_STYLE_TYPE_LIVE_NOW'
            view_text = content.get('viewCountText') or {}
            results.append({'type': 'VIDEO', 'content': {
                'video_id': content['videoId'],
                'title': content['title']['runs'][0]['text'],
                'author': byline(content),
                'channel_id': content['ownerText']['runs'][0] \
                    ['navigationEndpoint']['browseEndpoint']['browseId'],
                'length': content.get('lengthText',{}).get('simpleText') \
                    if not is_live else 'LIVE', # "44:07", "1:41:50"
                'views': toInt(view_text.get('simpleText') or # "123,456 views"
                    text_of(view_text)), # "1,234 watching"
                'published': age(content.get('publishedTimeText',{}).get('simpleText')),
            }})
        elif key == 'playlistRenderer':
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['navigationEndpoint']['watchEndpoint']['playlistId'],
                'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
                'title': content['title']['simpleText'],
                'author': byline(content),
                'channel_id': content['longBylineText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId'], # OR .shortBylineText
                'n_videos': toInt(content['videoCount']),
            }})
        elif key == 'radioRenderer': # "Mix" playlists
            results.append({'type': 'PLAYLIST', 'content': {
                'playlist_id': content['playlistId'],
                'video_id': content['navigationEndpoint']['watchEndpoint']['videoId'],
                'title': content['title']['simpleText'],
                'author': byline(content), # always "YouTube"
                'channel_id': None,
                'n_videos': text_of(content.get('videoCountShortText')) or \
                    text_of(content.get('videoCountText')),
                    # videoCountShortText: "50+"; videoCountText: "50+ videos"
            }})
        elif key == 'channelRenderer':
            results.append({'type': 'CHANNEL', 'content': {
                'channel_id': content['channelId'],
                'title': content['title']['simpleText'],
                'icons': mkthumbs(content['thumbnail']['thumbnails']),
                'subscribers': content.get('subscriberCountText',{}).get('simpleText'), # "2.47K subscribers"
            }})
        elif key == 'shelfRenderer':
            shelf = content.get('content',{}).get('verticalListRenderer')
            if shelf is None: # shelf layout we do not understand
                log_unknown_card(item)
                continue
            r, e = parse_result_items(shelf.get('items',[]))
            results.extend(r)
            extras.extend(e)
        elif key == 'movieRenderer': # movies to buy/rent
            pass
        elif key == 'carouselAdRenderer' or key == 'searchPyvRenderer': # haha, no.
            pass
        elif key == 'horizontalCardListRenderer':
            # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
            pass
        elif key == 'emergencyOneboxRenderer': # suicide prevention hotline
            pass
        elif key == 'didYouMeanRenderer' or key == 'showingResultsForRenderer':
            extras.append({
                'type': 'spelling',
                'query': content['correctedQueryEndpoint']['searchEndpoint']['query'], # non-misspelled query
                'autocorrected': key == 'showingResultsForRenderer',
            })
        else:
            log_unknown_card(item)
    return results, extras
156
def parse_infocard(card):
    """
    turns one infocard into a simplified {'type': ..., 'content': {...}} dict.

    Handles polls, videos, playlists, website links and collaborator
    channels. Any other card is handed to log_unknown_card() and None is
    returned.
    """
    renderer = card['cardRenderer']
    kind = list(renderer['content'])[0]
    body = renderer['content'][kind]

    if kind == "pollRenderer":
        answers = []
        for choice in body['choices']:
            answers.append((choice['text']['simpleText'], choice['numVotes']))
        return {'type': "POLL", 'content': {
            'question': body['question']['simpleText'],
            'answers': answers,
        }}

    if kind == "videoInfoCardContentRenderer":
        live_badge = body.get('badge',{}).get('liveBadgeRenderer')
        if live_badge is not None:
            length = "LIVE"
        else:
            length = body.get('lengthString',{}).get('simpleText') # "23:03"
        return {'type': "VIDEO", 'content': {
            'video_id': body['action']['watchEndpoint']['videoId'],
            'title': body['videoTitle']['simpleText'],
            'author': delL(body['channelName']['simpleText']),
            'length': length,
            'views': toInt(body.get('viewCountText',{}).get('simpleText')),
            # XXX: views sometimes "Starts: July 31, 2020 at 1:30 PM"
        }}

    if kind == "playlistInfoCardContentRenderer":
        target = body['action']['watchEndpoint']
        return {'type': "PLAYLIST", 'content': {
            'playlist_id': target['playlistId'],
            'video_id': target['videoId'],
            'title': body['playlistTitle']['simpleText'],
            'author': delL(body['channelName']['simpleText']),
            'n_videos': toInt(body['playlistVideoCount']['simpleText']),
        }}

    if kind == "simpleCardContentRenderer" and 'urlEndpoint' in body['command']:
        return {'type': "WEBSITE", 'content': {
            'url': clean_url(body['command']['urlEndpoint']['url']),
            'domain': body['displayDomain']['simpleText'],
            'title': body['title']['simpleText'],
            # XXX: no thumbnails for infocards
        }}

    if kind == "collaboratorInfoCardContentRenderer":
        return {'type': "CHANNEL", 'content': {
            'channel_id': body['endpoint']['browseEndpoint']['browseId'],
            'title': body['channelName']['simpleText'],
            'icons': mkthumbs(body['channelAvatar']['thumbnails']),
            'subscribers': body.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers"
        }}

    log_unknown_card(renderer)
    return None
207
def parse_endcard(card):
    """
    turns one endscreen element into a simplified
    {'type': ..., 'content': {...}} dict.

    Handles the CHANNEL, VIDEO, PLAYLIST, WEBSITE and CREATOR_MERCHANDISE
    styles (the last one is reported as WEBSITE). Any other element is
    handed to log_unknown_card() and None is returned.
    """
    element = card.get('endscreenElementRenderer', card) #only sometimes nested
    style = element['style']

    if style == "CHANNEL":
        return {'type': style, 'content': {
            'channel_id': element['endpoint']['browseEndpoint']['browseId'],
            'title': element['title']['simpleText'],
            'icons': mkthumbs(element['image']['thumbnails']),
        }}

    if style == "VIDEO":
        return {'type': style, 'content': {
            'video_id': element['endpoint']['watchEndpoint']['videoId'],
            'title': element['title']['simpleText'],
            'length': element['videoDuration']['simpleText'], # '12:21'
            'views': toInt(element['metadata']['simpleText']),
            # XXX: no channel name
        }}

    if style == "PLAYLIST":
        target = element['endpoint']['watchEndpoint']
        return {'type': style, 'content': {
            'playlist_id': target['playlistId'],
            'video_id': target['videoId'],
            'title': element['title']['simpleText'],
            'author': delL(element['metadata']['simpleText']),
            'n_videos': toInt(element['playlistLength']['simpleText']),
        }}

    if style in ("WEBSITE", "CREATOR_MERCHANDISE"):
        link = clean_url(element['endpoint']['urlEndpoint']['url'])
        return {'type': "WEBSITE", 'content': {
            'url': link,
            'domain': urlparse(link).netloc,
            'title': element['title']['simpleText'],
            'icons': mkthumbs(element['image']['thumbnails']),
        }}

    log_unknown_card(element)
    return None
Imprint / Impressum