import os
import re
import json
import html
import base64
import sqlite3
import requests
import hmac, hashlib
import requests_cache
import dateutil.parser
from xml.etree import ElementTree
from configparser import ConfigParser
from datetime import datetime, timezone
from urllib.parse import parse_qs, urlparse

cf = ConfigParser()
config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini')
cf.read(config_filename)
if 'global' not in cf: # todo: full config check
    raise Exception("Configuration file not found or empty")

# Note: currently expiring after 10 minutes. googlevideo-urls are valid for 5h59m, but this makes reddit very stale and premiere videos won't start. TODO: expire when video is livestream/premiere/etc
requests_cache.install_cache(backend='memory', expire_after=10*60, allowable_codes=(200,))

# Note: this should only be required for the 'memory' backed cache.
# TODO: only run for long-running processes, i.e. the frontend
from threading import Timer
def purge_cache(sec):
    requests_cache.remove_expired_responses()
    t = Timer(sec, purge_cache, args=(sec,))
    t.daemon = True
    t.start()
purge_cache(10*60)

# for debugging purposes, monkey patch the requests session to store each requests-request in a flask-request's g object (url and response). we can then use a flask error_handler to include the request data in the error log.
# since we also call config from outside the flask appcontext, it is wrapped in a try/except block.
from flask import g
import requests
from requests import Session as OriginalSession
class _NSASession(OriginalSession):
    def request(self, method, url, params=None, data=None, **kwargs):
        response = super(_NSASession, self).request(
            method, url, params, data, **kwargs
        )
        try:
            if 'api_requests' not in g:
                g.api_requests = []
            g.api_requests.append((url, params, response.text))
        except RuntimeError: pass # not within flask (e.g. utils.py)
        return response
requests.Session = requests.sessions.Session = _NSASession

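# A minimal sketch of the error_handler mentioned above (assuming a flask `app`
# object exists in the frontend module; names below are illustrative only):
#
#   @app.errorhandler(Exception)
#   def log_with_api_requests(e):
#       # g.api_requests is filled by _NSASession above (absent outside a request)
#       for url, params, body in g.get('api_requests', []):
#           app.logger.error(f"upstream request: {url} {params}\n{body[:500]}")
#       return "internal server error", 500
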
def fetch_xml(feed_type, feed_id):
    # TODO: handle requests.exceptions.ConnectionError
    r = requests.get("https://www.youtube.com/feeds/videos.xml", {
        feed_type: feed_id,
    })
    if not r.ok:
        return None

    return r.content

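# Example (illustrative): the Atom feed accepts e.g. 'channel_id' or 'playlist_id'
# as feed_type, so a channel pull looks roughly like
#   xmldata = fetch_xml('channel_id', 'UC...')  # returns bytes, or None on error
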
def parse_xml(xmldata):
    ns = {
        'atom':"http://www.w3.org/2005/Atom",
        'yt': "http://www.youtube.com/xml/schemas/2015",
        'media':"http://search.yahoo.com/mrss/",
        'at': "http://purl.org/atompub/tombstones/1.0",
    }

    feed = ElementTree.fromstring(xmldata)

    if feed.find('at:deleted-entry',ns) is not None:
        (_,_,vid) = feed.find('at:deleted-entry',ns).get('ref').rpartition(':')
        return None, None, [{'deleted': True, 'video_id': vid}]

    title = feed.find('atom:title',ns).text
    author = feed.find('atom:author/atom:name',ns).text \
        if feed.find('atom:author',ns) is not None else None
    videos = []
    for entry in feed.findall('atom:entry',ns):
        videos.append({
            'video_id': entry.find('yt:videoId',ns).text,
            'title': entry.find('atom:title',ns).text,
            'published': entry.find('atom:published',ns).text,
            'channel_id': entry.find('yt:channelId',ns).text,
            'author': entry.find('atom:author',ns).find('atom:name',ns).text,
            # extra fields for pull_subs/webhook:
            'updated': entry.find('atom:updated',ns).text,
        })

    return title, author, videos

def update_channel(db, xmldata, from_webhook=False):
    if not xmldata: return False

    # Note: websub does not return the global author, hence taking it from the first video
    _, _, videos = parse_xml(xmldata)

    c = db.cursor()
    from flask import current_app # XXX: remove
    for i, video in enumerate(videos):
        if video.get('deleted'):
            if from_webhook: current_app.logger.warning(f"ignoring deleted video {video['video_id']}") # XXX: remove
            # TODO: enable once we enforce hmac validation:
            #c.execute("DELETE FROM videos WHERE id = ?", (video['video_id'],))
            break

        now = datetime.now(timezone.utc)
        updated = dateutil.parser.parse(video['updated'])
        published = dateutil.parser.parse(video['published'])
        # if the updated and published times are near-identical, we assume it's new.
        # checking that it was posted this week is necessary during xmlfeed pulling.
        if (updated - published).total_seconds() < 60 and (now - published).days < 7:
            timestamp = now
            if from_webhook: current_app.logger.warning(f"fresh video {video['video_id']}") # XXX: remove
        else: # otherwise, it might just be an update to an older video, or a previously unlisted one.
            # first, assume it's an older video (correct when pulling xmlfeeds)
            timestamp = published
            # then, check if we don't yet know about it and, if so, look up the real date.

            # The 'published' timestamp sent in websub POSTs is often wrong (e.g.:
            # the video gets uploaded as unlisted on day A and set to public on day B;
            # the webhook is sent on day B, but 'published' says A. The video
            # therefore looks like it's just an update to an older video). If
            # that's the case, we fetch get_video_info and double-check.
            # We only need to do this for videos that are not yet in the database.
            c.execute("SELECT 1 from videos where id = ?", (video['video_id'],))
            new_video = len(c.fetchall()) < 1
            if from_webhook: current_app.logger.warning(f"video {video['video_id']}") # XXX: remove
            if from_webhook and new_video:
                if from_webhook: current_app.logger.warning(f" is webhook and new") # XXX: remove
                _, meta, _, _ = get_video_info(video['video_id'])
                if meta:
                    meta = prepare_metadata(meta)
                    published = dateutil.parser.parse(meta['published'])
                    if from_webhook: current_app.logger.warning(f" uploaded {published}") # XXX: remove
                    if (now - published).days < 7:
                        timestamp = now
                    else: # otherwise, it's just an update to an older video.
                        timestamp = published

        c.execute("""
            INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled)
            VALUES (?, ?, ?, datetime(?), datetime(?))
        """, (
            video['video_id'],
            video['channel_id'],
            video['title'],
            video['published'],
            timestamp
        ))

        if i == 0: # only required once per feed
            c.execute("""
                INSERT OR REPLACE INTO channels (id, name)
                VALUES (?, ?)
            """, (video['channel_id'], video['author']))
        db.commit()

    return True

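# Example (illustrative) pull flow, assuming an open sqlite3 connection `db` and a
# channel id taken from the subscriptions table:
#   update_channel(db, fetch_xml('channel_id', 'UC...'))
# A websub/webhook handler would call the same function with from_webhook=True.
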
def get_video_info(video_id, sts=0, algo=""):
    """
    returns: best-quality muxed video stream url, player_response, error-type, error message
    error types: player, malformed, livestream, geolocked, exhausted
    """
    player_error = None # for 'exhausted'
    for el in ['embedded', 'detailpage']: # sometimes, only one or the other works
        r = requests.get("https://www.youtube.com/get_video_info", {
            "video_id": video_id,
            "eurl": f"https://youtube.googleapis.com/v/{video_id}",
            "el": el,
            "sts": sts,
            "hl": "en_US",
        })
        params = parse_qs(r.text)
        if 'errorcode' in params: # status=fail
            return None, None, 'malformed', params['reason'][0]

        metadata = json.loads(params.get('player_response')[0])
        playabilityStatus = metadata['playabilityStatus']['status']
        if playabilityStatus != "OK":
            playabilityReason = metadata['playabilityStatus'].get('reason',
                '//'.join(metadata['playabilityStatus'].get('messages',[])))
            player_error = f"{playabilityStatus}: {playabilityReason}"
            if playabilityStatus == "UNPLAYABLE":
                continue # try again with next el value (or fail as exhausted)
            # without videoDetails, there's only the error message
            maybe_metadata = metadata if 'videoDetails' in metadata else None
            return None, maybe_metadata, 'player', player_error
        if metadata['videoDetails']['isLiveContent'] and \
                (metadata['videoDetails'].get('isLive', False) or \
                metadata['videoDetails'].get('isPostLiveDvr', False)):
            return None, metadata, 'livestream', None

        if 'formats' not in metadata['streamingData']:
            continue # no urls

        formats = metadata['streamingData']['formats']
        for (i,v) in enumerate(formats):
            if not ('cipher' in v or 'signatureCipher' in v): continue
            cipher = parse_qs(v.get('cipher') or v.get('signatureCipher'))
            formats[i]['url'] = unscramble(cipher, algo)

        # todo: check if we have urls or try again
        url = sorted(formats, key=lambda k: k['height'], reverse=True)[0]['url']

        if 'gcr' in parse_qs(url):
            return None, metadata, 'geolocked', None

        return url, metadata, None, None
    else:
        return None, metadata, 'exhausted', player_error

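# A minimal sketch of how the 4-tuple is typically consumed (illustrative only;
# sts/algo come from the player JS when signature scrambling is in use):
#   url, metadata, error, errdetails = get_video_info('UxxajLWwzqY')
#   if error in (None, 'livestream', 'geolocked'):
#       meta = prepare_metadata(metadata)
#   else:
#       ...  # 'malformed'/'player'/'exhausted': surface errdetails to the user
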
def unscramble(cipher, algo): # test video id: UxxajLWwzqY
    signature = list(cipher['s'][0])
    for c in algo.split():
        op, ix = re.match(r"([rsw])(\d+)?", c).groups()
        ix = int(ix) % len(signature) if ix else 0
        if not op: continue
        if op == 'r': signature = list(reversed(signature))
        if op == 's': signature = signature[ix:]
        if op == 'w': signature[0], signature[ix] = signature[ix], signature[0]
    sp = cipher.get('sp', ['signature'])[0]
    sig = cipher.get('sig', [''.join(signature)])[0]
    return f"{cipher['url'][0]}&{sp}={sig}"
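# Worked example (illustrative algo string, not a real player version):
# with cipher['s'][0] == "ABCDEFG" and algo == "w3 r s2",
#   w3 swaps positions 0 and 3  -> "DBCAEFG"
#   r  reverses the list        -> "GFEACBD"
#   s2 drops the first 2 chars  -> "EACBD"
# so "EACBD" is appended as the signature parameter.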

def prepare_metadata(metadata):
    meta1 = metadata['videoDetails']
    meta2 = metadata['microformat']['playerMicroformatRenderer']
    cards = metadata['cards']['cardCollectionRenderer']['cards'] \
        if 'cards' in metadata else []
    endsc = metadata['endscreen']['endscreenRenderer']['elements'] \
        if 'endscreen' in metadata else []

    # the actual video streams have exact information:
    try:
        sd = metadata['streamingData']
        some_stream = (sd.get('adaptiveFormats',[]) + sd.get('formats',[]))[0]
        aspect_ratio = some_stream['width'] / some_stream['height']
    # if that's unavailable (e.g. on livestreams), fall back to
    # thumbnails (only either 4:3 or 16:9).
    except:
        some_img = meta2['thumbnail']['thumbnails'][0]
        aspect_ratio = some_img['width'] / some_img['height']

    # Note: we could get subtitles in multiple formats directly by querying
    # https://video.google.com/timedtext?hl=en&type=list&v=<VIDEO_ID> followed by
    # https://www.youtube.com/api/timedtext?lang=<LANG_CODE>&v=<VIDEO_ID>&fmt={srv1|srv2|srv3|ttml|vtt},
    # but that won't give us autogenerated subtitles (and is an extra request).
    # we can still add &fmt= to the extracted URLs below (first one takes precedence).
    try: # find the native language captions (assuming there is only 1 audioTrack) (any level might not exist):
        default_track = metadata.get('captions',{}).get('playerCaptionsTracklistRenderer',{}).get('defaultAudioTrackIndex', 0)
        main_subtitle = metadata['captions']['playerCaptionsTracklistRenderer']['audioTracks'][default_track]['defaultCaptionTrackIndex']
    except:
        main_subtitle = -1
    subtitles = sorted([
        {'url':cc['baseUrl'],
         'code':cc['languageCode'],
         'autogenerated':cc.get('kind')=="asr",
         'name':cc['name']['simpleText'],
         'default':i==main_subtitle,
         'query':"fmt=vtt&"+urlparse(cc['baseUrl']).query} # for our internal proxy
        for i,cc in enumerate(metadata.get('captions',{})
            .get('playerCaptionsTracklistRenderer',{})
            .get('captionTracks',[]))
        # sort order: default lang gets weight 0 (first), other manually translated weight 1, autogenerated weight 2:
    ], key=lambda cc: (not cc['default']) + cc['autogenerated'])
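    # Ordering example (illustrative tracks): a default 'en' track, a manually
    # translated 'de' track and an autogenerated 'en' (asr) track get sort keys
    # 0, 1 and 2 respectively, so the default track ends up first, manual
    # translations next, autogenerated captions last.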

    def clean_url(url):
        # external URLs are redirected through youtube.com/redirect, but we
        # may encounter internal URLs, too
        return parse_qs(urlparse(url).query).get('q',[url])[0]
    # Remove left-/rightmost word from string:
    delL = lambda s: s.partition(' ')[2]
    delR = lambda s: s.rpartition(' ')[0]
    # Thousands separator aware int():
    intT = lambda s: int(s.replace(',', ''))

    def parse_infocard(card):
        card = card['cardRenderer']
        ctype = list(card['content'].keys())[0]
        content = card['content'][ctype]
        if ctype == "pollRenderer":
            ctype = "POLL"
            content = {
                'question': content['question']['simpleText'],
                'answers': [(a['text']['simpleText'],a['numVotes']) \
                    for a in content['choices']],
            }
        elif ctype == "videoInfoCardContentRenderer":
            ctype = "VIDEO"
            # if the card references a live stream, it has no length, but a "LIVE NOW" badge.
            # TODO: this is ugly; cleanup.
            is_live = content.get('badge',{}).get('liveBadgeRenderer',{})
            length = is_live.get('label',{}).get('simpleText') or content['lengthString']['simpleText'] # '23:03'
            content = {
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['videoTitle']['simpleText'],
                'author': delL(content['channelName']['simpleText']),
                'length': length,
                'views': intT(delR(content['viewCountText']['simpleText'])),
            }
        elif ctype == "playlistInfoCardContentRenderer":
            ctype = "PLAYLIST"
            content = {
                'playlist_id': content['action']['watchEndpoint']['playlistId'],
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['playlistTitle']['simpleText'],
                'author': delL(content['channelName']['simpleText']),
                'n_videos': intT(content['playlistVideoCount']['simpleText']),
            }
        elif ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content['command']:
            ctype = "WEBSITE"
            content = {
                'url': clean_url(content['command']['urlEndpoint']['url']),
                'domain': content['displayDomain']['simpleText'],
                'title': content['title']['simpleText'],
                # XXX: no thumbnails for infocards
            }
        elif ctype == "collaboratorInfoCardContentRenderer":
            ctype = "CHANNEL"
            content = {
                'channel_id': content['endpoint']['browseEndpoint']['browseId'],
                'title': content['channelName']['simpleText'],
                'icons': mkthumbs(content['channelAvatar']['thumbnails']),
                'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers"
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}

        return {'type': ctype, 'content': content}

    def mkthumbs(thumbs):
        return {e['height']: e['url'] for e in thumbs}
    def parse_endcard(card):
        card = card.get('endscreenElementRenderer', card) # only sometimes nested
        ctype = card['style']
        if ctype == "CHANNEL":
            content = {
                'channel_id': card['endpoint']['browseEndpoint']['browseId'],
                'title': card['title']['simpleText'],
                'icons': mkthumbs(card['image']['thumbnails']),
            }
        elif ctype == "VIDEO":
            content = {
                'video_id': card['endpoint']['watchEndpoint']['videoId'], # XXX: KeyError 'endpoint' exception (no idea which youtube video this was on)
                'title': card['title']['simpleText'],
                'length': card['videoDuration']['simpleText'], # '12:21'
                'views': delR(card['metadata']['simpleText']),
                # XXX: no channel name
            }
        elif ctype == "PLAYLIST":
            content = {
                'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'author': delL(card['metadata']['simpleText']),
                'n_videos': intT(delR(card['playlistLength']['simpleText'])),
            }
        elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE":
            ctype = "WEBSITE"
            url = clean_url(card['endpoint']['urlEndpoint']['url'])
            content = {
                'url': url,
                'domain': urlparse(url).netloc,
                'title': card['title']['simpleText'],
                'icons': mkthumbs(card['image']['thumbnails']),
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}

        return {'type': ctype, 'content': content}

    infocards = [parse_infocard(card) for card in cards]
    endcards = [parse_endcard(card) for card in endsc]
    # combine cards to weed out duplicates. for videos and playlists prefer
    # infocards, for channels and websites prefer endcards, as those have more
    # information than the others.
    # if the card type is not in ident, we use the whole card for comparison
    # (otherwise they'd all replace each other)
    ident = { # ctype -> ident
        'VIDEO': 'video_id',
        'PLAYLIST': 'playlist_id',
        'CHANNEL': 'channel_id',
        'WEBSITE': 'url',
        'POLL': 'question',
    }
    getident = lambda c: c['content'].get(ident.get(c['type']), c)
    mkexclude = lambda cards, types: [getident(c) for c in cards if c['type'] in types]
    exclude = lambda cards, without: [c for c in cards if getident(c) not in without]

    allcards = exclude(infocards, mkexclude(endcards, ['CHANNEL','WEBSITE'])) + \
        exclude(endcards, mkexclude(infocards, ['VIDEO','PLAYLIST']))
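    # e.g. (illustrative): a video that appears both as an infocard and as an
    # endcard shares its 'video_id' ident, so only the infocard copy survives;
    # a channel that appears in both lists is kept from the endcards instead.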

    all_countries = """AD AE AF AG AI AL AM AO AQ AR AS AT AU AW AX AZ BA BB BD
        BE BF BG BH BI BJ BL BM BN BO BQ BR BS BT BV BW BY BZ CA CC CD CF CG CH
        CI CK CL CM CN CO CR CU CV CW CX CY CZ DE DJ DK DM DO DZ EC EE EG EH ER
        ES ET FI FJ FK FM FO FR GA GB GD GE GF GG GH GI GL GM GN GP GQ GR GS GT
        GU GW GY HK HM HN HR HT HU ID IE IL IM IN IO IQ IR IS IT JE JM JO JP KE
        KG KH KI KM KN KP KR KW KY KZ LA LB LC LI LK LR LS LT LU LV LY MA MC MD
        ME MF MG MH MK ML MM MN MO MP MQ MR MS MT MU MV MW MX MY MZ NA NC NE NF
        NG NI NL NO NP NR NU NZ OM PA PE PF PG PH PK PL PM PN PR PS PT PW PY QA
        RE RO RS RU RW SA SB SC SD SE SG SH SI SJ SK SL SM SN SO SR SS ST SV SX
        SY SZ TC TD TF TG TH TJ TK TL TM TN TO TR TT TV TW TZ UA UG UM US UY UZ
        VA VC VE VG VI VN VU WF WS YE YT ZA ZM ZW""".split()
    whitelisted = sorted(meta2.get('availableCountries',[]))
    blacklisted = sorted(set(all_countries) - set(whitelisted))

    published_at = f"{meta2['publishDate']}T00:00:00Z" # yyyy-mm-dd
    # 'premiere' videos (and livestreams?) have an ISO8601 date available:
    if 'liveBroadcastDetails' in meta2 and 'startTimestamp' in meta2['liveBroadcastDetails']: # TODO: tighten up
        published_at = meta2['liveBroadcastDetails']['startTimestamp']

    return {
        'title': meta1['title'],
        'author': meta1['author'],
        'channel_id': meta1['channelId'],
        'description': meta1['shortDescription'],
        'published': published_at,
        'views': meta1['viewCount'],
        'length': int(meta1['lengthSeconds']),
        'rating': meta1['averageRating'],
        'category': meta2['category'],
        'aspectr': aspect_ratio,
        'unlisted': meta2['isUnlisted'],
        'whitelisted': whitelisted,
        'blacklisted': blacklisted,
        'poster': meta2['thumbnail']['thumbnails'][0]['url'],
        'infocards': infocards,
        'endcards': endcards,
        'all_cards': allcards,
        'subtitles': subtitles,
    }

def store_video_metadata(video_id):
    # check if we know about it, and if not, fetch and store video metadata
    with sqlite3.connect(cf['global']['database']) as conn:
        c = conn.cursor()
        c.execute("SELECT 1 from videos where id = ?", (video_id,))
        new_video = len(c.fetchall()) < 1
        if new_video:
            _, meta, _, _ = get_video_info(video_id)
            if meta:
                meta = prepare_metadata(meta)
                c.execute("""
                    INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled)
                    VALUES (?, ?, ?, datetime(?), datetime(?))
                """, (
                    video_id,
                    meta['channel_id'],
                    meta['title'],
                    meta['published'],
                    meta['published'],
                ))
                c.execute("""
                    INSERT OR REPLACE INTO channels (id, name)
                    VALUES (?, ?)
                """, (meta['channel_id'], meta['author']))

class RedditException(Exception): pass
def fetch_reddit(subreddits, sorted_by="hot", time=None, *, limit=36,
        count=None, before=None, after=None):
    """
    fetches data from a subreddit (or a multireddit like gif+gifs) and
    filters/sorts results.
    sorted_by values: hot, new, rising, controversial, top
    time values: hour, day, week, month, year, all (for top and controversial)
    """

    if not subreddits:
        return None

    query = {k:v for k,v in {
        'count':count,
        'before':before,
        'after':after,
        'limit':limit, # 1..100 (default 25)
        't': time, # hour,week,month,year,all
    }.items() if v}
    multireddit = '+'.join(subreddits)
    r = requests.get(f"https://old.reddit.com/r/{multireddit}/{sorted_by}.json",
        query, headers={'User-Agent':'Mozilla/5.0'})
    if not r.ok or 'data' not in r.json():
        raise RedditException(r.text)

    return r.json()

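# Example (illustrative subreddit names):
#   data = fetch_reddit(['videos', 'youtubehaiku'], sorted_by='top', time='week')
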
def fetch_reddit_post(post_id):
    # Note: /api/info.json?id=t3_h7mjes == /by_id/t3_h7mjes.json
    r = requests.get(f"https://old.reddit.com/by_id/t3_{post_id}.json",
        headers={'User-Agent':'Mozilla/5.0'})
    if not r.ok or 'data' not in r.json():
        raise RedditException(r.text)

    return r.json()

def parse_reddit_videos(data):
    videos = []
    entries = sorted(data['data']['children'],
        key=lambda e: e['data']['score'] > 1,
        reverse=True)
    for entry in entries:
        e = entry['data']
        if e['domain'] not in ['youtube.com', 'youtu.be', 'invidio.us']:
            continue
        try:
            # Note: youtube.com/<video_id> is not valid (404s), but seen in the wild.
            video_id = re.match(r'^https?://(?:www.|m.)?(?:youtube.com/watch\?(?:.*&amp;)?v=|youtu.be/|youtube.com/embed/|youtube.com/)([-_0-9A-Za-z]+)', e['url']).group(1)
        except:
            continue # XXX: should we log that?
        if not video_id: continue
        videos.append({
            'video_id': video_id,
            'title': html.unescape(e['title']), # Note: we unescape and re-escape in the template
            'url': e['permalink'],
            'n_comments': e['num_comments'],
            'n_karma': e['score'],
            'subreddit': e['subreddit'],
            'post_id': e['id'],
        })

    return videos

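# Example (illustrative): turning a subreddit listing into the video dicts the
# templates consume:
#   for v in parse_reddit_videos(fetch_reddit(['videos'])):
#       print(v['video_id'], v['title'])
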
from werkzeug.exceptions import NotFound
class NoFallbackException(NotFound): pass
def fallback_route(*args, **kwargs): # TODO: worthy as a flask-extension?
    """
    finds the next route that matches the current url rule, and executes it.
    args, kwargs: pass all arguments of the current route
    """
    from flask import current_app, request, g

    # build a list of endpoints that match the current request's url rule:
    matching = [
        rule.endpoint
        for rule in current_app.url_map.iter_rules()
        if rule.rule == request.url_rule.rule
    ]
    current = matching.index(request.endpoint)

    # since we can't change request.endpoint, we always get the original
    # endpoint back. so for repeated fall-throughs, we use the g object to
    # increment how often we want to fall through.
    if '_fallback_next' not in g:
        g._fallback_next = 0
    g._fallback_next += 1

    next_ep = current + g._fallback_next

    if next_ep < len(matching):
        return current_app.view_functions[matching[next_ep]](*args, **kwargs)
    else:
        raise NoFallbackException

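# A minimal sketch of the intended usage (illustrative route and helper names,
# assuming a flask `app` or blueprint registers two views on the same rule):
#
#   @app.route('/watch')
#   def watch_local():
#       if not have_it_cached():   # hypothetical condition
#           return fallback_route()  # falls through to watch_proxy below
#       ...
#
#   @app.route('/watch')
#   def watch_proxy():
#       ...
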
def websub_url_hmac(key, feed_id, timestamp, nonce):
    """ generate sha1 hmac, as required by websub/pubsubhubbub """
    sig_input = f"{feed_id}:{timestamp}:{nonce}".encode('ascii')
    return hmac.new(key.encode('ascii'), sig_input, hashlib.sha1).hexdigest()

def websub_body_hmac(key, body):
    return hmac.new(key.encode('ascii'), body, hashlib.sha1).hexdigest()

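# Verification sketch (illustrative): a websub notification carries an
# 'X-Hub-Signature: sha1=<hexdigest>' header computed over the raw POST body,
# so a flask handler can check it roughly like
#   expected = websub_body_hmac(secret, request.get_data())
#   ok = hmac.compare_digest(f"sha1={expected}", request.headers.get('X-Hub-Signature',''))
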
def pp(*args):
    from pprint import pprint
    import sys, codecs
    pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer))