#! /usr/bin/python3
import queue
import threading
import requests
from requests.exceptions import RequestException
#线程类定义
class DirScan(threading.Thread):
def __init__(self, queue, timeout=5):
super().__init__()
self.queue = queue # 每个线程持有一个队列引用
self.timeout = timeout # 每个线程拥有自己的超时设置
def run(self):
while True:
try:
url = self.queue.get_nowait()
except queue.Empty:
break # 队列为空时退出循环
try:
response = requests.get(
url=url,
timeout=self.timeout, # 超时设置
allow_redirects=False # 关闭重定向
)
if response.status_code == 200:
print(f'[*] Found: {url}')
except RequestException as e:
print(f'[!] Error accessing {url}: {str(e)}')
finally:
self.queue.task_done()
def start(base_url, ext, thread_count):
q = queue.Queue()
try:
with open(f'{ext}.txt', 'r', encoding='utf-8') as f:
for line in f:
path = line.strip()
if path:
full_url = base_url + path
q.put(full_url)
except FileNotFoundError:
print(f"[!] 字典文件 {ext}.txt 不存在!")
return
print(f"待扫描路径数量: {q.qsize()}")
# 创建线程池
threads = []
for _ in range(min(thread_count, q.qsize())):
t = DirScan(q)
t.start()
threads.append(t)
# 等待所有线程完成
for t in threads:
t.join()
if __name__ == '__main__':
# 示例参数
start(
base_url='http://8.134.70.73',
ext='后台路径',
thread_count=16
)
运行演示

Comments NOTHING