]> git.gir.st - subscriptionfeed.git/blob - app/common/innertube.py
move innertube parsing into its own package, add searchresult parser
[subscriptionfeed.git] / app / common / innertube.py
1 # functions that deal with parsing data from youtube's internal API ("innertube")
2
3 from urllib.parse import parse_qs, urlparse
4
def mkthumbs(thumbs):
    """
    turns a list of thumbnail dicts into a {height: url} mapping.

    every entry of `thumbs` needs a 'height' and a 'url'; the height is
    stringified and used as key. the additional key 'largest' holds the
    key (not the url) of the tallest thumbnail, or None when `thumbs` is
    empty.
    """
    by_height = {}
    for thumb in thumbs:
        height = str(thumb['height'])
        by_height[height] = thumb['url']
    # compare keys numerically, so that e.g. '88' sorts below '176'.
    tallest = max(by_height, key=int, default=None)
    by_height['largest'] = tallest
    return by_height
9
def clean_url(url):
    """
    returns the target of a link that goes through youtube's redirector.

    external URLs are wrapped as youtube.com/redirect?q=<target>; for those
    the (decoded) value of the first 'q' parameter is returned. URLs without
    a non-empty 'q' parameter (e.g. internal ones) are returned unchanged.
    """
    params = parse_qs(urlparse(url).query)
    targets = params.get('q')
    if targets is None:
        return url
    return targets[0]
14
def delL(s):
    """removes the leftmost word from s ('' if s contains no space)."""
    _head, _sep, remainder = s.partition(' ')
    return remainder


def delR(s):
    """removes the rightmost word from s ('' if s contains no space)."""
    remainder, _sep, _tail = s.rpartition(' ')
    return remainder


def intT(s):
    """thousands separator aware int(): '1,234' -> 1234."""
    digits = s.replace(',', '')
    return int(digits)
20
def parse_result_items(items):
    """
    parses youtube search response into an easier to use format.

    `items` is a list of single-key dicts, where the key names the renderer
    (videoRenderer, playlistRenderer, radioRenderer, channelRenderer,
    shelfRenderer, ...). returns a flat list of
    {'type': 'VIDEO'|'PLAYLIST'|'CHANNEL', 'content': {...}} dicts.
    shelves are flattened into the result; movieRenderer and
    horizontalCardListRenderer items are skipped. any other renderer yields
    an entry whose type is the renderer's name and whose content holds an
    'error' message.
    """
    results = []
    for item in items:
        key = next(iter(item.keys()), None)
        if key == 'videoRenderer':
            video = item[key]
            # 'badges' is a list; check every badge, as the live badge is
            # not necessarily the first one.
            is_live = any(
                badge.get('metadataBadgeRenderer', {}).get('style') == 'BADGE_STYLE_TYPE_LIVE_NOW'
                for badge in video.get('badges', [])
            )
            results.append(
                {'type': 'VIDEO', 'content': {
                    'video_id': video['videoId'],
                    'title': video['title']['runs'][0]['text'], # XXX: handle/concat multiple runs?
                    'author': video['longBylineText']['runs'][0]['text'], # OR: ownerText (never works), shortBylineText
                    'channel_id': video['ownerText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId'], # OR: channelThumbnailSupportedRenderers.channelThumbnailWithLinkRenderer.navigationEndpoint.browseId
                    'length': video.get('lengthText',{}).get('simpleText') if not is_live else 'LIVE', # "44:07", "1:41:50" -- XXX: maybe absent--when?
                    'views': video.get('viewCountText',{}).get('simpleText'), # XXX: "123,456 views", absent on livestreams
                    # published: e.g. "1 year ago"; missing on autogenerated
                    # music 'videos', livestreams sometimes "Streamed 7 hours
                    # ago", sometimes absent.
                    'published': video.get('publishedTimeText',{}).get('simpleText',"").replace("Streamed ",""),
                }}
            )
        elif key == 'playlistRenderer':
            playlist = item[key]
            results.append(
                {'type': 'PLAYLIST', 'content': {
                    'playlist_id': playlist['navigationEndpoint']['watchEndpoint']['playlistId'],
                    'video_id': playlist['navigationEndpoint']['watchEndpoint']['videoId'],
                    'title': playlist['title']['simpleText'],
                    'author': playlist['longBylineText']['runs'][0]['text'], # OR: .shortBylineText
                    'channel_id': playlist['longBylineText']['runs'][0]['navigationEndpoint']['browseEndpoint']['browseId'], # OR .shortBylineText
                    'n_videos': playlist['videoCount'],
                }}
            )
        elif key == 'radioRenderer':
            # "Mix" playlists
            radio = item[key]
            results.append(
                {'type': 'PLAYLIST', 'content': {
                    'playlist_id': radio['playlistId'], # OR: same as normal playlist
                    'video_id': radio['navigationEndpoint']['watchEndpoint']['videoId'],
                    'title': radio['title']['simpleText'],
                    'author': radio['longBylineText']['simpleText'], # always "YouTube"; OR: .shortBylineText
                    'channel_id': None, # xxx: nothing available
                    #'n_videos': radio['videoCountText']['runs'][0]['text'], # XXX: "50+ videos"
                    'n_videos': radio['videoCountShortText']['runs'][0]['text'], # "50+"
                }}
            )
        elif key == 'channelRenderer':
            channel = item[key]
            results.append(
                {'type': 'CHANNEL', 'content': {
                    'channel_id': channel['channelId'],
                    'title': channel['title']['simpleText'],
                    'icons': mkthumbs(channel['thumbnail']['thumbnails']), # [{url,height,width}]
                    'subscribers': channel['subscriberCountText']['simpleText'], # XXX: "2.47K subscribers"
                }}
            )
        elif key == 'shelfRenderer':
            # a shelf wraps another list of result items; flatten it.
            results.extend(
                parse_result_items(item[key]['content']['verticalListRenderer']['items'])
            )
        elif key == 'movieRenderer':
            # movies to buy/rent
            pass
        elif key == 'horizontalCardListRenderer':
            # suggested searches: .cards[].searchRefinementCardRenderer.query.runs[].text
            pass
        else:
            import pprint
            content = {'error': f"{key} is not implemented; <pre>{pprint.pformat(item)}</pre>"}
            results.append({'type': key, 'content': content})
    return results
92
def parse_infocard(card):
    """
    parses a single infocard into a format that's easier to handle.

    `card` is a dict holding a 'cardRenderer', whose 'content' has a single
    key naming the renderer. returns {'type': ..., 'content': {...}}, where
    type is one of POLL, VIDEO, PLAYLIST, WEBSITE or CHANNEL. for any other
    renderer, type is the renderer's name and content holds an 'error'
    message.
    """
    card = card['cardRenderer']
    ctype = list(card['content'].keys())[0]
    content = card['content'][ctype]
    if ctype == "pollRenderer":
        ctype = "POLL"
        content = {
            'question': content['question']['simpleText'],
            'answers': [(a['text']['simpleText'],a['numVotes'])
                for a in content['choices']],
        }
    elif ctype == "videoInfoCardContentRenderer":
        ctype = "VIDEO"
        # if the card references a live stream, it has no length, but a "LIVE NOW" badge.
        live_badge = content.get('badge',{}).get('liveBadgeRenderer',{})
        length = live_badge.get('label',{}).get('simpleText') or content.get('lengthString',{}).get('simpleText') # '23:03'
        # viewCountText.simpleText is not always "123,456 views"; it might
        # contain e.g. "Starts: July 31, 2020 at 1:30 PM" or be missing
        # altogether. fall back to 0 in those cases.
        view_text = None
        try:
            view_text = content['viewCountText']['simpleText']
            view_count = intT(delR(view_text))
        except (KeyError, TypeError, AttributeError, ValueError):
            # only the unparsable values are of interest for debugging.
            import logging
            logging.getLogger(__name__).warning(
                "infocard: unparsable view count %r", view_text)
            view_count = 0
        content = {
            'video_id': content['action']['watchEndpoint']['videoId'],
            'title': content['videoTitle']['simpleText'],
            'author': delL(content['channelName']['simpleText']),
            'length': length,
            'views': view_count,
        }
    elif ctype == "playlistInfoCardContentRenderer":
        ctype = "PLAYLIST"
        content = {
            'playlist_id': content['action']['watchEndpoint']['playlistId'],
            'video_id': content['action']['watchEndpoint']['videoId'],
            'title': content['playlistTitle']['simpleText'],
            'author': delL(content['channelName']['simpleText']),
            'n_videos': intT(content['playlistVideoCount']['simpleText']),
        }
    # simple cards without a command (or with a non-URL command) are
    # reported as not implemented below.
    elif ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content.get('command', {}):
        ctype = "WEBSITE"
        content = {
            'url': clean_url(content['command']['urlEndpoint']['url']),
            'domain': content['displayDomain']['simpleText'],
            'title': content['title']['simpleText'],
            # XXX: no thumbnails for infocards
        }
    elif ctype == "collaboratorInfoCardContentRenderer":
        ctype = "CHANNEL"
        content = {
            'channel_id': content['endpoint']['browseEndpoint']['browseId'],
            'title': content['channelName']['simpleText'],
            'icons': mkthumbs(content['channelAvatar']['thumbnails']),
            'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers"
        }
    else:
        import pprint
        content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}

    return {'type': ctype, 'content': content}
157
def parse_endcard(card):
    """
    parses a single endcard into a format that's easier to handle.

    `card` is the endscreen element, optionally wrapped in an
    'endscreenElementRenderer' key. its 'style' selects the parser; returns
    {'type': ..., 'content': {...}} with type CHANNEL, VIDEO, PLAYLIST or
    WEBSITE (CREATOR_MERCHANDISE is reported as WEBSITE). for any other
    style, type is that style and content holds an 'error' message.
    """
    # the element is only sometimes nested
    if 'endscreenElementRenderer' in card:
        card = card['endscreenElementRenderer']
    style = card['style']

    if style == "CHANNEL":
        return {'type': style, 'content': {
            'channel_id': card['endpoint']['browseEndpoint']['browseId'],
            'title': card['title']['simpleText'],
            'icons': mkthumbs(card['image']['thumbnails']),
        }}

    if style == "VIDEO":
        # XXX: KeyError 'endpoint' exception (no idea which youtube video this was on)
        video_id = card['endpoint']['watchEndpoint']['videoId']
        title = card['title']['simpleText']
        duration = card['videoDuration']['simpleText'] # '12:21'
        views = delR(card['metadata']['simpleText'])
        # XXX: no channel name
        return {'type': style, 'content': {
            'video_id': video_id,
            'title': title,
            'length': duration,
            'views': views,
        }}

    if style == "PLAYLIST":
        watch = card['endpoint']['watchEndpoint']
        playlist_id = watch['playlistId']
        video_id = watch['videoId']
        title = card['title']['simpleText']
        author = delL(card['metadata']['simpleText'])
        count_text = delR(card['playlistLength']['simpleText'])
        return {'type': style, 'content': {
            'playlist_id': playlist_id,
            'video_id': video_id,
            'title': title,
            'author': author,
            'n_videos': intT(count_text),
        }}

    if style in ("WEBSITE", "CREATOR_MERCHANDISE"):
        target = clean_url(card['endpoint']['urlEndpoint']['url'])
        domain = urlparse(target).netloc
        return {'type': "WEBSITE", 'content': {
            'url': target,
            'domain': domain,
            'title': card['title']['simpleText'],
            'icons': mkthumbs(card['image']['thumbnails']),
        }}

    import pprint
    message = f"{style} is not implemented; <pre>{pprint.pformat(card)}</pre>"
    return {'type': style, 'content': {'error': message}}
Imprint / Impressum