import os
import re
import json
import requests
import requests_cache
import dateutil.parser
from xml.etree import ElementTree
from configparser import ConfigParser
from datetime import datetime, timezone
from urllib.parse import parse_qs, urlparse

cf = ConfigParser()
config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini')
cf.read(config_filename)

# Note: currently expiring after 10 minutes. googlevideo-urls are valid
# for 5h59m, but this makes reddit very stale and premiere videos won't
# start. TODO: expire when video is livestream/premiere/etc.
requests_cache.install_cache(backend='memory', expire_after=10*60, allowable_codes=(200,))

# Note: this should only be required for the 'memory' backed cache.
# TODO: only run for long-running processes, i.e. the frontend
from threading import Timer
def purge_cache(sec):
    requests_cache.remove_expired_responses()
    t = Timer(sec, purge_cache, args=(sec,))
    t.daemon = True  # don't keep the process alive; setDaemon() is deprecated
    t.start()
purge_cache(10*60)

def fetch_xml(feed_type, feed_id):
    r = requests.get(f"https://www.youtube.com/feeds/videos.xml?{feed_type}={feed_id}")
    if not r.ok:
        return None

    return r.text
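
# Illustrative sketch, not part of the original module: fetch_xml takes the
# feed selector and its value; YouTube's Atom feed endpoint accepts e.g.
# channel_id= or playlist_id=. The channel id below is only an example.
def _example_fetch_xml():
    xml = fetch_xml('channel_id', 'UC_x5XG1OV2P6uZZ5FSM9Ttw')
    if xml is None:
        print("feed could not be fetched (HTTP error)")
    else:
        print(xml[:80])  # start of the Atom feed document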

def parse_xml(xmldata):
    ns = {
        'atom': "http://www.w3.org/2005/Atom",
        'yt': "http://www.youtube.com/xml/schemas/2015",
        'media': "http://search.yahoo.com/mrss/"
    }

    feed = ElementTree.fromstring(xmldata)
    title = feed.find('atom:title', ns).text
    # Note: Elements are falsy when they have no children, so test for None:
    author = feed.find('atom:author/atom:name', ns).text \
        if feed.find('atom:author', ns) is not None else None
    videos = []
    for entry in feed.findall('atom:entry', ns):
        videos.append({
            'video_id': entry.find('yt:videoId', ns).text,
            'title': entry.find('atom:title', ns).text,
            'published': entry.find('atom:published', ns).text,
            'channel_id': entry.find('yt:channelId', ns).text,
            'author': entry.find('atom:author/atom:name', ns).text,
            # extra fields for pull_subs/webhook:
            'updated': entry.find('atom:updated', ns).text,
        })

    return title, author, videos
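
# Illustrative sketch, not part of the original module: parse_xml applied to
# a minimal hand-written feed, showing the (title, author, videos) shape.
def _example_parse_xml():
    xml = """<feed xmlns="http://www.w3.org/2005/Atom"
                   xmlns:yt="http://www.youtube.com/xml/schemas/2015">
      <title>Example Channel</title>
      <author><name>Example Author</name></author>
      <entry>
        <yt:videoId>dQw4w9WgXcQ</yt:videoId>
        <yt:channelId>UCexample</yt:channelId>
        <title>Example Video</title>
        <published>2020-01-01T00:00:00+00:00</published>
        <updated>2020-01-01T00:00:30+00:00</updated>
        <author><name>Example Author</name></author>
      </entry>
    </feed>"""
    title, author, videos = parse_xml(xml)
    assert title == "Example Channel" and author == "Example Author"
    assert videos[0]['video_id'] == "dQw4w9WgXcQ"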

def update_channel(db, xmldata):
    if not xmldata: return False

    # Note: websub does not return a global author, hence we take it from
    # the first video
    title, _, videos = parse_xml(xmldata)

    c = db.cursor()
    for i, video in enumerate(videos):
        now = datetime.now(timezone.utc)
        updated = dateutil.parser.parse(video['updated'])
        published = dateutil.parser.parse(video['published'])
        # if updated and published times are near-identical, we assume it's new.
        # (.total_seconds(), since .seconds ignores the days component)
        if (updated - published).total_seconds() < 60 and (now - published).days < 7:
            timestamp = now
        else: # otherwise, it's just an update to an older video.
            timestamp = published

        c.execute("""
            INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled)
            VALUES (?, ?, ?, datetime(?), datetime(?))
        """, (
            video['video_id'],
            video['channel_id'],
            video['title'],
            video['published'],
            timestamp
        ))

        if i == 0: # only required once per feed
            c.execute("""
                INSERT OR REPLACE INTO channels (id, name)
                VALUES (?, ?)
            """, (video['channel_id'], video['author']))
    db.commit()

    return True
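
# Illustrative sketch, not part of the original module: wiring fetch_xml and
# update_channel together against an in-memory SQLite database. The CREATE
# TABLE statements are an assumed minimal schema matching the columns
# referenced above; the app's real schema lives elsewhere.
def _example_update_channel():
    import sqlite3
    db = sqlite3.connect(':memory:')
    db.execute("CREATE TABLE videos (id TEXT PRIMARY KEY, channel_id TEXT,"
               " title TEXT, published TEXT, crawled TEXT)")
    db.execute("CREATE TABLE channels (id TEXT PRIMARY KEY, name TEXT)")
    xml = fetch_xml('channel_id', 'UC_x5XG1OV2P6uZZ5FSM9Ttw')  # example id
    if update_channel(db, xml):
        print(db.execute("SELECT id, title FROM videos").fetchall())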

def get_video_info(video_id, sts=0, algo=""):
    """
    returns: best-quality muxed video stream, player_response, error-type/message
    error types: player, malformed, livestream, geolocked, exhausted
    """
    player_error = None # for 'exhausted'
    for el in ['embedded', 'detailpage']: # sometimes, only one or the other works
        r = requests.get(f"https://www.youtube.com/get_video_info" +
                         f"?video_id={video_id}" +
                         f"&eurl=https://youtube.googleapis.com/v/{video_id}" +
                         f"&el={el}" +
                         f"&sts={sts}" +
                         f"&hl=en_US") #"&hl=en&gl=US"
        params = parse_qs(r.text)
        if 'errorcode' in params: # status=fail
            return None, None, 'malformed', params['reason'][0]

        metadata = json.loads(params.get('player_response')[0])
        playabilityStatus = metadata['playabilityStatus']['status']
        if playabilityStatus != "OK":
            playabilityReason = metadata['playabilityStatus']['reason']
            player_error = f"{playabilityStatus}: {playabilityReason}"
            if playabilityStatus == "UNPLAYABLE":
                continue # try again with next el value (or fail as exhausted)
            # without videoDetails, there's only the error message
            maybe_metadata = metadata if 'videoDetails' in metadata else None
            return None, maybe_metadata, 'player', player_error
        if 'liveStreamability' in metadata['playabilityStatus']:
            # can also check .microformat.liveBroadcastDetails.isLiveNow
            return None, metadata, 'livestream', None

        if 'formats' not in metadata['streamingData']:
            #TODO: hls-only video with those params (kAZCrtJJaAo):
            # "videoDetails": {
            #   "isLiveDefaultBroadcast": true,
            #   "isLowLatencyLiveStream": true,
            #   "isLiveContent": true,
            #   "isPostLiveDvr": true
            continue # no urls

        formats = metadata['streamingData']['formats']
        for (i, v) in enumerate(formats):
            if not ('cipher' in v or 'signatureCipher' in v): continue
            cipher = parse_qs(v.get('cipher') or v.get('signatureCipher'))
            formats[i]['url'] = unscramble(cipher, algo)

        # todo: check if we have urls or try again
        url = sorted(formats, key=lambda k: k['height'], reverse=True)[0]['url']

        if 'gcr' in parse_qs(url):
            return None, metadata, 'geolocked', None

        return url, metadata, None, None
    else: # for-else: both el values were tried without a playable result
        return None, metadata, 'exhausted', player_error
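
# Illustrative sketch, not part of the original module: typical handling of
# get_video_info's four return values. sts/algo relate to the player's
# signature scrambling; the defaults only suffice for unscrambled URLs.
def _example_get_video_info():
    url, metadata, error, errdetail = get_video_info('UxxajLWwzqY')
    if error == 'malformed':
        print("bad request:", errdetail)
    elif error in ('player', 'livestream', 'geolocked', 'exhausted'):
        print("no playable muxed stream:", error, errdetail)
    else:
        print("best muxed stream:", url)
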
def unscramble(cipher, algo): # test video id: UxxajLWwzqY
    signature = list(cipher['s'][0])
    for c in algo.split():
        op, ix = re.match(r"([rsw])(\d+)?", c).groups()
        ix = int(ix) % len(signature) if ix else 0
        if not op: continue
        if op == 'r': signature = list(reversed(signature)) # reverse
        if op == 's': signature = signature[ix:]            # slice off prefix
        if op == 'w': signature[0], signature[ix] = signature[ix], signature[0] # swap
    sp = cipher.get('sp', ['signature'])[0]
    sig = cipher.get('sig', [''.join(signature)])[0]
    return f"{cipher['url'][0]}&{sp}={sig}"
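
# Illustrative sketch, not part of the original module: a worked example of
# the signature descrambler with a made-up algo string. "r" reverses,
# "s2" drops the first two characters, "w3" swaps positions 0 and 3.
def _example_unscramble():
    cipher = {'s': ['ABCDEFGH'], 'url': ['https://example.invalid/videoplayback']}
    # ABCDEFGH -r-> HGFEDCBA -s2-> FEDCBA -w3-> CEDFBA
    assert unscramble(cipher, "r s2 w3") == \
        "https://example.invalid/videoplayback&signature=CEDFBA"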

def prepare_metadata(metadata):
    meta1 = metadata['videoDetails']
    meta2 = metadata['microformat']['playerMicroformatRenderer']
    cards = metadata['cards']['cardCollectionRenderer']['cards'] \
        if 'cards' in metadata else []
    endsc = metadata['endscreen']['endscreenRenderer']['elements'] \
        if 'endscreen' in metadata else []

    # thumbnails are either 4:3 or 16:9
    some_img = meta2['thumbnail']['thumbnails'][0]
    aspect_ratio = some_img['width'] / some_img['height']
    # the actual video streams have exact information:
    if 'streamingData' in metadata:
        sd = metadata['streamingData']
        some_stream = (sd.get('adaptiveFormats', []) + sd.get('formats', []))[0]
        aspect_ratio = some_stream['width'] / some_stream['height']

    subtitles = sorted([
        {'url': cc['baseUrl'],
         'code': cc['languageCode'],
         'autogenerated': cc.get('kind') == "asr",
         'name': cc['name']['simpleText']}
        for cc in metadata.get('captions', {})
            .get('playerCaptionsTracklistRenderer', {})
            .get('captionTracks', [])
    ], key=lambda cc: cc['autogenerated'])  # manually created tracks first

    def clean_url(url):
        # external URLs are redirected through youtube.com/redirect, but we
        # may encounter internal URLs, too
        return parse_qs(urlparse(url).query).get('q', [url])[0]
    # Remove left-/rightmost word from string:
    delL = lambda s: s.partition(' ')[2]
    delR = lambda s: s.rpartition(' ')[0]
    # Thousands-separator-aware int():
    intT = lambda s: int(s.replace(',', ''))
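    # e.g. (illustration, not in the original): delL("a b c") == "b c",
    # delR("1,234 views") == "1,234", intT(delR("1,234 views")) == 1234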

    def parse_infocard(card):
        card = card['cardRenderer']
        ctype = list(card['content'].keys())[0]
        content = card['content'][ctype]
        if ctype == "pollRenderer":
            ctype = "POLL"
            content = {
                'question': content['question']['simpleText'],
                'answers': [(a['text']['simpleText'], a['numVotes'])
                            for a in content['choices']],
            }
        elif ctype == "videoInfoCardContentRenderer":
            ctype = "VIDEO"
            content = {
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['videoTitle']['simpleText'],
                'author': delL(content['channelName']['simpleText']),
                'length': content['lengthString']['simpleText'], # '23:03'
                'views': intT(delR(content['viewCountText']['simpleText'])),
            }
        elif ctype == "playlistInfoCardContentRenderer":
            ctype = "PLAYLIST"
            content = {
                'playlist_id': content['action']['watchEndpoint']['playlistId'],
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['playlistTitle']['simpleText'],
                'author': delL(content['channelName']['simpleText']),
                'n_videos': intT(content['playlistVideoCount']['simpleText']),
            }
        elif ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content['command']:
            ctype = "WEBSITE"
            content = {
                'url': clean_url(content['command']['urlEndpoint']['url']),
                'domain': content['displayDomain']['simpleText'],
                'title': content['title']['simpleText'],
                # XXX: no thumbnails for infocards
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}

        return {'type': ctype, 'content': content}

    def mkthumbs(thumbs):
        return {e['height']: e['url'] for e in thumbs}
    def parse_endcard(card):
        card = card.get('endscreenElementRenderer', card) # only sometimes nested
        ctype = card['style']
        if ctype == "CHANNEL":
            content = {
                'channel_id': card['endpoint']['browseEndpoint']['browseId'],
                'title': card['title']['simpleText'],
                'icons': mkthumbs(card['image']['thumbnails']),
            }
        elif ctype == "VIDEO":
            content = {
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'length': card['videoDuration']['simpleText'], # '12:21'
                'views': delR(card['metadata']['simpleText']),
                # XXX: no channel name
            }
        elif ctype == "PLAYLIST":
            content = {
                'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'author': delL(card['metadata']['simpleText']),
                'n_videos': intT(delR(card['playlistLength']['simpleText'])),
            }
        elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE":
            ctype = "WEBSITE"
            url = clean_url(card['endpoint']['urlEndpoint']['url'])
            content = {
                'url': url,
                'domain': urlparse(url).netloc,
                'title': card['title']['simpleText'],
                'icons': mkthumbs(card['image']['thumbnails']),
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}

        return {'type': ctype, 'content': content}

    infocards = [parse_infocard(card) for card in cards]
    endcards = [parse_endcard(card) for card in endsc]
    # combine cards to weed out duplicates. for videos and playlists prefer
    # infocards, for channels and websites prefer endcards, as those carry
    # more information than their counterparts.
    ident = { # ctype -> ident
        'VIDEO': 'video_id',
        'PLAYLIST': 'playlist_id',
        'CHANNEL': 'channel_id',
        'WEBSITE': 'url',
        'POLL': 'question',
    }
    getident = lambda c: c['content'][ident.get(c['type'])]
    mkexclude = lambda cards, types: [getident(c) for c in cards if c['type'] in types]
    exclude = lambda cards, without: [c for c in cards if getident(c) not in without]

    allcards = exclude(infocards, mkexclude(endcards, ['CHANNEL','WEBSITE'])) + \
               exclude(endcards, mkexclude(infocards, ['VIDEO','PLAYLIST']))
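    # e.g. (illustration, not in the original): a video teased in both an
    # infocard and an endcard appears once in allcards, keeping the infocard
    # variant (which also names the author); a website present in both keeps
    # the endcard variant (which has icons; infocards lack thumbnails).
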
    all_countries = """AD AE AF AG AI AL AM AO AQ AR AS AT AU AW AX AZ BA BB BD
    BE BF BG BH BI BJ BL BM BN BO BQ BR BS BT BV BW BY BZ CA CC CD CF CG CH
    CI CK CL CM CN CO CR CU CV CW CX CY CZ DE DJ DK DM DO DZ EC EE EG EH ER
    ES ET FI FJ FK FM FO FR GA GB GD GE GF GG GH GI GL GM GN GP GQ GR GS GT
    GU GW GY HK HM HN HR HT HU ID IE IL IM IN IO IQ IR IS IT JE JM JO JP KE
    KG KH KI KM KN KP KR KW KY KZ LA LB LC LI LK LR LS LT LU LV LY MA MC MD
    ME MF MG MH MK ML MM MN MO MP MQ MR MS MT MU MV MW MX MY MZ NA NC NE NF
    NG NI NL NO NP NR NU NZ OM PA PE PF PG PH PK PL PM PN PR PS PT PW PY QA
    RE RO RS RU RW SA SB SC SD SE SG SH SI SJ SK SL SM SN SO SR SS ST SV SX
    SY SZ TC TD TF TG TH TJ TK TL TM TN TO TR TT TV TW TZ UA UG UM US UY UZ
    VA VC VE VG VI VN VU WF WS YE YT ZA ZM ZW""".split()
    whitelisted = sorted(meta2.get('availableCountries', [])) # may be absent
    blacklisted = sorted(set(all_countries) - set(whitelisted))

    return {
        'title': meta1['title'],
        'author': meta1['author'],
        'channel_id': meta1['channelId'],
        'description': meta1['shortDescription'],
        'published': meta2['publishDate'],
        'views': meta1['viewCount'],
        'length': int(meta1['lengthSeconds']),
        'rating': meta1['averageRating'],
        'category': meta2['category'],
        'aspectr': aspect_ratio,
        'unlisted': meta2['isUnlisted'],
        'countries': whitelisted,
        'blacklisted': blacklisted,
        'poster': meta2['thumbnail']['thumbnails'][0]['url'],
        'infocards': infocards,
        'endcards': endcards,
        'all_cards': allcards,
        'subtitles': subtitles,
    }

def pp(*args):
    # debugging helper: pretty-print to stderr, forcing utf-8 output
    from pprint import pprint
    import sys, codecs
    pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer))