import os
import re
import json
import requests
import requests_cache
import dateutil.parser
from xml.etree import ElementTree
from configparser import ConfigParser
from datetime import datetime, timezone
from urllib.parse import parse_qs, urlparse

cf = ConfigParser()
config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini')
cf.read(config_filename)

# Note: currently expiring after 10 minutes. googlevideo urls are valid for
# 5h59m, but caching that long makes reddit very stale and premiere videos
# won't start. TODO: expire when video is livestream/premiere/etc
requests_cache.install_cache(backend='memory', expire_after=10*60, allowable_codes=(200,))

# Note: this should only be required for the 'memory' backed cache.
# TODO: only run for long-running processes, i.e. the frontend
from threading import Timer
def purge_cache(sec):
    requests_cache.remove_expired_responses()
    t = Timer(sec, purge_cache, args=(sec,))
    t.daemon = True  # don't block interpreter exit (setDaemon() is deprecated)
    t.start()
purge_cache(10*60)

def fetch_xml(feed_type, feed_id):
    r = requests.get(f"https://www.youtube.com/feeds/videos.xml?{feed_type}={feed_id}")
    if not r.ok:
        return None

    return r.text

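# Usage note: feed_type is the query key of YouTube's public Atom feed
# endpoint (ids below are placeholders, not from this repo):
#   fetch_xml("channel_id", "UC...")   # a channel's uploads feed
#   fetch_xml("playlist_id", "PL...")  # a playlist feed
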
def parse_xml(xmldata):
    ns = {
        'atom': "http://www.w3.org/2005/Atom",
        'yt': "http://www.youtube.com/xml/schemas/2015",
        'media': "http://search.yahoo.com/mrss/",
    }

    feed = ElementTree.fromstring(xmldata)
    title = feed.find('atom:title', ns).text
    # Note: explicit 'is not None'; an Element's truth value is based on its
    # number of children, not its presence.
    author = feed.find('atom:author/atom:name', ns).text \
        if feed.find('atom:author', ns) is not None else None
    videos = []
    for entry in feed.findall('atom:entry', ns):
        videos.append({
            'video_id': entry.find('yt:videoId', ns).text,
            'title': entry.find('atom:title', ns).text,
            'published': entry.find('atom:published', ns).text,
            'channel_id': entry.find('yt:channelId', ns).text,
            'author': entry.find('atom:author/atom:name', ns).text,
            # extra fields for pull_subs/webhook:
            'updated': entry.find('atom:updated', ns).text,
        })

    return title, author, videos

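# Return shape, schematically (placeholder values, not real output):
#   ("feed title", "channel name" or None,
#    [{'video_id': ..., 'title': ..., 'published': ..., 'channel_id': ...,
#      'author': ..., 'updated': ...}, ...])
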
def update_channel(db, xmldata):
    if not xmldata: return False

    # Note: websub does not return a global author, hence taking it from the first video
    title, _, videos = parse_xml(xmldata)

    c = db.cursor()
    for i, video in enumerate(videos):
        now = datetime.now(timezone.utc)
        updated = dateutil.parser.parse(video['updated'])
        published = dateutil.parser.parse(video['published'])
        # if the updated and published times are near-identical, we assume the
        # video is new; otherwise it's just an update to an older video.
        # (total_seconds(), not .seconds: the latter ignores the days part.)
        if (updated - published).total_seconds() < 60 and (now - published).days < 7:
            timestamp = now
        else:
            timestamp = published

        c.execute("""
            INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled)
            VALUES (?, ?, ?, datetime(?), datetime(?))
        """, (
            video['video_id'],
            video['channel_id'],
            video['title'],
            video['published'],
            timestamp
        ))

        if i == 0:  # only required once per feed
            c.execute("""
                INSERT OR REPLACE INTO channels (id, name)
                VALUES (?, ?)
            """, (video['channel_id'], video['author']))
    db.commit()

    return True

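# Usage sketch (hypothetical path and id; assumes the videos/channels tables
# referenced by the queries above already exist in the sqlite database):
#   import sqlite3
#   db = sqlite3.connect('/var/lib/yt/db.sqlite3')
#   update_channel(db, fetch_xml("channel_id", "UC..."))
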
def get_video_info(video_id, sts=0, algo=""):
    """
    returns: best-quality muxed video stream, player_response, error-type/message
    error types: 'initial': the request to get_video_info was malformed
                 'player': playabilityStatus != OK
                 'internal': [livestream, geolocked, exhausted]
    """
    for el in ['embedded', 'detailpage']:  # sometimes, only one or the other works
        r = requests.get("https://www.youtube.com/get_video_info" +
            f"?video_id={video_id}" +
            f"&eurl=https://youtube.googleapis.com/v/{video_id}" +
            f"&el={el}" +
            f"&sts={sts}" +
            f"&hl=en_US")  # "&hl=en&gl=US"
        params = parse_qs(r.text)
        if 'errorcode' in params:  # status=fail
            # TODO: assuming we haven't fucked it up, this error comes up if
            # the video id is garbage; give a better error message
            return None, None, 'initial', f"MALFORMED: {params['reason'][0]}"

        metadata = json.loads(params['player_response'][0])
        playabilityStatus = metadata['playabilityStatus']['status']
        if playabilityStatus != "OK":
            if playabilityStatus == "UNPLAYABLE":
                continue  # try again with the next el value (or fail as exhausted)
            reason = metadata['playabilityStatus']['reason']
            return None, None, 'player', f"{playabilityStatus}: {reason}"
        if 'liveStreamability' in metadata['playabilityStatus']:
            # can also check .microformat.liveBroadcastDetails.isLiveNow
            return None, metadata, 'internal', "livestream"

        if 'formats' not in metadata['streamingData']:
            # TODO: hls-only video with these params (kAZCrtJJaAo):
            # "videoDetails": {
            #     "isLiveDefaultBroadcast": true,
            #     "isLowLatencyLiveStream": true,
            #     "isLiveContent": true,
            #     "isPostLiveDvr": true
            # }
            continue  # no urls

        formats = metadata['streamingData']['formats']
        for i, v in enumerate(formats):
            if not ('cipher' in v or 'signatureCipher' in v): continue
            cipher = parse_qs(v.get('cipher') or v.get('signatureCipher'))
            formats[i]['url'] = unscramble(cipher, algo)

        # TODO: check if we have urls or try again
        url = sorted(formats, key=lambda k: k['height'], reverse=True)[0]['url']

        if 'gcr' in parse_qs(url):
            return None, metadata, 'internal', "geolocked"

        return url, metadata, None, None
    else:  # neither el value yielded urls
        return None, metadata, 'internal', "exhausted"

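# Usage sketch (hypothetical video id; sts and algo would come from the
# current player JS, and without them scrambled signatures stay unusable):
#   url, player_response, err, msg = get_video_info("aqz-KE-bpKQ")
#   if err is not None: print(err, msg)
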
def unscramble(cipher, algo):  # test video id: UxxajLWwzqY
    signature = list(cipher['s'][0])
    for c in algo.split():
        match = re.match(r"([rsw])(\d+)?", c)
        if not match: continue  # ignore tokens that aren't valid ops
        op, ix = match.groups()
        ix = int(ix) % len(signature) if ix else 0
        if op == 'r': signature = list(reversed(signature))
        if op == 's': signature = signature[ix:]
        if op == 'w': signature[0], signature[ix] = signature[ix], signature[0]
    sp = cipher.get('sp', ['signature'])[0]
    sig = cipher.get('sig', [''.join(signature)])[0]
    return f"{cipher['url'][0]}&{sp}={sig}"

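# Worked example (the algo string "r s2 w3" is made up for illustration):
#   s = "abcdef":  r -> "fedcba",  s2 -> "dcba",  w3 -> "acbd"
# i.e. r reverses, s<n> drops the first n chars, w<n> swaps chars 0 and n;
# the result is appended to the stream url as &<sp>=<sig>.
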
def prepare_metadata(metadata):
    meta1 = metadata['videoDetails']
    meta2 = metadata['microformat']['playerMicroformatRenderer']
    cards = metadata['cards']['cardCollectionRenderer']['cards'] \
        if 'cards' in metadata else []
    endsc = metadata['endscreen']['endscreenRenderer']['elements'] \
        if 'endscreen' in metadata else []

    # TODO: wrong on non-4:3 and non-16:9 videos! (e.g. l06PlYNShpQ)
    #aspect_ratio = meta2['embed']['width'] / meta2['embed']['height'], # sometimes absent
    aspect_ratio = meta2['thumbnail']['thumbnails'][0]['width'] / \
        meta2['thumbnail']['thumbnails'][0]['height']

    subtitles = sorted([
        {'url': cc['baseUrl'],
         'code': cc['languageCode'],
         'autogenerated': cc.get('kind') == "asr",
         'name': cc['name']['simpleText']}
        for cc in metadata['captions']['playerCaptionsTracklistRenderer']['captionTracks']
    ], key=lambda cc: cc['autogenerated']) \
        if 'captions' in metadata and \
        'captionTracks' in metadata['captions']['playerCaptionsTracklistRenderer'] \
        else []  # TODO: clean up this expression

    def clean_url(url):
        # external URLs are redirected through youtube.com/redirect, but we
        # may encounter internal URLs, too
        return parse_qs(urlparse(url).query).get('q', [url])[0]

    # Remove left-/rightmost word from string:
    delL = lambda s: s.partition(' ')[2]
    delR = lambda s: s.rpartition(' ')[0]
    # Thousands-separator aware int():
    intT = lambda s: int(s.replace(',', ''))

    def parse_infocard(card):
        card = card['cardRenderer']
        ctype = list(card['content'].keys())[0]
        content = card['content'][ctype]
        if ctype == "pollRenderer":
            ctype = "POLL"
            content = {
                'question': content['question']['simpleText'],
                'answers': [(a['text']['simpleText'], a['numVotes'])
                            for a in content['choices']],
            }
        elif ctype == "videoInfoCardContentRenderer":
            ctype = "VIDEO"
            content = {
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['videoTitle']['simpleText'],
                'author': delL(content['channelName']['simpleText']),
                'length': content['lengthString']['simpleText'],  # '23:03'
                'views': intT(delR(content['viewCountText']['simpleText'])),
            }
        elif ctype == "playlistInfoCardContentRenderer":
            ctype = "PLAYLIST"
            content = {
                'playlist_id': content['action']['watchEndpoint']['playlistId'],
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['playlistTitle']['simpleText'],
                'author': delL(content['channelName']['simpleText']),
                'n_videos': intT(content['playlistVideoCount']['simpleText']),
            }
        elif ctype == "simpleCardContentRenderer" \
                and 'urlEndpoint' in content.get('command', {}):  # TODO: cleanup
            ctype = "WEBSITE"
            content = {
                'url': clean_url(content['command']['urlEndpoint']['url']),
                'domain': content['displayDomain']['simpleText'],
                'title': content['title']['simpleText'],
                # XXX: no thumbnails for infocards
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}

        return {'type': ctype, 'content': content}

    def mkthumbs(thumbs):
        return {e['height']: e['url'] for e in thumbs}

    def parse_endcard(card):
        card = card.get('endscreenElementRenderer', card)  # only sometimes nested
        ctype = card['style']
        if ctype == "CHANNEL":
            content = {
                'channel_id': card['endpoint']['browseEndpoint']['browseId'],
                'title': card['title']['simpleText'],
                'icons': mkthumbs(card['image']['thumbnails']),
            }
        elif ctype == "VIDEO":
            content = {
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'length': card['videoDuration']['simpleText'],  # '12:21'
                'views': delR(card['metadata']['simpleText']),
                # XXX: no channel name
            }
        elif ctype == "PLAYLIST":
            content = {
                'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'author': delL(card['metadata']['simpleText']),
                'n_videos': intT(delR(card['playlistLength']['simpleText'])),
            }
        elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE":
            ctype = "WEBSITE"
            url = clean_url(card['endpoint']['urlEndpoint']['url'])
            content = {
                'url': url,
                'domain': urlparse(url).netloc,  # TODO: remove .domain
                'title': card['title']['simpleText'],
                'icons': mkthumbs(card['image']['thumbnails']),
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}

        return {'type': ctype, 'content': content}

    return {
        'title': meta1['title'],
        'author': meta1['author'],
        'channel_id': meta1['channelId'],
        'description': meta1['shortDescription'],
        'published': meta2['publishDate'],
        'views': meta1['viewCount'],
        'length': int(meta1['lengthSeconds']),
        'rating': meta1['averageRating'],
        'category': meta2['category'],
        'aspectr': aspect_ratio,
        'unlisted': meta2['isUnlisted'],
        'countries': meta2['availableCountries'],
        'poster': meta2['thumbnail']['thumbnails'][0]['url'],
        'infocards': [parse_infocard(card) for card in cards],
        'endcards': [parse_endcard(card) for card in endsc],
        'subtitles': subtitles,
    }

def pp(*args):
    from pprint import pprint
    import sys, codecs
    pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer))