import concurrent.futures
import html
import random
import re
import urllib.request
from concurrent.futures import as_completed
from datetime import datetime
from urllib.parse import quote_plus, urlparse, parse_qs, unquote

import requests

from modules import shared
from modules.logging_colors import logger


def get_current_timestamp():
    """Returns the current date and time, with the time in 24-hour format"""
    return datetime.now().strftime('%b %d, %Y %H:%M')


def download_web_page(url, timeout=10):
    """
    Download a web page and convert its HTML content to Markdown text,
    handling Brotli/gzip encodings and non-HTML content robustly.
    """
    logger.info(f"Downloading {url}")

    # --- soft dependencies
    try:
        import html2text
    except Exception:
        logger.exception("html2text import failed")
        html2text = None

    try:
        from readability import Document
    except Exception:
        Document = None

    try:
        import brotli as _brotli
        have_brotli = True
    except Exception:
        _brotli = None
        have_brotli = False

    import gzip
    import zlib
    import html as _html

    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        ),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        # IMPORTANT: only advertise br if brotli is installed
        "Accept-Encoding": "gzip, deflate" + (", br" if have_brotli else ""),
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }

    try:
        resp = requests.get(url, headers=headers, timeout=timeout)
        resp.raise_for_status()

        # --- bail out early if it's not HTML
        ctype = resp.headers.get("Content-Type", "").lower()
        if not any(t in ctype for t in ("text/html", "application/xhtml+xml")):
            logger.warning("Non-HTML content-type %r at %s", ctype, url)
            return ""

        # --- get raw bytes, then decompress if the server didn't / requests couldn't
        raw = resp.content  # bytes
        enc_hdr = resp.headers.get("Content-Encoding", "").lower()

        # If requests didn't decode (it normally does gzip/deflate), handle manually.
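        # Note: requests (via urllib3) transparently decodes gzip/deflate, and
        # decodes br only when a brotli/brotlicffi package is importable, so
        # this block is a best-effort safety net for responses that arrive
        # still compressed (e.g., mislabeled or double-encoded payloads).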
if "br" in enc_hdr and have_brotli: try: raw = _brotli.decompress(raw) except Exception: # it may already be decoded; ignore pass elif "gzip" in enc_hdr: try: raw = gzip.decompress(raw) except Exception: pass elif "deflate" in enc_hdr: try: raw = zlib.decompress(raw, -zlib.MAX_WBITS) except Exception: pass # --- decode text with a robust charset guess # use HTTP charset if present charset = None if "charset=" in ctype: charset = ctype.split("charset=")[-1].split(";")[0].strip() if not charset: # requests’ detector charset = resp.apparent_encoding or "utf-8" try: html_text = raw.decode(charset, errors="replace") except Exception: html_text = raw.decode("utf-8", errors="replace") # anti-bot shells (avoid empty output surprises) if re.search(r"(cf-chl|Just a moment|enable JavaScript)", html_text, re.I): logger.warning("Possible anti-bot/challenge page at %s", url) # --- extract readable text (readability -> html2text -> fallback) md_readability = "" if Document is not None: try: doc = Document(html_text) title = (doc.short_title() or "").strip() main_html = doc.summary(html_partial=True) main_text = re.sub(r"<[^>]+>", " ", main_html, flags=re.S) main_text = re.sub(r"\s+", " ", main_text).strip() if title: md_readability = f"# {title}\n\n{main_text}".strip() else: md_readability = main_text except Exception: logger.exception("readability failed on %s", url) md_html2text = "" if html2text is not None: try: h = html2text.HTML2Text() h.body_width = 0 h.ignore_images = True h.ignore_links = True h.single_line_break = True md_html2text = (h.handle(html_text) or "").strip() except Exception: logger.exception("html2text failed on %s", url) def _clean(s): s = re.sub(r"<[^>]+>", " ", s, flags=re.S) return _html.unescape(re.sub(r"\s+", " ", s)).strip() # fallback: meta/title/headers/paragraphs + noscript parts = [] t = re.search(r"
        # fallback: meta/title/headers/paragraphs + noscript
        parts = []
        t = re.search(r"<title[^>]*>(.*?)</title>", html_text, re.I | re.S)
        if t:
            c = _clean(t.group(1))
            if c:
                parts.append(f"# {c}")
        m = re.search(
            r'<meta[^>]+name=["\']description["\'][^>]*content=["\']([^"\']*)',
            html_text,
            re.I,
        )
        if m:
            parts.append(_clean(m.group(1)))
        parts += [
            _clean(x)
            for x in re.findall(r"<h[1-3][^>]*>(.*?)</h[1-3]>", html_text, re.I | re.S)
            if _clean(x)
        ]
        parts += [
            _clean(p)
            for p in re.findall(r"<p[^>]*>(.*?)</p>", html_text, re.I | re.S)[:8]
            if _clean(p)
        ]
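        # JS-heavy pages often mirror key content inside <noscript>, so harvest
        # it as a last resort (this may duplicate text already captured above).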
", html_text, re.I | re.S)[:8] if _clean(p)] for n in re.findall(r"", html_text, re.I | re.S): c = _clean(n) if c: parts.append(c) md_fallback = "\n\n".join([p for p in parts if p]).strip() best = max([md_readability, md_html2text, md_fallback], key=lambda s: len(s or "")) if not best.strip(): logger.warning("Empty content extracted from %s", url) return best except requests.exceptions.RequestException as e: logger.error(f"Error downloading {url}: {e}") return "" except Exception: logger.exception("Unexpected error while downloading %s", url) return "" def _extract_results_from_duckduckgo(response_text, num_pages): # 1) Grab the title anchors (they carry the real clickable href) # We capture both the inner text (title) and href. anchor_pattern = re.compile( r']*class="[^"]*result__a[^"]*"[^>]*href="([^"]+)"[^>]*>(.*?)', re.DOTALL | re.IGNORECASE ) matches = anchor_pattern.findall(response_text) results = [] for href, title_html in matches: # 2) Resolve DuckDuckGo redirect: ?uddg=