import os
import re
import json
import requests
import requests_cache
import dateutil.parser
from xml.etree import ElementTree
from configparser import ConfigParser
from datetime import datetime, timezone
from urllib.parse import parse_qs, urlparse

cf = ConfigParser()
config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini')
cf.read(config_filename)

# Note: currently expiring after 10 minutes. googlevideo-urls are valid for
# 5h59m, but a longer expiry makes reddit very stale and premiere videos
# won't start. TODO: expire when video is livestream/premiere/etc.
requests_cache.install_cache(backend='memory', expire_after=10*60, allowable_codes=(200,))

# Note: this should only be required for the 'memory' backed cache.
# TODO: only run for long-running processes, i.e. the frontend
from threading import Timer
def purge_cache(sec):
    requests_cache.remove_expired_responses()
    t = Timer(sec, purge_cache, args=(sec,))
    t.daemon = True # setDaemon() is deprecated; use the attribute instead
    t.start()
purge_cache(10*60)

def fetch_xml(feed_type, feed_id):
    r = requests.get(f"https://www.youtube.com/feeds/videos.xml?{feed_type}={feed_id}")
    if not r.ok:
        return None

    return r.text

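# Hypothetical usage sketch: feed_type is the query parameter that videos.xml
# expects, e.g. 'channel_id', 'playlist_id' or 'user' (the id below is a
# made-up placeholder):
#   xmldata = fetch_xml('channel_id', 'UCxxxxxxxxxxxxxxxxxxxxxx')
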
def parse_xml(xmldata):
    ns = {
        'atom': "http://www.w3.org/2005/Atom",
        'yt': "http://www.youtube.com/xml/schemas/2015",
        'media': "http://search.yahoo.com/mrss/"
    }

    feed = ElementTree.fromstring(xmldata)
    title = feed.find('atom:title', ns).text
    # Note: Element truthiness is deprecated; test against None explicitly.
    author = feed.find('atom:author/atom:name', ns).text \
        if feed.find('atom:author', ns) is not None else None
    videos = []
    for entry in feed.findall('atom:entry', ns):
        videos.append({
            'video_id': entry.find('yt:videoId', ns).text,
            'title': entry.find('atom:title', ns).text,
            'published': entry.find('atom:published', ns).text,
            'channel_id': entry.find('yt:channelId', ns).text,
            'author': entry.find('atom:author/atom:name', ns).text,
            # extra fields for pull_subs/webhook:
            'updated': entry.find('atom:updated', ns).text,
        })

    return title, author, videos

def update_channel(db, xmldata):
    if not xmldata: return False

    # Note: websub does not return the global author, hence taking it from
    # the first video
    title, _, videos = parse_xml(xmldata)

    c = db.cursor()
    for i, video in enumerate(videos):
        now = datetime.now(timezone.utc)
        updated = dateutil.parser.parse(video['updated'])
        published = dateutil.parser.parse(video['published'])
        # If the updated and published times are near-identical, we assume
        # the video is new; otherwise it's just an update to an older video.
        # (total_seconds() is required here: .seconds ignores whole days.)
        if (updated - published).total_seconds() < 60 and (now - published).days < 7:
            timestamp = now
        else:
            timestamp = published

        c.execute("""
            INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled)
            VALUES (?, ?, ?, datetime(?), datetime(?))
        """, (
            video['video_id'],
            video['channel_id'],
            video['title'],
            video['published'],
            timestamp
        ))

        if i == 0: # only required once per feed
            c.execute("""
                INSERT OR REPLACE INTO channels (id, name)
                VALUES (?, ?)
            """, (video['channel_id'], video['author']))
    db.commit()

    return True

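# Hypothetical end-to-end sketch (db is assumed to be an sqlite3 connection
# with the videos/channels schema used above):
#   xmldata = fetch_xml('channel_id', 'UCxxxxxxxxxxxxxxxxxxxxxx')
#   if xmldata is not None:
#       update_channel(db, xmldata)
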
def get_video_info(video_id, sts=0, algo=""):
    """
    returns: best-quality muxed video stream, player_response, error-type/message
    error types: player, malformed, livestream, geolocked, exhausted
    """
    player_error = None # for 'exhausted'
    for el in ['embedded', 'detailpage']: # sometimes, only one or the other works
        r = requests.get("https://www.youtube.com/get_video_info"
                         f"?video_id={video_id}"
                         f"&eurl=https://youtube.googleapis.com/v/{video_id}"
                         f"&el={el}"
                         f"&sts={sts}"
                         "&hl=en_US") #"&hl=en&gl=US"
        params = parse_qs(r.text)
        if 'errorcode' in params: # status=fail
            return None, None, 'malformed', params['reason'][0]

        metadata = json.loads(params.get('player_response')[0])
        playabilityStatus = metadata['playabilityStatus']['status']
        if playabilityStatus != "OK":
            playabilityReason = metadata['playabilityStatus']['reason']
            player_error = f"{playabilityStatus}: {playabilityReason}"
            if playabilityStatus == "UNPLAYABLE":
                continue # try again with next el value (or fail as exhausted)
            # without videoDetails, there's only the error message
            maybe_metadata = metadata if 'videoDetails' in metadata else None
            return None, maybe_metadata, 'player', player_error
        if metadata['videoDetails']['isLiveContent']:
            return None, metadata, 'livestream', None

        if 'formats' not in metadata['streamingData']:
            continue # no urls

        formats = metadata['streamingData']['formats']
        for i, v in enumerate(formats):
            if not ('cipher' in v or 'signatureCipher' in v): continue
            cipher = parse_qs(v.get('cipher') or v.get('signatureCipher'))
            formats[i]['url'] = unscramble(cipher, algo)

        # todo: check if we have urls or try again
        url = sorted(formats, key=lambda k: k['height'], reverse=True)[0]['url']

        if 'gcr' in parse_qs(url):
            return None, metadata, 'geolocked', None

        return url, metadata, None, None
    else: # both 'el' values were exhausted without a playable stream
        return None, metadata, 'exhausted', player_error

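# Hypothetical caller sketch of the four-tuple contract above:
#   url, metadata, error, errdetails = get_video_info('UxxajLWwzqY')
#   if error is None:
#       ...  # redirect/play from url
#   elif error == 'livestream':
#       ...  # fall back to a livestream player
#   else:
#       ...  # show errdetails
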
def unscramble(cipher, algo): # test video id: UxxajLWwzqY
    signature = list(cipher['s'][0])
    for c in algo.split():
        m = re.match(r"([rsw])(\d+)?", c)
        if not m: continue # skip tokens that aren't r/s/w operations
        op, ix = m.groups()
        ix = int(ix) % len(signature) if ix else 0
        if op == 'r': signature = list(reversed(signature))
        if op == 's': signature = signature[ix:]
        if op == 'w': signature[0], signature[ix] = signature[ix], signature[0]
    sp = cipher.get('sp', ['signature'])[0]
    sig = cipher.get('sig', [''.join(signature)])[0]
    return f"{cipher['url'][0]}&{sp}={sig}"
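# Worked example (made-up cipher) of the r/s/w mini-language above:
#   unscramble({'s': ['abcdef'], 'url': ['https://example.invalid/v']}, "r s2 w3")
#     r  -> 'fedcba'  (reverse)
#     s2 -> 'dcba'    (drop the first two characters)
#     w3 -> 'acbd'    (swap positions 0 and 3)
#   returns 'https://example.invalid/v&signature=acbd'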

def prepare_metadata(metadata):
    meta1 = metadata['videoDetails']
    meta2 = metadata['microformat']['playerMicroformatRenderer']
    cards = metadata['cards']['cardCollectionRenderer']['cards'] \
        if 'cards' in metadata else []
    endsc = metadata['endscreen']['endscreenRenderer']['elements'] \
        if 'endscreen' in metadata else []

    # the actual video streams have exact information; if they are
    # unavailable (e.g. on livestreams), fall back to the thumbnails
    # (only either 4:3 or 16:9).
    try:
        sd = metadata['streamingData']
        some_stream = (sd.get('adaptiveFormats',[]) + sd.get('formats',[]))[0]
        aspect_ratio = some_stream['width'] / some_stream['height']
    except (KeyError, IndexError):
        some_img = meta2['thumbnail']['thumbnails'][0]
        aspect_ratio = some_img['width'] / some_img['height']

    subtitles = sorted([
        {'url': cc['baseUrl'],
         'code': cc['languageCode'],
         'autogenerated': cc.get('kind') == "asr",
         'name': cc['name']['simpleText']}
        for cc in metadata.get('captions',{})
            .get('playerCaptionsTracklistRenderer',{})
            .get('captionTracks',[])
    ], key=lambda cc: cc['autogenerated'])

    def clean_url(url):
        # external URLs are redirected through youtube.com/redirect, but we
        # may encounter internal URLs, too
        return parse_qs(urlparse(url).query).get('q',[url])[0]
    # Remove left-/rightmost word from string:
    delL = lambda s: s.partition(' ')[2]
    delR = lambda s: s.rpartition(' ')[0]
    # Thousands-separator aware int():
    intT = lambda s: int(s.replace(',', ''))
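    # Worked examples (hypothetical inputs) for the helpers above:
    #   clean_url('https://www.youtube.com/redirect?q=https%3A%2F%2Fexample.com')
    #     -> 'https://example.com'
    #   delL('by Example Channel') -> 'Example Channel'
    #   delR('1,234 views')        -> '1,234'
    #   intT('1,234')              -> 1234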

    def parse_infocard(card):
        card = card['cardRenderer']
        ctype = list(card['content'].keys())[0]
        content = card['content'][ctype]
        if ctype == "pollRenderer":
            ctype = "POLL"
            content = {
                'question': content['question']['simpleText'],
                'answers': [(a['text']['simpleText'], a['numVotes'])
                            for a in content['choices']],
            }
        elif ctype == "videoInfoCardContentRenderer":
            ctype = "VIDEO"
            # if the card references a live stream, it has no length, but a
            # "LIVE NOW" badge. TODO: this is ugly; cleanup.
            is_live = content.get('badge',{}).get('liveBadgeRenderer',{})
            length = is_live.get('label',{}).get('simpleText') \
                or content['lengthString']['simpleText'] # '23:03'
            content = {
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['videoTitle']['simpleText'],
                'author': delL(content['channelName']['simpleText']),
                'length': length,
                'views': intT(delR(content['viewCountText']['simpleText'])),
            }
        elif ctype == "playlistInfoCardContentRenderer":
            ctype = "PLAYLIST"
            content = {
                'playlist_id': content['action']['watchEndpoint']['playlistId'],
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['playlistTitle']['simpleText'],
                'author': delL(content['channelName']['simpleText']),
                'n_videos': intT(content['playlistVideoCount']['simpleText']),
            }
        elif ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content['command']:
            ctype = "WEBSITE"
            content = {
                'url': clean_url(content['command']['urlEndpoint']['url']),
                'domain': content['displayDomain']['simpleText'],
                'title': content['title']['simpleText'],
                # XXX: no thumbnails for infocards
            }
        elif ctype == "collaboratorInfoCardContentRenderer":
            ctype = "CHANNEL"
            content = {
                'channel_id': content['endpoint']['browseEndpoint']['browseId'],
                'title': content['channelName']['simpleText'],
                'icons': mkthumbs(content['channelAvatar']['thumbnails']),
                'subscribers': content['subscriberCountText']['simpleText'], # "545K subscribers"
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}

        return {'type': ctype, 'content': content}

    def mkthumbs(thumbs):
        return {e['height']: e['url'] for e in thumbs}

    def parse_endcard(card):
        card = card.get('endscreenElementRenderer', card) # only sometimes nested
        ctype = card['style']
        if ctype == "CHANNEL":
            content = {
                'channel_id': card['endpoint']['browseEndpoint']['browseId'],
                'title': card['title']['simpleText'],
                'icons': mkthumbs(card['image']['thumbnails']),
            }
        elif ctype == "VIDEO":
            content = {
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'length': card['videoDuration']['simpleText'], # '12:21'
                'views': delR(card['metadata']['simpleText']),
                # XXX: no channel name
            }
        elif ctype == "PLAYLIST":
            content = {
                'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'author': delL(card['metadata']['simpleText']),
                'n_videos': intT(delR(card['playlistLength']['simpleText'])),
            }
        elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE":
            ctype = "WEBSITE"
            url = clean_url(card['endpoint']['urlEndpoint']['url'])
            content = {
                'url': url,
                'domain': urlparse(url).netloc,
                'title': card['title']['simpleText'],
                'icons': mkthumbs(card['image']['thumbnails']),
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}

        return {'type': ctype, 'content': content}

    infocards = [parse_infocard(card) for card in cards]
    endcards = [parse_endcard(card) for card in endsc]
    # Combine cards to weed out duplicates. For videos and playlists prefer
    # infocards; for channels and websites prefer endcards, as those carry
    # more information than their counterparts.
    # If the card type is not in ident, we use the whole card for comparison
    # (otherwise they'd all replace each other).
    ident = { # ctype -> ident
        'VIDEO': 'video_id',
        'PLAYLIST': 'playlist_id',
        'CHANNEL': 'channel_id',
        'WEBSITE': 'url',
        'POLL': 'question',
    }
    getident = lambda c: c['content'].get(ident.get(c['type']), c)
    mkexclude = lambda cards, types: [getident(c) for c in cards if c['type'] in types]
    exclude = lambda cards, without: [c for c in cards if getident(c) not in without]

    allcards = exclude(infocards, mkexclude(endcards, ['CHANNEL','WEBSITE'])) + \
               exclude(endcards, mkexclude(infocards, ['VIDEO','PLAYLIST']))
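    # Illustration of the dedup rule with two hypothetical cards describing
    # the same video:
    #   infocards = [{'type': 'VIDEO', 'content': {'video_id': 'xyz'}}]
    #   endcards  = [{'type': 'VIDEO', 'content': {'video_id': 'xyz'}},
    #                {'type': 'CHANNEL', 'content': {'channel_id': 'UC1'}}]
    #   -> allcards keeps the VIDEO infocard and drops the duplicate VIDEO
    #      endcard; the CHANNEL endcard survives untouched.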

    all_countries = """AD AE AF AG AI AL AM AO AQ AR AS AT AU AW AX AZ BA BB BD
        BE BF BG BH BI BJ BL BM BN BO BQ BR BS BT BV BW BY BZ CA CC CD CF CG CH
        CI CK CL CM CN CO CR CU CV CW CX CY CZ DE DJ DK DM DO DZ EC EE EG EH ER
        ES ET FI FJ FK FM FO FR GA GB GD GE GF GG GH GI GL GM GN GP GQ GR GS GT
        GU GW GY HK HM HN HR HT HU ID IE IL IM IN IO IQ IR IS IT JE JM JO JP KE
        KG KH KI KM KN KP KR KW KY KZ LA LB LC LI LK LR LS LT LU LV LY MA MC MD
        ME MF MG MH MK ML MM MN MO MP MQ MR MS MT MU MV MW MX MY MZ NA NC NE NF
        NG NI NL NO NP NR NU NZ OM PA PE PF PG PH PK PL PM PN PR PS PT PW PY QA
        RE RO RS RU RW SA SB SC SD SE SG SH SI SJ SK SL SM SN SO SR SS ST SV SX
        SY SZ TC TD TF TG TH TJ TK TL TM TN TO TR TT TV TW TZ UA UG UM US UY UZ
        VA VC VE VG VI VN VU WF WS YE YT ZA ZM ZW""".split()
    whitelisted = sorted(meta2['availableCountries'])
    blacklisted = sorted(set(all_countries) - set(whitelisted))

    return {
        'title': meta1['title'],
        'author': meta1['author'],
        'channel_id': meta1['channelId'],
        'description': meta1['shortDescription'],
        'published': meta2['publishDate'],
        'views': meta1['viewCount'],
        'length': int(meta1['lengthSeconds']),
        'rating': meta1['averageRating'],
        'category': meta2['category'],
        'aspectr': aspect_ratio,
        'unlisted': meta2['isUnlisted'],
        'countries': whitelisted,
        'blacklisted': blacklisted,
        'poster': meta2['thumbnail']['thumbnails'][0]['url'],
        'infocards': infocards,
        'endcards': endcards,
        'all_cards': allcards,
        'subtitles': subtitles,
    }

def pp(*args):
    from pprint import pprint
    import sys, codecs
    pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer))