from html.parser import HTMLParser
from urllib.parse import urljoin
import json
import pickle
import secrets
import sqlite3

import requests
from flask import current_app
from werkzeug.exceptions import BadGateway, NotFound

from ..common.common import cf


class ExtractCaptcha(HTMLParser):
    """Extract the captcha form from an HTML page.

    The parser is a small state machine that swaps its own
    ``handle_starttag`` / ``handle_endtag`` callbacks:

    1. ``find_form``   -- skip tags until a matching ``<form>`` is seen,
    2. ``find_values`` -- collect the recaptcha div and ``<input>`` values,
    3. ``find_nil``    -- ignore everything after the form's end tag.

    Parsing happens in the constructor. Afterwards these attributes are set:

    * ``action``  -- the form's ``action`` attribute (``None`` if no
      matching form was found)
    * ``sitekey`` -- ``data-sitekey`` of the recaptcha ``<div>`` (or ``None``)
    * ``svalue``  -- ``data-s`` of the recaptcha ``<div>`` (or ``None``)
    * ``inputs``  -- ``{name: value}`` of the ``<input>`` tags seen while
      inside the form
    """

    # Form actions (with the query string stripped) that identify the
    # captcha form.
    _FORM_ACTIONS = ("/das_captcha", "index")

    def __init__(self, html):
        super().__init__()
        self.action = None
        self.sitekey = None
        self.svalue = None
        self.inputs = {}
        self.handle_starttag = self.find_form
        super().feed(html)

    def find_form(self, tag, attrs):
        """Start-tag handler: wait for the captcha ``<form>``."""
        attrs = dict(attrs)
        # A form may have no action at all (or a value-less one, which
        # HTMLParser reports as None); treat both as "no match" instead of
        # crashing on None.partition().
        action = attrs.get("action") or ""
        if tag == "form" and action.partition("?")[0] in self._FORM_ACTIONS:
            self.action = action
            self.handle_starttag = self.find_values
            self.handle_endtag = self.find_end

    def find_values(self, tag, attrs):
        """Start-tag handler inside the form: collect sitekey and inputs."""
        attrs = dict(attrs)
        if tag == "div" and (
            attrs.get("id") == "recaptcha"
            or attrs.get("class") == "g-recaptcha"
        ):
            self.sitekey = attrs.get("data-sitekey")
            self.svalue = attrs.get("data-s")
        if tag == "input" and "name" in attrs:
            # An input without a value attribute is recorded as an empty
            # string rather than raising KeyError.
            self.inputs[attrs["name"]] = attrs.get("value") or ""

    def find_end(self, tag):
        """End-tag handler: stop collecting once the form is closed."""
        if tag == "form":
            self.handle_starttag = self.find_nil
            self.handle_endtag = self.find_nil

    def find_nil(self, *args):
        """No-op handler used after the form has been fully read."""
        pass


def check_captcha_or_raise(r):
    """Detect a captcha page in response *r* and hand it to the solver API.

    *r* is a response object; its ``.text`` and ``.url`` are read.

    Returns ``None`` when *r* is not a captcha page, or when a captcha
    request was already recorded within the last 90 seconds. Otherwise the
    captcha details are stored in the ``captcha_requests`` table, a
    ``createTask`` request is posted to the configured captcha API, and
    ``BadGateway`` is raised.

    Raises:
        BadGateway: a captcha was found and submitted for solving.
    """
    if "To continue with your YouTube experience, please fill out the form below." not in r.text:
        return

    with sqlite3.connect(cf['global']['database']) as conn:
        c = conn.cursor()
        # check if a captcha was already submitted recently:
        c.execute("""
            SELECT COUNT(*)
            FROM captcha_requests
            WHERE timestamp > datetime('now', '-90 seconds')
        """)
        (already_submitted,) = c.fetchone()
        if already_submitted:
            current_app.logger.warning("check_captcha_or_raise: already submitted")
            # TODO: get time of last submission and display that to the user
            return

        captcha = ExtractCaptcha(r.text)
        nonce = secrets.token_urlsafe(16)
        inputs = json.dumps(captcha.inputs)
        task_id = 0  # placeholder; the real id is filled in below

        # XXX: if i commit after i send the request, it fails!?
        # note: auto field for current datetime
        # note: key, svalue only for debugging
        c.execute("""
            INSERT INTO captcha_requests(nonce, url, action, key, svalue, task_id, inputs)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (nonce, r.url, captcha.action, captcha.sitekey, captcha.svalue, task_id, inputs))
        conn.commit()

        api_key = cf['captcha']['api_key']
        api_host = cf['captcha']['api_host']
        public_uri = cf['webhooks']['public_uri']
        r2 = requests.post(f"{api_host}/createTask", json={
            "clientKey": api_key,
            "task": {
                "type": "NoCaptchaTaskProxyless",
                "websiteURL": r.url,
                "websiteKey": captcha.sitekey,
                "recaptchaDataSValue": captcha.svalue,
            },
            "callbackUrl": f"{public_uri}/captcha_response/{nonce}",
        })
        task_id = r2.json().get("taskId")
        # The row was inserted into captcha_requests above, so that is the
        # table to update (the original statement named captcha_response).
        # for debugging only; task_id is not in webhook response
        c.execute("""
            UPDATE OR IGNORE captcha_requests
            SET task_id = ?
            WHERE nonce = ?
        """, (task_id, nonce))

    # Raised after the `with` block so the connection context manager
    # commits the task_id update instead of rolling it back.
    raise BadGateway("Rate-limited by Youtube; please try again in two seconds")


def solve_captcha(nonce, json_obj):
    """Submit a solved captcha and store the cookies from the response.

    *nonce* identifies the row in ``captcha_requests``; *json_obj* is the
    solver's callback payload, from which ``solution.gRecaptchaResponse``
    and ``solution.cookies`` are read. The stored form is posted back to
    its action URL, the response cookies replace the contents of
    ``captcha_cookies``, and the request row is deleted.

    Raises:
        NotFound: no request is stored for *nonce*, or its stored inputs
            cannot be decoded.
    """
    with sqlite3.connect(cf['global']['database']) as conn:
        c = conn.cursor()
        c.execute("""
            SELECT url, action, task_id, inputs
            FROM captcha_requests
            WHERE nonce = ?
            -- AND timestamp > date('now', '-90 seconds')
        """, (nonce,))
        row = c.fetchone()
        if row is None:
            raise NotFound
        # note: there is no taskId in the response, so we can't verify that :|
        url, action, _task_id, inputs = row
        try:
            inputs = json.loads(inputs)
        except (TypeError, ValueError):
            # NULL or malformed JSON in the inputs column
            raise NotFound from None

        solution = json_obj.get("solution") or {}
        inputs["g-recaptcha-response"] = solution.get("gRecaptchaResponse")
        cookies = solution.get("cookies")  # only set/used for "google.com domains and subdomains"

        # cookies aren't preserved in r.cookies when the redirect is
        # followed(wtf!?), and we don't need that response anyways.
        r = requests.post(urljoin(url, action), cookies=cookies, data=inputs, allow_redirects=False)

        # Debug dump of the raw response; the file is closed deterministically.
        with open("/tmp/das-captcha.req", "wb") as dump_file:
            pickle.dump(r, dump_file)

        captcha_cookies = r.cookies
        # cargo-culted from invidious, but i don't believe it's necessary
        # if enabled, use r.post(allow_redirects=False)!
        #if action == "/sorry/index":
        #    from urllib.parse import parse_qs
        #    captcha_cookies, _, _ = parse_qs(r.headers["Location"]) \
        #        .get("google_abuse", "") \
        #        .partition(";")
        #    xxx: returns cookie header-value; parse to dict

        # not using insert-or-replace-into to avoid keeping removed cookies
        c.execute("DELETE FROM captcha_cookies")
        c.executemany("""
            INSERT INTO captcha_cookies(name, value)
            VALUES (?, ?)
        """, captcha_cookies.items())

        # NOTE(review): date() is used here while check_captcha_or_raise
        # compares timestamps with datetime(); confirm that date-only
        # granularity is intended for this cleanup.
        c.execute("""
            DELETE FROM captcha_requests
            WHERE nonce = ?
            OR timestamp < date('now', '-1 minute')
        """, (nonce,))