]> git.gir.st - subscriptionfeed.git/blob - app/anticaptcha/lib.py
[WARNING: CONFIG CHANGE] reorganize webhooks config
[subscriptionfeed.git] / app / anticaptcha / lib.py
1 from html.parser import HTMLParser
2 from urllib.parse import urljoin
3 import json
4 import secrets
5 import sqlite3
6 import requests
7
8 from ..common.common import cf
9
10 from werkzeug.exceptions import BadGateway
11 from flask import current_app
12
class ExtractCaptcha(HTMLParser):
    """Pull the captcha form and its parameters out of an HTML page.

    The whole document is parsed in the constructor. The parser works as a
    small state machine that swaps its ``handle_starttag``/``handle_endtag``
    callbacks: it first waits for the captcha ``<form>``, then collects
    values until that form is closed, and ignores everything afterwards.

    Attributes:
        action: ``action`` attribute of the captcha form, or None if no
            matching form was found.
        sitekey: ``data-sitekey`` of the recaptcha ``<div>``, or None.
        svalue: ``data-s`` of the recaptcha ``<div>``, or None.
        inputs: name -> value mapping of the named ``<input>`` elements
            found inside the captcha form.
    """

    # Form actions (with the query string stripped) that mark a captcha form.
    CAPTCHA_ACTIONS = ("/das_captcha", "index")

    def __init__(self, html):
        """Parse ``html`` and populate the result attributes."""
        super().__init__()
        self.action = None
        self.sitekey = None
        self.svalue = None
        self.inputs = {}
        self.handle_starttag = self.find_form
        super().feed(html)

    def find_form(self, tag, attrs):
        """Start-tag handler used until the captcha form has been opened."""
        if tag != "form":
            return
        # A form without an action (or with a valueless one) gives None here;
        # treat it as "no match" instead of crashing on None.partition().
        action = dict(attrs).get("action") or ""
        if action.partition("?")[0] in self.CAPTCHA_ACTIONS:
            self.action = action
            self.handle_starttag = self.find_values
            self.handle_endtag = self.find_end

    def find_values(self, tag, attrs):
        """Start-tag handler used while inside the captcha form."""
        attrs = dict(attrs)
        if tag == "div" and (
            attrs.get("id") == "recaptcha"
            or attrs.get("class") == "g-recaptcha"
        ):
            self.sitekey = attrs.get("data-sitekey")
            self.svalue = attrs.get("data-s")
        # Inputs without a usable name cannot be submitted; a missing or
        # valueless ``value`` attribute is recorded as an empty string.
        if tag == "input" and attrs.get("name"):
            self.inputs[attrs["name"]] = attrs.get("value") or ""

    def find_end(self, tag):
        """End-tag handler: stop collecting once the captcha form closes."""
        if tag == "form":
            self.handle_starttag = self.find_nil
            self.handle_endtag = self.find_nil

    def find_nil(self, *args):
        """No-op handler installed after the captcha form was processed."""
        pass
41
def check_captcha_or_raise(r):
    """Detect a Youtube captcha page and submit it to the solving service.

    If ``r`` is not a captcha page, nothing happens. Otherwise, unless a
    captcha request was already recorded within the last 90 seconds, the
    captcha form is parsed, stored in ``captcha_requests`` and sent to the
    configured ``createTask`` API endpoint, which is asked to call back
    ``{public_uri}/captcha_response/{nonce}``.

    Args:
        r: response object; its ``text`` and ``url`` attributes are read.

    Returns:
        None if ``r`` is not a captcha page, or if a captcha request was
        already submitted in the last 90 seconds.

    Raises:
        BadGateway: after a new captcha task has been submitted.
    """
    if "To continue with your YouTube experience, please fill out the form below." not in r.text:
        return

    conn = sqlite3.connect(cf['global']['database'])
    try:
        # ``with conn`` commits on success and rolls back on error, but does
        # not close the connection; that is done in the ``finally`` below.
        with conn:
            c = conn.cursor()

            # check if a captcha was already submitted recently:
            c.execute("""
                SELECT COUNT(*)
                FROM captcha_requests
                WHERE timestamp > datetime('now', '-90 seconds')
            """)
            (already_submitted,) = c.fetchone()
            if already_submitted:
                current_app.logger.warning("check_captcha_or_raise: already submitted")
                # TODO: get time of last submission and display that to the user
                return

            captcha = ExtractCaptcha(r.text)
            nonce = secrets.token_urlsafe(16)
            inputs = json.dumps(captcha.inputs)

            task_id = 0  # XXX: if i commit after i send the request, it fails!?
            # note: auto field for current datetime
            # note: key, svalue only for debugging
            c.execute("""
                INSERT INTO captcha_requests(nonce, url, action, key, svalue, task_id, inputs)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (nonce, r.url, captcha.action, captcha.sitekey, captcha.svalue, task_id, inputs))
            # Persist the request before contacting the API (see XXX above).
            conn.commit()

            api_key = cf['captcha']['api_key']
            api_host = cf['captcha']['api_host']
            public_uri = cf['webhooks']['public_uri']
            r2 = requests.post(f"{api_host}/createTask", json={
                "clientKey": api_key,
                "task": {
                    "type": "NoCaptchaTaskProxyless",
                    "websiteURL": r.url,
                    "websiteKey": captcha.sitekey,
                    "recaptchaDataSValue": captcha.svalue,
                },
                "callbackUrl": f"{public_uri}/captcha_response/{nonce}",
            })
            task_id = r2.json().get("taskId")
            # Update the row inserted above; it lives in captcha_requests.
            # for debugging only; task_id is not in webhook response
            c.execute("""
                UPDATE OR IGNORE captcha_requests
                SET task_id = ?
                WHERE nonce = ?
            """, (task_id, nonce))
    finally:
        conn.close()

    # Raised outside the ``with`` block so the task_id update is committed
    # rather than rolled back by the exception.
    raise BadGateway("Rate-limited by Youtube; please try again in two seconds")
95
def solve_captcha(nonce, json_obj):
    """Submit a solved captcha and store the cookies received in return.

    Looks up the pending request identified by ``nonce``, posts its stored
    form inputs together with the solution's ``gRecaptchaResponse`` to the
    captcha form's action, replaces the contents of ``captcha_cookies`` with
    the cookies set by that response, and deletes the pending request along
    with requests older than one minute.

    Args:
        nonce: value identifying a row in ``captcha_requests``.
        json_obj: mapping whose ``solution`` entry (if present) is read for
            ``gRecaptchaResponse`` and ``cookies``.

    Raises:
        NotFound: if no stored request matches ``nonce`` or its stored
            inputs cannot be decoded.
    """
    # The module-level werkzeug import only brings in BadGateway.
    from werkzeug.exceptions import NotFound
    import pickle

    conn = sqlite3.connect(cf['global']['database'])
    try:
        # ``with conn`` commits on success and rolls back on error, but does
        # not close the connection; that is done in the ``finally`` below.
        with conn:
            c = conn.cursor()
            c.execute("""
                SELECT url, action, task_id, inputs
                FROM captcha_requests
                WHERE nonce = ? -- AND timestamp > date('now', '-90 seconds')
            """, (nonce,))
            row = c.fetchone()
            if row is None:
                raise NotFound  # todo: ugly
            # note: there is no taskId in the response, so we can't verify that :|
            url, action, _task_id, stored_inputs = row
            try:
                inputs = json.loads(stored_inputs)
            except (TypeError, ValueError):
                raise NotFound from None

            solution = json_obj.get("solution", {})
            inputs["g-recaptcha-response"] = solution.get("gRecaptchaResponse")
            cookies = solution.get("cookies")  # only set/used for "google.com domains and subdomains"
            # cookies aren't preserved in r.cookies when the redirect is followed(wtf!?),
            # and we don't need that response anyways.
            r = requests.post(urljoin(url, action), cookies=cookies, data=inputs, allow_redirects=False)

            # Debugging aid: dump the raw response to disk.
            # NOTE(review): this writes the response (including cookies) to a
            # fixed path in /tmp -- consider removing it.
            with open("/tmp/das-captcha.req", "wb") as dump_file:
                pickle.dump(r, dump_file)

            captcha_cookies = r.cookies
            #cargo-culted from invidious, but i don't believe it's necessary
            # if enabled, use r.post(allow_redirects=False)!
            #if action == "/sorry/index":
            #    from urllib.parse import parse_qs
            #    captcha_cookies, _, _ = parse_qs(r.headers["Location"]) \
            #        .get("google_abuse", "") \
            #        .partition(";")
            #    xxx: returns cookie header-value; parse to dict

            c.execute("DELETE FROM captcha_cookies")  # not using insert-or-replace-into to avoid keeping removed cookies
            c.executemany("""
                INSERT INTO captcha_cookies(name, value)
                VALUES (?, ?)
            """, captcha_cookies.items())
            # datetime() rather than date(): date() drops the time of day, so
            # the one-minute cutoff would only match rows from previous days.
            c.execute("""
                DELETE FROM captcha_requests
                WHERE nonce = ? OR timestamp < datetime('now', '-1 minute')
            """, (nonce,))
    finally:
        conn.close()
Imprint / Impressum