import json
import pickle
import secrets
import sqlite3
from contextlib import closing
from html.parser import HTMLParser
from urllib.parse import urljoin

import requests
from flask import current_app
from werkzeug.exceptions import BadGateway, NotFound

from ..common.common import cf
class ExtractCaptcha(HTMLParser):
    """Extract the captcha form's target, reCAPTCHA parameters and inputs.

    Parsing happens eagerly in the constructor. The parser works as a small
    state machine that swaps its own ``handle_starttag``/``handle_endtag``
    callbacks: it first looks for the captcha ``<form>``, then collects
    values until that form is closed, and ignores everything afterwards.

    Results are exposed as attributes:
      action  -- the matched form's ``action`` attribute (None if none matched)
      sitekey -- ``data-sitekey`` of the reCAPTCHA ``<div>`` (or None)
      svalue  -- ``data-s`` of the reCAPTCHA ``<div>`` (or None)
      inputs  -- dict mapping each named ``<input>`` in the form to its value
    """

    def __init__(self, html):
        """Parse *html* (a str) immediately and populate the attributes."""
        super().__init__()
        self.action = None
        self.sitekey = None
        self.svalue = None
        self.inputs = {}
        # Initial state: search for the captcha form.
        self.handle_starttag = self.find_form
        super().feed(html)

    def find_form(self, tag, attrs):
        """State 1: wait for a <form> whose action (sans query) is known."""
        attrs = dict(attrs)
        if tag != "form":
            return
        # Forms without an action attribute (or with a valueless one) must
        # simply be skipped instead of crashing the parser.
        action_path = (attrs.get("action") or "").partition("?")[0]
        if action_path in ("/das_captcha", "index"):
            self.action = attrs["action"]
            self.handle_starttag = self.find_values
            self.handle_endtag = self.find_end

    def find_values(self, tag, attrs):
        """State 2: collect reCAPTCHA parameters and form inputs."""
        attrs = dict(attrs)
        if tag == "div" and (
            attrs.get("id") == "recaptcha" or attrs.get("class") == "g-recaptcha"
        ):
            self.sitekey = attrs.get("data-sitekey")
            self.svalue = attrs.get("data-s")
        if tag == "input" and "name" in attrs:
            # An <input> without a value attribute is recorded as "".
            self.inputs[attrs["name"]] = attrs.get("value", "")

    def find_end(self, tag):
        """Leave state 2 once the captcha form is closed."""
        if tag == "form":
            self.handle_starttag = self.find_nil
            self.handle_endtag = self.find_nil

    def find_nil(self, *args):
        """State 3: ignore all remaining tags."""
def check_captcha_or_raise(r):
    """Detect a YouTube captcha page and hand it to the solving service.

    Args:
        r: a response object exposing ``.text`` and ``.url``.

    Returns:
        None when *r* is not a captcha page, or when a captcha request was
        already recorded within the last 90 seconds.

    Raises:
        BadGateway: when a captcha page was detected and a new request was
            recorded (even if contacting the solving service failed; that
            failure is logged).
    """
    if "To continue with your YouTube experience, please fill out the form below." not in r.text:
        return

    # sqlite3's own context manager only commits/rolls back; closing() makes
    # sure the connection itself is released as well.
    with closing(sqlite3.connect(cf['global']['database'])) as conn, conn:
        c = conn.cursor()

        # check if a captcha was already submitted recently:
        c.execute("""
            SELECT COUNT(*)
            FROM captcha_requests
            WHERE timestamp > datetime('now', '-90 seconds')
        """)
        (already_submitted,) = c.fetchone()
        if already_submitted:
            current_app.logger.warning("check_captcha_or_raise: already submitted")
            # TODO: get time of last submission and display that to the user
            return

        captcha = ExtractCaptcha(r.text)
        nonce = secrets.token_urlsafe(16)
        inputs = json.dumps(captcha.inputs)
        task_id = 0  # placeholder until the solving service assigns one

        # note: auto field for current datetime
        # note: key, svalue only for debugging
        c.execute("""
            INSERT INTO captcha_requests(nonce, url, action, key, svalue, task_id, inputs)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (nonce, r.url, captcha.action, captcha.sitekey, captcha.svalue, task_id, inputs))
        # Commit right away so the row is persisted regardless of what
        # happens to the solver request below.
        conn.commit()

        try:
            r2 = requests.post("https://api.capmonster.cloud/createTask", json={
                "clientKey": cf['captcha']['api_key'],
                "task": {
                    "type": "NoCaptchaTaskProxyless",
                    "websiteURL": r.url,
                    "websiteKey": captcha.sitekey,
                    "recaptchaDataSValue": captcha.svalue,
                },
                "callbackUrl": f"{cf['captcha']['webhook_host']}/captcha_response/{nonce}", #XXX
            }, timeout=30)
            task_id = r2.json().get("taskId")
        except (requests.RequestException, ValueError) as err:
            # Network failure or a non-JSON reply: nothing to record, but the
            # caller should still see the rate-limit error below.
            current_app.logger.warning(
                "check_captcha_or_raise: createTask failed: %s", err)
        else:
            # for debugging only; task_id is not in webhook response
            # (the row lives in captcha_requests, where it was inserted above)
            c.execute("""
                UPDATE OR IGNORE captcha_requests
                SET task_id = ?
                WHERE nonce = ?
            """, (task_id, nonce))

    # Raised outside the transaction so the task_id update gets committed
    # instead of rolled back by the exception.
    raise BadGateway("Rate-limited by Youtube; please try again in two seconds")
def solve_captcha(nonce, json_obj):
    """Submit a solved captcha and store the cookies returned for it.

    Looks up the pending request recorded under *nonce*, posts the stored
    form inputs plus the solver's ``gRecaptchaResponse`` to the form's
    action URL, replaces the contents of ``captcha_cookies`` with the
    cookies of that response, and removes the handled (and stale) rows from
    ``captcha_requests``.

    Args:
        nonce: identifier of the row in ``captcha_requests``.
        json_obj: dict-like solver result; its ``"solution"`` entry may
            carry ``"gRecaptchaResponse"`` and ``"cookies"``.

    Raises:
        NotFound: if no request with that nonce exists or its stored inputs
            cannot be decoded as JSON.
    """
    # sqlite3's own context manager only commits/rolls back; closing() makes
    # sure the connection itself is released as well.
    with closing(sqlite3.connect(cf['global']['database'])) as conn, conn:
        c = conn.cursor()
        c.execute("""
            SELECT url, action, task_id, inputs
            FROM captcha_requests
            WHERE nonce = ? -- AND timestamp > date('now', '-90 seconds')
        """, (nonce,))
        try:
            # fetchone() yields None for an unknown nonce -> TypeError on
            # unpacking; broken JSON -> ValueError.
            url, action, _task_id, inputs = c.fetchone()
            inputs = json.loads(inputs)
            # note: there is no taskId in the response, so we can't verify that :|
        except (TypeError, ValueError):
            raise NotFound from None

        # "solution" may be missing or null in the solver's reply.
        solution = json_obj.get("solution") or {}
        inputs["g-recaptcha-response"] = solution.get("gRecaptchaResponse")
        cookies = solution.get("cookies")

        # cookies aren't preserved in r.cookies when the redirect is followed,
        # and we don't need that response anyways.
        r = requests.post(urljoin(url, action), cookies=cookies, data=inputs,
                          allow_redirects=False, timeout=30)

        # NOTE(review): debugging aid that dumps the raw response (cookies
        # included) to a fixed path in /tmp; consider removing it.
        with open("/tmp/das-captcha.req", "wb") as dump:
            pickle.dump(r, dump)

        captcha_cookies = r.cookies
        # cargo-culted from invidious, but i don't believe it's necessary
        # if enabled, use r.post(allow_redirects=False)!
        #if action == "/sorry/index":
        #    from urllib.parse import parse_qs
        #    captcha_cookies, _, _ = parse_qs(r.headers["Location"]) \
        #        .get("google_abuse", "") \
        #        .partition(";")
        #    xxx: returns cookie header-value; parse to dict

        # not using insert-or-replace-into to avoid keeping removed cookies
        c.execute("DELETE FROM captcha_cookies")
        c.executemany("""
            INSERT INTO captcha_cookies(name, value)
            VALUES (?, ?)
        """, captcha_cookies.items())

        # datetime() rather than date(): date() yields only 'YYYY-MM-DD', so
        # rows from the current day would never compare as older.
        # NOTE(review): assumes timestamp is stored as 'YYYY-MM-DD HH:MM:SS'
        # (schema not visible here) -- confirm.
        c.execute("""
            DELETE FROM captcha_requests
            WHERE nonce = ? OR timestamp < datetime('now', '-1 minute')
        """, (nonce,))