git.gir.st - subscriptionfeed.git/blob - app/common/anticaptcha.py
integrate anticaptcha into common and clean it up a bit
[subscriptionfeed.git] / app / common / anticaptcha.py
1 import json
2 import secrets
3 import sqlite3
4 import requests
5 from urllib.parse import urljoin
6 from html.parser import HTMLParser
7
8 from werkzeug.exceptions import NotFound
9
10 from ..common.common import cf, flask_logger
11
class ExtractCaptcha(HTMLParser):
    """Pull the captcha form details out of an HTML page.

    The whole document is parsed during construction; afterwards the
    results are available as attributes:

    - ``action``:  the captcha form's ``action`` attribute exactly as
      written (query string included), or None if no such form was found.
    - ``sitekey``: ``data-sitekey`` of the recaptcha ``<div>``, or None.
    - ``svalue``:  ``data-s`` of the recaptcha ``<div>``, or None.
    - ``inputs``:  ``{name: value}`` of every named ``<input>`` in the form.

    Parsing is a small state machine that swaps the ``handle_starttag`` /
    ``handle_endtag`` callbacks on the instance:
    find_form -> find_values + find_end -> find_nil.
    """

    def __init__(self, html):
        super().__init__()
        self.action = None
        self.sitekey = None
        self.svalue = None
        self.inputs = {}
        # state 1: skip everything until the captcha <form> shows up
        self.handle_starttag = self.find_form
        super().feed(html)

    def find_form(self, tag, attrs):
        """Wait for the captcha form, then switch to collecting its fields."""
        attrs = dict(attrs)
        # a bare `<form action>` yields None; treat it like a missing action
        clean_action = (attrs.get('action') or '').partition('?')[0]
        if tag == "form" and clean_action in ["/das_captcha", "index"]:
            self.action = attrs["action"]
            # state 2: collect values until the form is closed
            self.handle_starttag = self.find_values
            self.handle_endtag = self.find_end

    def find_values(self, tag, attrs):
        """Record the recaptcha parameters and the form's input fields."""
        attrs = dict(attrs)
        is_recaptcha_div = tag == "div" and (
            attrs.get('id') == "recaptcha"
            or attrs.get('class') == "g-recaptcha"
        )
        if is_recaptcha_div:
            self.sitekey = attrs.get('data-sitekey')
            self.svalue = attrs.get('data-s')
        name = attrs.get("name")
        if tag == "input" and name:
            # inputs without a value (e.g. submit buttons) count as empty
            # instead of raising KeyError
            self.inputs[name] = attrs.get("value") or ""

    def find_end(self, tag):
        """Stop collecting once the captcha form is closed."""
        if tag == "form":
            # state 3: ignore the remainder of the document
            self.handle_starttag = self.find_nil
            self.handle_endtag = self.find_nil

    def find_nil(self, *args):
        """No-op handler; accepts both start- and end-tag signatures."""
        pass
41
def submit_captcha(r):
    """Hand a captcha page over to the configured solving service.

    `r` is the response that may contain the captcha page; its `.text`
    and `.url` are read.

    returns:
    - False if `r` is not a captcha page or solving is disabled by the
      admin (no api_key configured)
    - True if a solving task was requested just now.
    - int(seconds since last request) if one was already requested within
      the last 90 seconds. Note: this can be 0, which is falsy; callers
      that need to tell it apart from False should compare with `is False`.

    Network errors (including the request timeout) propagate to the caller.
    """

    api_key = cf['captcha']['api_key']
    api_host = cf['captcha']['api_host']
    public_uri = cf['webhooks']['public_uri']

    if not api_key:
        return False # disabled by admin

    if "To continue with your YouTube experience, please fill out the form below." not in r.text:
        return False

    # sqlite3's connection context manager only commits/rolls back; it
    # does not close the connection, hence the explicit try/finally.
    conn = sqlite3.connect(cf['global']['database'])
    try:
        with conn:
            c = conn.cursor()

            # check if a captcha was already submitted recently:
            c.execute("""
                SELECT (julianday('now') - julianday(timestamp)) * 86400
                FROM captcha_requests
                WHERE timestamp > datetime('now', '-90 seconds')
                ORDER BY timestamp DESC
                LIMIT 1
            """) # Note: 90sec should work fine for capmonster, might need tweaking.
            result = c.fetchone()
            if result: # already submitted
                (last_ago,) = result
                return int(last_ago)

            captcha = ExtractCaptcha(r.text)
            nonce = secrets.token_urlsafe(16)
            inputs = json.dumps(captcha.inputs)
            #^: {"action_recaptcha_verify2": "1", "next": "/watch?v=***&hl=en&gl=US"}

            # note: auto field for current datetime
            c.execute("""
                INSERT INTO captcha_requests(nonce, url, action, inputs)
                VALUES (?, ?, ?, ?)
            """, (nonce, r.url, captcha.action, inputs))
            # committed before talking to the solver, so the row already
            # exists when the callback for this nonce comes in.
            conn.commit()
    finally:
        conn.close()

    r2 = requests.post(f"{api_host}/createTask", json={
        "clientKey": api_key,
        "task": {
            "type": "NoCaptchaTaskProxyless",
            "websiteURL": r.url,
            "websiteKey": captcha.sitekey,
            "recaptchaDataSValue": captcha.svalue,
        },
        "callbackUrl": f"{public_uri}/captcha_response/v1/{nonce}",
    }, timeout=10) # don't let an unresponsive solver hang us forever

    # a non-JSON or non-object reply must not crash us after the request
    # row has already been recorded; just report it.
    try:
        reply = r2.json()
    except ValueError:
        reply = None
    task_id = reply.get("taskId") if isinstance(reply, dict) else None

    if task_id is None:
        flask_logger(f"captcha task submission failed (HTTP {r2.status_code}): {reply}", "info")
    else:
        flask_logger(f"submitted captcha task with id {task_id}", "info")

    return True
100
def solve_captcha(nonce, json_obj):
    """Submit a solved captcha and store the cookies we get back.

    nonce:    identifies the pending row in captcha_requests.
    json_obj: decoded callback payload; its "solution" object is read for
              "gRecaptchaResponse" and "cookies".

    Posts the stored form inputs plus the solution to the stored form
    action, replaces the contents of captcha_cookies with the cookies of
    that response, and removes the pending request (and stale ones).

    raises NotFound if there is no (readable) pending request for `nonce`.
    Network errors (including the request timeout) propagate; in that
    case the database changes are rolled back.
    """
    # sqlite3's connection context manager only commits/rolls back; it
    # does not close the connection, hence the explicit try/finally.
    conn = sqlite3.connect(cf['global']['database'])
    try:
        with conn:
            c = conn.cursor()
            c.execute("""
                SELECT url, action, inputs
                FROM captcha_requests
                WHERE nonce = ? -- AND timestamp > datetime('now', '-90 seconds')
            """, (nonce,))
            try:
                # fetchone() is None for an unknown nonce -> TypeError on
                # unpacking; undecodable inputs -> ValueError/TypeError.
                url, action, inputs = c.fetchone()
                inputs = json.loads(inputs)
            except (TypeError, ValueError):
                raise NotFound # todo: ugly

            # tolerate a missing or null "solution"
            solution = json_obj.get("solution") or {}
            inputs["g-recaptcha-response"] = solution.get("gRecaptchaResponse")
            cookies = solution.get("cookies")

            r = requests.post(
                urljoin(url, action),
                cookies=cookies, data=inputs,
                allow_redirects=False,
                timeout=10, # don't hang forever on an unresponsive server
            )

            captcha_cookies = r.cookies
            # note: invidious additionally extracts the "google_abuse" value
            # from the Location header's query string when action is
            # "/sorry/index" (needs allow_redirects=False). i don't believe
            # it's necessary; if enabled, the value is a cookie header-value
            # that needs parsing into a dict first.

            c.execute("DELETE FROM captcha_cookies")
            # not using insert-or-replace-into to avoid keeping removed cookies
            c.executemany("""
                INSERT INTO captcha_cookies(name, value)
                VALUES (?, ?)
            """, captcha_cookies.items())
            # datetime(), not date(): date() yields a bare 'YYYY-MM-DD',
            # which sorts before every timestamp of the same day, so stale
            # requests from today would never be cleaned up.
            c.execute("""
                DELETE FROM captcha_requests
                WHERE nonce = ? OR timestamp < datetime('now', '-1 minute')
            """, (nonce,))
    finally:
        conn.close()
Imprint / Impressum