text-generation-webui/modules/web_search.py

import concurrent.futures
import html
import random
import re
import urllib.request
from concurrent.futures import as_completed
from datetime import datetime
from urllib.parse import quote_plus

import requests

from modules import shared
from modules.logging_colors import logger


def get_current_timestamp():
    """Returns the current date and time in 24-hour format"""
    return datetime.now().strftime('%b %d, %Y %H:%M')


def download_web_page(url, timeout=10):
    """
    Download a web page and convert its HTML content to structured Markdown text.
    """
    import html2text

    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()  # Raise an exception for bad status codes

        # Initialize the HTML to Markdown converter
        h = html2text.HTML2Text()
        h.body_width = 0
        h.ignore_images = True
        h.ignore_links = True

        # Convert the HTML to Markdown
        markdown_text = h.handle(response.text)

        return markdown_text
    except requests.exceptions.RequestException as e:
        logger.error(f"Error downloading {url}: {e}")
        return ""
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
        return ""


def perform_web_search(query, num_pages=3, max_workers=5, timeout=10):
    """Perform web search and return results with content"""
    try:
        search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
        agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
        ]

        response_text = ""
        req = urllib.request.Request(search_url, headers={'User-Agent': random.choice(agents)})
        with urllib.request.urlopen(req, timeout=timeout) as response:
            response_text = response.read().decode('utf-8')

        # Extract results with regex
        titles = re.findall(r'<a[^>]*class="[^"]*result__a[^"]*"[^>]*>(.*?)</a>', response_text, re.DOTALL)
        urls = re.findall(r'<a[^>]*class="[^"]*result__url[^"]*"[^>]*>(.*?)</a>', response_text, re.DOTALL)

        # Prepare download tasks
        download_tasks = []
        for i in range(min(len(titles), len(urls), num_pages)):
            url = f"https://{urls[i].strip()}"
            title = re.sub(r'<[^>]+>', '', titles[i]).strip()
            title = html.unescape(title)
            download_tasks.append((url, title, i))

        search_results = [None] * len(download_tasks)  # Pre-allocate to maintain order

        # Download pages in parallel
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all download tasks
            future_to_task = {
                executor.submit(download_web_page, task[0]): task
                for task in download_tasks
            }

            # Collect results as they complete
            for future in as_completed(future_to_task):
                url, title, index = future_to_task[future]
                try:
                    content = future.result()
                    search_results[index] = {
                        'title': title,
                        'url': url,
                        'content': content
                    }
                except Exception:
                    search_results[index] = {
                        'title': title,
                        'url': url,
                        'content': ''
                    }

        return search_results

    except Exception as e:
        logger.error(f"Error performing web search: {e}")
        return []


def truncate_content_by_tokens(content, max_tokens=8192):
    """Truncate content to fit within token limit using binary search"""
    if len(shared.tokenizer.encode(content)) <= max_tokens:
        return content

    # Binary search for the longest character prefix that still encodes to at
    # most max_tokens tokens, so the tokenizer runs only O(log n) times.
    left, right = 0, len(content)
    while left < right:
        mid = (left + right + 1) // 2
        if len(shared.tokenizer.encode(content[:mid])) <= max_tokens:
            left = mid
        else:
            right = mid - 1

    return content[:left]


def add_web_search_attachments(history, row_idx, user_message, search_query, state):
    """Perform web search and add results as attachments"""
    if not search_query:
        logger.warning("No search query provided")
        return

    try:
        logger.info(f"Using search query: {search_query}")

        # Perform web search
        num_pages = int(state.get('web_search_pages', 3))
        search_results = perform_web_search(search_query, num_pages)

        if not search_results:
            logger.warning("No search results found")
            return

        # Filter out failed downloads before adding attachments
        successful_results = [result for result in search_results if result['content'].strip()]

        if not successful_results:
            logger.warning("No successful downloads to add as attachments")
            return

        # Add search results as attachments
        key = f"user_{row_idx}"
        if key not in history['metadata']:
            history['metadata'][key] = {"timestamp": get_current_timestamp()}
        if "attachments" not in history['metadata'][key]:
            history['metadata'][key]["attachments"] = []

        for result in successful_results:
            attachment = {
                "name": result['title'],
                "type": "text/html",
                "url": result['url'],
                "content": truncate_content_by_tokens(result['content'])
            }
            history['metadata'][key]["attachments"].append(attachment)

        logger.info(f"Added {len(successful_results)} successful web search results as attachments.")

    except Exception as e:
        logger.error(f"Error in web search: {e}")