app/common/common.py (subscriptionfeed.git; commit "fix premiere video handling")
1 import os
2 import re
3 import json
4 import html
5 import base64
6 import sqlite3
7 import requests
8 import hmac, hashlib
9 import requests_cache
10 import dateutil.parser
11 from xml.etree import ElementTree
12 from configparser import ConfigParser
13 from datetime import datetime, timezone
14 from urllib.parse import parse_qs, urlparse
15
16 cf = ConfigParser()
17 config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini')
18 cf.read(config_filename)
19 if 'global' not in cf: # todo: full config check
20 raise Exception("Configuration file not found or empty")
21
22 # Note: currently expiring after 10 minutes. googlevideo urls are valid for 5h59m, but caching them that long makes the reddit feed very stale and premiere videos won't start. TODO: expire when video is livestream/premiere/etc
23 requests_cache.install_cache(backend='memory', expire_after=10*60, allowable_codes=(200,))
24
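A possible refinement of the TODO above, assuming a requests_cache version (0.7 or newer) that supports per-URL expiry via urls_expire_after; the patterns and durations are illustrative only and this sketch would replace the install_cache() call above, not run in addition to it:

# sketch only: longer expiry for stream urls, short expiry for everything else
requests_cache.install_cache(
    backend='memory',
    expire_after=10*60,                  # default, as above
    allowable_codes=(200,),
    urls_expire_after={
        '*.googlevideo.com/*': 60*60,    # stream urls stay valid for hours
        'old.reddit.com/*': 5*60,        # keep the reddit feed fresh
    },
)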
25 # Note: this should only be required for the 'memory' backed cache.
26 # TODO: only run for long-running processes, i.e. the frontend
27 from threading import Timer
28 def purge_cache(sec):
29 requests_cache.remove_expired_responses()
30 t = Timer(sec, purge_cache, args=(sec,))
31 t.daemon = True  # setDaemon() is deprecated
32 t.start()
33 purge_cache(10*60)
34
35 # for debugging purposes, monkey-patch the requests session so that each request made through it (url, params, response text) is stored on the flask request's g object; a flask error_handler can then include the request data in the error log.
36 # since this module is also called from outside the flask appcontext, the g access is wrapped in a try/except block.
37 from flask import g
38 import requests
39 from requests import Session as OriginalSession
40 class _NSASession(OriginalSession):
41 def request(self, method, url, params=None, data=None, **kwargs):
42 response = super(_NSASession, self).request(
43 method, url, params, data, **kwargs
44 )
45 try:
46 if 'api_requests' not in g:
47 g.api_requests = []
48 g.api_requests.append((url, params, response.text))
49 except RuntimeError: pass # not within flask (e.g. utils.py)
50 return response
51 requests.Session = requests.sessions.Session = _NSASession
52
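The error_handler mentioned in the comment above is not part of this file; a minimal sketch of what it could look like (the handler name, the 500 status code and the 'app' object are assumptions):

from flask import g

@app.errorhandler(500)                     # 'app' is the flask application object
def log_api_requests(e):
    # g.api_requests is filled by _NSASession above
    for url, params, response_text in g.get('api_requests', []):
        app.logger.error("%s %s -> %.200s", url, params, response_text)
    return "Internal Server Error", 500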
53 def fetch_xml(feed_type, feed_id):
54 # TODO: handle requests.exceptions.ConnectionError
55 r = requests.get("https://www.youtube.com/feeds/videos.xml", {
56 feed_type: feed_id,
57 })
58 if not r.ok:
59 return None
60
61 return r.content
62
63 def parse_xml(xmldata):
64 ns = {
65 'atom':"http://www.w3.org/2005/Atom",
66 'yt': "http://www.youtube.com/xml/schemas/2015",
67 'media':"http://search.yahoo.com/mrss/",
68 'at': "http://purl.org/atompub/tombstones/1.0",
69 }
70
71 feed = ElementTree.fromstring(xmldata)
72
73 if feed.find('at:deleted-entry',ns) is not None:
74 (_,_,vid) = feed.find('at:deleted-entry',ns).get('ref').rpartition(':')
75 return None, None, [{'deleted': True, 'video_id': vid}]
76
77 title = feed.find('atom:title',ns).text
78 author = feed.find('atom:author/atom:name',ns).text \
79 if feed.find('atom:author',ns) is not None else None
80 videos = []
81 for entry in feed.findall('atom:entry',ns):
82 videos.append({
83 'video_id': entry.find('yt:videoId',ns).text,
84 'title': entry.find('atom:title',ns).text,
85 'published': entry.find('atom:published',ns).text,
86 'channel_id': entry.find('yt:channelId',ns).text,
87 'author': entry.find('atom:author',ns).find('atom:name',ns).text,
88 # extra fields for pull_subs/webhook:
89 'updated': entry.find('atom:updated',ns).text,
90 })
91
92 return title, author, videos
93
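A minimal usage sketch for the two functions above; the feed type matches the query parameter of YouTube's videos.xml endpoint, and the channel id is a placeholder:

xmldata = fetch_xml("channel_id", "UCxxxxxxxxxxxxxxxxxxxxxx")  # or ("playlist_id", ...)
if xmldata:
    title, author, videos = parse_xml(xmldata)
    for video in videos:
        print(video.get('video_id'), video.get('published'))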
94 def update_channel(db, xmldata, from_webhook=False):
95 if not xmldata: return False
96
97 # Note: websub does not return global author, hence taking from first video
98 _, _, videos = parse_xml(xmldata)
99
100 c = db.cursor()
101 from flask import current_app # XXX: remove
102 for i, video in enumerate(videos):
103 if video.get('deleted'):
104 if from_webhook: current_app.logger.warning(f"ignoring deleted video {video['video_id']}") # XXX: remove
105 # TODO: enable once we enforce hmac validation:
106 #c.execute("DELETE FROM videos WHERE id = ?", (video['video_id'],))
107 break
108
109 now = datetime.now(timezone.utc)
110 updated = dateutil.parser.parse(video['updated'])
111 published = dateutil.parser.parse(video['published'])
112 # if update and published time are near-identical, we assume it's new.
113 # checking if it was posted this week is necessary during xmlfeed pulling.
114 if (updated - published).total_seconds() < 60 and (now - published).days < 7:
115 timestamp = now
116 if from_webhook: current_app.logger.warning(f"fresh video {video['video_id']}") # XXX: remove
117 else: # it might just be an update to an older video, or a previously unlisted one.
118 # first, assume it's an older video (correct when pulling xmlfeeds)
119 timestamp = published
120 # then, check if we don't know about it and if so, look up the real date.
121
122 # The 'published' timestamp sent in websub POSTs is often wrong (e.g.:
123 # video gets uploaded as unlisted on day A and set to public on day B;
124 # the webhook is sent on day B, but 'published' says A. The video
125 # therefore looks like it's just an update to an older video). If
126 # that's the case, we call get_video_info and double-check.
127 # We only need to do this for videos that are not yet in the database.
128 c.execute("SELECT 1 from videos where id = ?", (video['video_id'],))
129 new_video = len(c.fetchall()) < 1
130 if from_webhook: current_app.logger.warning(f"video {video['video_id']}") # XXX: remove
131 if from_webhook and new_video:
132 if from_webhook: current_app.logger.warning(f" is webhook and new") # XXX: remove
133 _, meta, _, _ = get_video_info(video['video_id'])
134 if meta:
135 meta = prepare_metadata(meta)
136 published = dateutil.parser.parse(meta['published'])
137 if from_webhook: current_app.logger.warning(f" uploaded {published}") # XXX: remove
138 if (now - published).days < 7:
139 timestamp = now
140 else: # it's just an update to an older video.
141 timestamp = published
142
143 c.execute("""
144 INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled)
145 VALUES (?, ?, ?, datetime(?), datetime(?))
146 """, (
147 video['video_id'],
148 video['channel_id'],
149 video['title'],
150 video['published'],
151 timestamp
152 ))
153
154 if i == 0: # only required once per feed
155 c.execute("""
156 INSERT OR REPLACE INTO channels (id, name)
157 VALUES (?, ?)
158 """, (video['channel_id'], video['author']))
159 db.commit()
160
161 return True
162
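How update_channel is meant to be driven (a sketch; the pull side of the app presumably does something along these lines with the configured database):

with sqlite3.connect(cf['global']['database']) as db:
    ok = update_channel(db, fetch_xml("channel_id", "UCxxxxxxxxxxxxxxxxxxxxxx"))
    if not ok:
        pass  # feed could not be fetched; try again later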
163 def get_video_info(video_id, sts=0, algo=""):
164 """
165 returns: url of the best-quality muxed video stream, player_response, error type, error message
166 error types: player, malformed, livestream, geolocked, exhausted
167 """
168 player_error = None # for 'exhausted'
169 for el in ['embedded', 'detailpage']:#sometimes, only one or the other works
170 r = requests.get("https://www.youtube.com/get_video_info", {
171 "video_id": video_id,
172 "eurl": f"https://youtube.googleapis.com/v/{video_id}",
173 "el": el,
174 "sts": sts,
175 "hl": "en_US",
176 })
177 params = parse_qs(r.text)
178 if 'errorcode' in params: # status=fail
179 return None, None, 'malformed', params['reason'][0]
180
181 metadata = json.loads(params.get('player_response')[0])
182 playabilityStatus = metadata['playabilityStatus']['status']
183 if playabilityStatus != "OK":
184 playabilityReason = metadata['playabilityStatus'].get('reason',
185 '//'.join(metadata['playabilityStatus'].get('messages',[])))
186 player_error = f"{playabilityStatus}: {playabilityReason}"
187 if playabilityStatus == "UNPLAYABLE":
188 continue # try again with next el value (or fail as exhausted)
189 # without videoDetails, there's only the error message
190 maybe_metadata = metadata if 'videoDetails' in metadata else None
191 return None, maybe_metadata, 'player', player_error
192 if metadata['videoDetails'].get('isLive', False):
193 return None, metadata, 'livestream', None
194
195 if 'formats' not in metadata['streamingData']:
196 continue # no urls
197
198 formats = metadata['streamingData']['formats']
199 for (i,v) in enumerate(formats):
200 if not ('cipher' in v or 'signatureCipher' in v): continue
201 cipher = parse_qs(v.get('cipher') or v.get('signatureCipher'))
202 formats[i]['url'] = unscramble(cipher, algo)
203
204 # todo: check if we have urls or try again
205 url = sorted(formats, key=lambda k: k['height'], reverse=True)[0]['url']
206
207 # ip-locked videos can be recovered if the proxy module is loaded:
208 is_geolocked = 'geolocked' if 'gcr' in parse_qs(urlparse(url).query) else None
209
210 return url, metadata, is_geolocked, None
211 else:
212 return None, metadata, 'exhausted', player_error
213
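A sketch of consuming the four return values and error types documented in the docstring above; the video id reuses the test id mentioned at unscramble() below:

url, metadata, error, errdetail = get_video_info("UxxajLWwzqY")
if error in (None, 'geolocked'):   # geolocked urls may still work through the proxy module
    stream_url = url
elif error == 'livestream':
    stream_url = None              # only metadata is available
else:                              # 'malformed', 'player', 'exhausted'
    raise RuntimeError(f"{error}: {errdetail}")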
214 def unscramble(cipher, algo): # test video id: UxxajLWwzqY
215 signature = list(cipher['s'][0])
216 for c in algo.split():
217 op, ix = re.match(r"([rsw])(\d+)?", c).groups()
218 ix = int(ix) % len(signature) if ix else 0
219 if not op: continue
220 if op == 'r': signature = list(reversed(signature))
221 if op == 's': signature = signature[ix:]
222 if op == 'w': signature[0], signature[ix] = signature[ix], signature[0]
223 sp = cipher.get('sp', ['signature'])[0]
224 sig = cipher.get('sig', [''.join(signature)])[0]
225 return f"{cipher['url'][0]}&{sp}={sig}"
226
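An illustrative call with a made-up cipher and algorithm string (real values come from get_video_info and the player's signature javascript):

cipher = parse_qs("s=ABCDEFGHIJ&sp=sig&url=https%3A%2F%2Fexample.invalid%2Fvideoplayback%3Ffoo%3Dbar")
unscramble(cipher, "r w3 s2")
# reverses the signature, swaps positions 0 and 3, drops the first two characters,
# then appends the result as &sig=... to the decoded url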
227 def prepare_metadata(metadata):
228 meta1 = metadata['videoDetails']
229 meta2 = metadata['microformat']['playerMicroformatRenderer']
230 cards = metadata['cards']['cardCollectionRenderer']['cards'] \
231 if 'cards' in metadata else []
232 endsc = metadata['endscreen']['endscreenRenderer']['elements'] \
233 if 'endscreen' in metadata else []
234
235 # the actual video streams have exact information:
236 try:
237 sd = metadata['streamingData']
238 some_stream = (sd.get('adaptiveFormats',[]) + sd.get('formats',[]))[0]
239 aspect_ratio = some_stream['width'] / some_stream['height']
240 # if that's unavailable (e.g. on livestreams), fall back to
241 # thumbnails (only either 4:3 or 16:9).
242 except Exception:
243 some_img = meta2['thumbnail']['thumbnails'][0]
244 aspect_ratio = some_img['width'] / some_img['height']
245
246 # Note: we could get subtitles in multiple formats directly by querying
247 # https://video.google.com/timedtext?hl=en&type=list&v=<VIDEO_ID> followed by
248 # https://www.youtube.com/api/timedtext?lang=<LANG_CODE>&v=<VIDEO_ID>&fmt={srv1|srv2|srv3|ttml|vtt},
249 # but that won't give us autogenerated subtitles (and is an extra request).
250 # we can still add &fmt= to the extracted URLs below (first one takes precedence).
251 try: # find the native language captions (assuming there is only 1 audioTrack) (any level might not exist):
252 default_track = metadata.get('captions',{}).get('playerCaptionsTracklistRenderer',{}).get('defaultAudioTrackIndex', 0)
253 main_subtitle = metadata['captions']['playerCaptionsTracklistRenderer']['audioTracks'][default_track]['defaultCaptionTrackIndex']
254 except Exception:
255 main_subtitle = -1
256 subtitles = sorted([
257 {'url':cc['baseUrl'],
258 'code':cc['languageCode'],
259 'autogenerated':cc.get('kind')=="asr",
260 'name':cc['name']['simpleText'],
261 'default':i==main_subtitle,
262 'query':"fmt=vtt&"+urlparse(cc['baseUrl']).query} # for our internal proxy
263 for i,cc in enumerate(metadata.get('captions',{})
264 .get('playerCaptionsTracklistRenderer',{})
265 .get('captionTracks',[]))
266 # sort order: default lang gets weight 0 (first), other manually translated weight 1, autogenerated weight 2:
267 ], key=lambda cc: (not cc['default']) + cc['autogenerated'])
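# example (sketch): with the entries built above, a single VTT track could be
# fetched directly from the upstream API, e.g.
#   requests.get(subtitles[0]['url'] + "&fmt=vtt").text  # assuming at least one track exists
# (the 'query' field serves the same purpose for the app's internal proxy.)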
268
269 def clean_url(url):
270 # external URLs are redirected through youtube.com/redirect, but we
271 # may encounter internal URLs, too
272 return parse_qs(urlparse(url).query).get('q',[url])[0]
273 # Remove left-/rightmost word from string:
274 delL = lambda s: s.partition(' ')[2]
275 delR = lambda s: s.rpartition(' ')[0]
276 # Thousands-separator aware int():
277 intT = lambda s: int(s.replace(',', ''))
278
279 def parse_infocard(card):
280 card = card['cardRenderer']
281 ctype = list(card['content'].keys())[0]
282 content = card['content'][ctype]
283 if ctype == "pollRenderer":
284 ctype = "POLL"
285 content = {
286 'question': content['question']['simpleText'],
287 'answers': [(a['text']['simpleText'],a['numVotes']) \
288 for a in content['choices']],
289 }
290 elif ctype == "videoInfoCardContentRenderer":
291 ctype = "VIDEO"
292 # if the card references a live stream, it has no length, but a "LIVE NOW" badge.
293 # TODO: this is ugly; cleanup.
294 is_live = content.get('badge',{}).get('liveBadgeRenderer',{})
295 length = is_live.get('label',{}).get('simpleText') or content['lengthString']['simpleText'] # '23:03'
296 content = {
297 'video_id': content['action']['watchEndpoint']['videoId'],
298 'title': content['videoTitle']['simpleText'],
299 'author': delL(content['channelName']['simpleText']),
300 'length': length,
301 'views': intT(delR(content['viewCountText']['simpleText'])),
302 }
303 elif ctype == "playlistInfoCardContentRenderer":
304 ctype = "PLAYLIST"
305 content = {
306 'playlist_id': content['action']['watchEndpoint']['playlistId'],
307 'video_id': content['action']['watchEndpoint']['videoId'],
308 'title': content['playlistTitle']['simpleText'],
309 'author': delL(content['channelName']['simpleText']),
310 'n_videos': intT(content['playlistVideoCount']['simpleText']),
311 }
312 elif ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content['command']:
313 ctype = "WEBSITE"
314 content = {
315 'url': clean_url(content['command']['urlEndpoint']['url']),
316 'domain': content['displayDomain']['simpleText'],
317 'title': content['title']['simpleText'],
318 # XXX: no thumbnails for infocards
319 }
320 elif ctype == "collaboratorInfoCardContentRenderer":
321 ctype = "CHANNEL"
322 content = {
323 'channel_id': content['endpoint']['browseEndpoint']['browseId'],
324 'title': content['channelName']['simpleText'],
325 'icons': mkthumbs(content['channelAvatar']['thumbnails']),
326 'subscribers': content.get('subscriberCountText',{}).get('simpleText',''), # "545K subscribers"
327 }
328 else:
329 import pprint
330 content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}
331
332 return {'type': ctype, 'content': content}
333
334 def mkthumbs(thumbs):
335 return {e['height']: e['url'] for e in thumbs}
336 def parse_endcard(card):
337 card = card.get('endscreenElementRenderer', card) #only sometimes nested
338 ctype = card['style']
339 if ctype == "CHANNEL":
340 content = {
341 'channel_id': card['endpoint']['browseEndpoint']['browseId'],
342 'title': card['title']['simpleText'],
343 'icons': mkthumbs(card['image']['thumbnails']),
344 }
345 elif ctype == "VIDEO":
346 content = {
347 'video_id': card['endpoint']['watchEndpoint']['videoId'], # XXX: KeyError 'endpoint' exception (no idea which youtube video this was on)
348 'title': card['title']['simpleText'],
349 'length': card['videoDuration']['simpleText'], # '12:21'
350 'views': delR(card['metadata']['simpleText']),
351 # XXX: no channel name
352 }
353 elif ctype == "PLAYLIST":
354 content = {
355 'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
356 'video_id': card['endpoint']['watchEndpoint']['videoId'],
357 'title': card['title']['simpleText'],
358 'author': delL(card['metadata']['simpleText']),
359 'n_videos': intT(delR(card['playlistLength']['simpleText'])),
360 }
361 elif ctype == "WEBSITE" or ctype == "CREATOR_MERCHANDISE":
362 ctype = "WEBSITE"
363 url = clean_url(card['endpoint']['urlEndpoint']['url'])
364 content = {
365 'url': url,
366 'domain': urlparse(url).netloc,
367 'title': card['title']['simpleText'],
368 'icons': mkthumbs(card['image']['thumbnails']),
369 }
370 else:
371 import pprint
372 content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}
373
374 return {'type': ctype, 'content': content}
375
376 infocards = [parse_infocard(card) for card in cards]
377 endcards = [parse_endcard(card) for card in endsc]
378 # combine cards to weed out duplicates. for videos and playlists prefer
379 # infocards, for channels and websites prefer endcards, as those carry more
380 # information.
381 # if the card type is not in ident, we use the whole card for comparison
382 # (otherwise they'd all replace each other)
383 ident = { # ctype -> ident
384 'VIDEO': 'video_id',
385 'PLAYLIST': 'playlist_id',
386 'CHANNEL': 'channel_id',
387 'WEBSITE': 'url',
388 'POLL': 'question',
389 }
390 getident = lambda c: c['content'].get(ident.get(c['type']), c)
391 mkexclude = lambda cards, types: [getident(c) for c in cards if c['type'] in types]
392 exclude = lambda cards, without: [c for c in cards if getident(c) not in without]
393
394 allcards = exclude(infocards, mkexclude(endcards, ['CHANNEL','WEBSITE'])) + \
395 exclude(endcards, mkexclude(infocards, ['VIDEO','PLAYLIST']))
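# e.g.: a video referenced both as an infocard and an endcard keeps only its infocard
# (the endcard copy is dropped because its video_id is in mkexclude(infocards, ...)),
# while a channel present in both lists survives only as an endcard.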
396
397 all_countries = """AD AE AF AG AI AL AM AO AQ AR AS AT AU AW AX AZ BA BB BD
398 BE BF BG BH BI BJ BL BM BN BO BQ BR BS BT BV BW BY BZ CA CC CD CF CG CH
399 CI CK CL CM CN CO CR CU CV CW CX CY CZ DE DJ DK DM DO DZ EC EE EG EH ER
400 ES ET FI FJ FK FM FO FR GA GB GD GE GF GG GH GI GL GM GN GP GQ GR GS GT
401 GU GW GY HK HM HN HR HT HU ID IE IL IM IN IO IQ IR IS IT JE JM JO JP KE
402 KG KH KI KM KN KP KR KW KY KZ LA LB LC LI LK LR LS LT LU LV LY MA MC MD
403 ME MF MG MH MK ML MM MN MO MP MQ MR MS MT MU MV MW MX MY MZ NA NC NE NF
404 NG NI NL NO NP NR NU NZ OM PA PE PF PG PH PK PL PM PN PR PS PT PW PY QA
405 RE RO RS RU RW SA SB SC SD SE SG SH SI SJ SK SL SM SN SO SR SS ST SV SX
406 SY SZ TC TD TF TG TH TJ TK TL TM TN TO TR TT TV TW TZ UA UG UM US UY UZ
407 VA VC VE VG VI VN VU WF WS YE YT ZA ZM ZW""".split()
408 whitelisted = sorted(meta2.get('availableCountries',[]))
409 blacklisted = sorted(set(all_countries) - set(whitelisted))
410
411 published_at = f"{meta2['publishDate']}T00:00:00Z" # yyyy-mm-dd
412 # 'premiere' videos (and livestreams?) have an ISO8601 date available:
413 if 'liveBroadcastDetails' in meta2 and 'startTimestamp' in meta2['liveBroadcastDetails']: # TODO: tighten up
414 published_at = meta2['liveBroadcastDetails']['startTimestamp']
415
416 return {
417 'title': meta1['title'],
418 'author': meta1['author'],
419 'channel_id': meta1['channelId'],
420 'description': meta1['shortDescription'],
421 'published': published_at,
422 'views': meta1['viewCount'],
423 'length': int(meta1['lengthSeconds']),
424 'rating': meta1['averageRating'],
425 'category': meta2['category'],
426 'aspectr': aspect_ratio,
427 'unlisted': meta2['isUnlisted'],
428 'whitelisted': whitelisted,
429 'blacklisted': blacklisted,
430 'poster': meta2['thumbnail']['thumbnails'][0]['url'],
431 'infocards': infocards,
432 'endcards': endcards,
433 'all_cards': allcards,
434 'subtitles': subtitles,
435 }
436
437 def store_video_metadata(video_id):
438 # check if we know about it, and if not, fetch and store video metadata
439 with sqlite3.connect(cf['global']['database']) as conn:
440 c = conn.cursor()
441 c.execute("SELECT 1 from videos where id = ?", (video_id,))
442 new_video = len(c.fetchall()) < 1
443 if new_video:
444 _, meta, _, _ = get_video_info(video_id)
445 if meta:
446 meta = prepare_metadata(meta)
447 c.execute("""
448 INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled)
449 VALUES (?, ?, ?, datetime(?), datetime(?))
450 """, (
451 video_id,
452 meta['channel_id'],
453 meta['title'],
454 meta['published'],
455 meta['published'],
456 ))
457 c.execute("""
458 INSERT OR REPLACE INTO channels (id, name)
459 VALUES (?, ?)
460 """, (meta['channel_id'], meta['author']))
461
462 class RedditException(Exception): pass
463 def fetch_reddit(subreddits, sorted_by="hot", time=None, *, limit=36,
464 count=None, before=None, after=None):
465 """
466 fetches data from a subreddit (or a multireddit like gif+gifs) and
467 filters/sorts results.
468 sorted_by values: hot, new, rising, controversial, top
469 time values: hour, day, week, month, year, all (for top and controversial)
470 """
471
472 if not subreddits:
473 return None
474
475 query = {k:v for k,v in {
476 'count':count,
477 'before':before,
478 'after':after,
479 'limit':limit, # 1..100 (default 25)
480 't': time, # hour,day,week,month,year,all
481 }.items() if v}
482 multireddit = '+'.join(subreddits)
483 r = requests.get(f"https://old.reddit.com/r/{multireddit}/{sorted_by}.json",
484 query, headers={'User-Agent':'Mozilla/5.0'})
485 if not r.ok or 'data' not in r.json():
486 raise RedditException(r.text)
487
488 return r.json()
489
490 def fetch_reddit_post(post_id):
491 # Note: /api/info.json?id=t3_h7mjes == /by_id/t3_h7mjes.json
492 r = requests.get(f"https://old.reddit.com/by_id/t3_{post_id}.json",
493 headers={'User-Agent':'Mozilla/5.0'})
494 if not r.ok or 'data' not in r.json():
495 raise RedditException(r.text)
496
497 return r.json()
498
499 def parse_reddit_videos(data):
500 videos = []
501 entries = sorted(data['data']['children'],
502 key=lambda e: e['data']['score'] > 1,
503 reverse=True)
504 for entry in entries:
505 e = entry['data']
506 if e['domain'] not in ['youtube.com', 'youtu.be', 'invidio.us']:
507 continue
508 try:
509 # Note: youtube.com/<video_id> is not valid (404s), but seen in the wild.
510 video_id = re.match(r'^https?://(?:www.|m.)?(?:youtube.com/watch\?(?:.*&amp;)?v=|youtu.be/|youtube.com/embed/|youtube.com/)([-_0-9A-Za-z]+)', e['url']).group(1)
511 except Exception:
512 continue # XXX: should we log that?
513 if not video_id: continue
514 videos.append({
515 'video_id': video_id,
516 'title': html.unescape(e['title']), # Note: we unescape and re-escape in the template
517 'url': e['permalink'],
518 'n_comments': e['num_comments'],
519 'n_karma': e['score'],
520 'subreddit': e['subreddit'],
521 'post_id': e['id'],
522 })
523
524 return videos
525
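A usage sketch for the reddit helpers above; the subreddit names are placeholders:

data = fetch_reddit(['videos', 'mealtimevideos'], sorted_by='top', time='week', limit=50)
for video in parse_reddit_videos(data):
    print(video['n_karma'], video['video_id'], video['title'])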
526 from werkzeug.exceptions import NotFound
527 class NoFallbackException(NotFound): pass
528 def fallback_route(*args, **kwargs): # TODO: worthy as a flask-extension?
529 """
530 finds the next route that matches the current url rule, and executes it.
531 args, kwargs: pass all arguments of the current route
532 """
533 from flask import current_app, request, g
534
535 # build a list of endpoints that match the current request's url rule:
536 matching = [
537 rule.endpoint
538 for rule in current_app.url_map.iter_rules()
539 if rule.rule == request.url_rule.rule
540 ]
541 current = matching.index(request.endpoint)
542
543 # since we can't change request.endpoint, we always get the original
544 # endpoint back. so for repeated fall throughs, we use the g object to
545 # increment how often we want to fall through.
546 if '_fallback_next' not in g:
547 g._fallback_next = 0
548 g._fallback_next += 1
549
550 next_ep = current + g._fallback_next
551
552 if next_ep < len(matching):
553 return current_app.view_functions[matching[next_ep]](*args, **kwargs)
554 else:
555 raise NoFallbackException
556
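A sketch of the intended use of fallback_route: two view functions registered on the same URL rule (e.g. by different blueprints); the blueprint names and the condition are hypothetical, and which endpoint is "next" depends on registration order:

from flask import Blueprint, request

fancy = Blueprint('fancy', __name__)   # hypothetical names
plain = Blueprint('plain', __name__)

@fancy.route('/watch')
def watch_fancy():
    if 'v' not in request.args:
        return fallback_route()        # falls through to plain.watch_plain
    return "fancy player"

@plain.route('/watch')
def watch_plain():
    return "plain page"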
557 def websub_url_hmac(key, feed_id, timestamp, nonce):
558 """ generate sha1 hmac, as required by websub/pubsubhubbub """
559 sig_input = f"{feed_id}:{timestamp}:{nonce}".encode('ascii')
560 return hmac.new(key.encode('ascii'), sig_input, hashlib.sha1).hexdigest()
561
562 def websub_body_hmac(key, body):
563 return hmac.new(key.encode('ascii'), body, hashlib.sha1).hexdigest()
564
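A sketch of how the body hmac above would be checked against an incoming websub POST (the X-Hub-Signature header format is from the pubsubhubbub spec; the actual webhook handler lives elsewhere in the app):

def websub_body_verified(request, key):
    provided = request.headers.get('X-Hub-Signature', '')  # "sha1=<hexdigest>"
    expected = "sha1=" + websub_body_hmac(key, request.get_data())
    return hmac.compare_digest(provided, expected)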
565 def pp(*args):
566 from pprint import pprint
567 import sys, codecs
568 pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer))