import secrets
import sqlite3
from contextlib import closing
from html.parser import HTMLParser
from urllib.parse import urljoin

import requests
from werkzeug.exceptions import NotFound

from ..common.common import cf, flask_logger

# NOTE: We are taking a few shortcuts here, that might bite us later:
# 1. we are not sending form data or request cookies
# 2. we are only storing the 'goojf' cookie
# 3. invidious has some extra cookie code for /sorry/index
# 4. we are expecting a response within 90 seconds (max is 5min)

# Upper bound (seconds) for each outgoing HTTP request made by this module,
# so that an unresponsive peer cannot block the calling worker forever.
REQUEST_TIMEOUT = 30


class ExtractCaptcha(HTMLParser):
    """Extract the reCAPTCHA form details from a captcha page.

    The parser is a small state machine driven by swapping the
    ``handle_starttag``/``handle_endtag`` callbacks:

    1. ``find_form`` looks for the captcha ``<form>``,
    2. ``find_values`` collects the widget parameters and ``<input>`` fields
       while inside that form,
    3. ``find_nil`` ignores everything after the form was closed.

    Parsing happens in the constructor; afterwards the results are available
    as attributes:

    - ``action``:  the form's ``action`` attribute (``None`` if no form found)
    - ``sitekey``: the ``data-sitekey`` attribute of the recaptcha ``<div>``
    - ``svalue``:  the ``data-s`` attribute of the recaptcha ``<div>``
    - ``inputs``:  mapping of ``<input>`` names to their values
    """

    def __init__(self, html):
        super().__init__()
        self.action = None
        self.sitekey = None
        self.svalue = None
        self.inputs = {}
        self.handle_starttag = self.find_form
        super().feed(html)

    def find_form(self, tag, attrs):
        """Start-tag handler used until the captcha form has been found."""
        attrs = dict(attrs)
        # A valueless attribute (`<form action>`) is reported as None, so
        # fall back to an empty string before stripping the query string.
        clean_action = (attrs.get('action') or '').partition('?')[0]
        if tag == "form" and clean_action in ["/das_captcha", "index"]:
            self.action = attrs["action"]
            self.handle_starttag = self.find_values
            self.handle_endtag = self.find_end

    def find_values(self, tag, attrs):
        """Start-tag handler used while inside the captcha form."""
        attrs = dict(attrs)
        if tag == "div" and (
            attrs.get('id') == "recaptcha"
            or attrs.get('class') == "g-recaptcha"
        ):
            self.sitekey = attrs.get('data-sitekey')
            self.svalue = attrs.get('data-s')
        if tag == "input" and "name" in attrs:
            # Inputs without a value attribute are recorded as None instead
            # of aborting the whole parse with a KeyError.
            self.inputs[attrs["name"]] = attrs.get("value")

    def find_end(self, tag):
        """End-tag handler: stop collecting once the form is closed."""
        if tag == "form":
            self.handle_starttag = self.find_nil
            self.handle_endtag = self.find_nil

    def find_nil(self, *args):
        """No-op handler installed after the form has been processed."""
        pass


def submit_captcha(r):
    """Hand the captcha contained in response ``r`` to the solver API.

    ``r`` is the HTTP response that may contain the captcha page; its
    ``text`` and ``url`` attributes are used.

    returns:
    - False if not rate limited or disabled by admin
    - True if just now submitted.
    - int(seconds since last request) if already submitted
      (truncated, so this can be 0 right after a submission)
    """
    api_key = cf['captcha']['api_key']
    api_host = cf['captcha']['api_host']
    public_uri = cf['webhooks']['public_uri']

    if not api_key:
        return False  # disabled by admin

    captcha_marker = (
        "To continue with your YouTube experience, "
        "please fill out the form below."
    )
    if captcha_marker not in r.text:
        return False

    # `with conn` only commits/rolls back; closing() makes sure the
    # connection is actually released afterwards.
    with closing(sqlite3.connect(cf['global']['database'])) as conn, conn:
        c = conn.cursor()

        # check if a captcha was already submitted recently:
        c.execute("""
            SELECT (julianday('now') - julianday(timestamp)) * 86400
            FROM captcha_requests
            WHERE timestamp > datetime('now', '-90 seconds')
            ORDER BY timestamp DESC
            LIMIT 1
        """)
        # Note: 90sec should work fine for capmonster, might need tweaking.
        result = c.fetchone()
        if result:  # already submitted
            (last_ago,) = result
            flask_logger(f"last request submitted {last_ago}s ago")
            return int(last_ago)

        captcha = ExtractCaptcha(r.text)
        nonce = secrets.token_urlsafe(16)

        # note: auto field for current datetime
        c.execute("""
            INSERT INTO captcha_requests(nonce, url, action)
            VALUES (?, ?, ?)
        """, (nonce, r.url, captcha.action))
        # Commit before talking to the solver, so the request is recorded
        # (and rate limiting applies) even if the API call below fails.
        conn.commit()

        r2 = requests.post(
            f"{api_host}/createTask",
            json={
                "clientKey": api_key,
                "task": {
                    "type": "NoCaptchaTaskProxyless",
                    "websiteURL": r.url,
                    "websiteKey": captcha.sitekey,
                    "recaptchaDataSValue": captcha.svalue,
                },
                "callbackUrl": f"{public_uri}/captcha_response/v1/{nonce}",
            },
            timeout=REQUEST_TIMEOUT,
        )
        task_id = r2.json().get("taskId")
        flask_logger(f"submitted captcha task with id {task_id}")
        return True


def solve_captcha(nonce, json_obj):
    """Process the solver's callback for the request identified by ``nonce``.

    Looks up the pending request, posts the solver's reCAPTCHA response to
    the stored form action, stores the resulting 'goojf' cookie and removes
    the finished request together with expired ones.

    ``json_obj`` is the decoded JSON body of the callback; the token is read
    from ``json_obj["solution"]["gRecaptchaResponse"]``.

    raises:
    - NotFound if no pending request with this nonce exists
    """
    with closing(sqlite3.connect(cf['global']['database'])) as conn, conn:
        c = conn.cursor()
        c.execute("""
            SELECT url, action
            FROM captcha_requests
            WHERE nonce = ?
            -- AND timestamp > date('now', '-90 seconds')
        """, (nonce,))
        row = c.fetchone()
        if row is None:
            raise NotFound
        url, action = row

        # Tolerate a missing body or an explicit `"solution": null`.
        solution = (json_obj or {}).get("solution") or {}
        r = requests.post(
            urljoin(url, action),
            data={"g-recaptcha-response": solution.get("gRecaptchaResponse")},
            allow_redirects=False,
            timeout=REQUEST_TIMEOUT,
        )

        cookie_name = "goojf"
        # NOTE(review): this is None when the response did not set the
        # cookie (e.g. the solution was rejected) -- confirm that consumers
        # of captcha_cookies cope with a NULL value.
        cookie_value = r.cookies.get(cookie_name)
        c.execute("""
            INSERT OR REPLACE INTO captcha_cookies(name, value)
            VALUES (?, ?)
        """, (cookie_name, cookie_value))

        # datetime() rather than date(): date() yields only 'YYYY-MM-DD',
        # which compares lower than every timestamp of the current day, so
        # expired requests from today would never be purged.
        c.execute("""
            DELETE FROM captcha_requests
            WHERE nonce = ?
            OR timestamp < datetime('now', '-90 seconds')
        """, (nonce,))