Sharing a Beginner Web Crawler Script

Last updated 2025-05-10


#! /usr/bin/env python

import requests
import logging
import re
from urllib.parse import urljoin
import pymongo
import multiprocessing

mongo_client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
db = mongo_client["pc"]
collection = db["test"]

# Create the logger
logger = logging.getLogger("crawler")
logger.setLevel(logging.INFO)

# Console handler so logs show up on screen
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# File handler for persistent logs
file_handler = logging.FileHandler("crawler.log", encoding="utf-8")
# Log format
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)

logger.addHandler(console_handler)
logger.addHandler(file_handler)
# Smoke test
# logger.info("Crawler started")    # goes to both console and file
# logger.warning("Parsing problem")
# logger.error("Request failed")

URL1 = 'https://ssr1.scrape.center'  # base URL of the site to crawl
PAGE = 10  # number of index pages to crawl

# Scrape one index (listing) page
def scrape_index(page):
    # Build the page URL
    index_url = f'{URL1}/page/{page}'
    return scrape_page(index_url)

# Fetch a URL and return its HTML if the response is successful
def scrape_page(url):
    logger.info("Scraping %s ...", url)
    # Send a GET request
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        else:
            logger.error("Failed to scrape %s, status code %s", url, response.status_code)
    except requests.RequestException:
        # Log the exception (type, value and traceback) via exc_info=True
        logger.error("Exception occurred while scraping %s", url, exc_info=True)
    return None

# Parse an index page and extract the detail-page links with a regular expression
def parse_index(html):
    # Regex for the <a ... class="name"> links
    pattern = re.compile('<a.*href="(.*?)".*?class="name"')
    items = re.findall(pattern, html)
    # print(items)  # debug check
    if not items:
        return []
    for item in items:
        # Turn the relative link into an absolute one
        detail_url = urljoin(URL1, item)
        # print(detail_url)  # debug check
        logger.info('Extracted detail URL %s', detail_url)
        # Yield the detail-page URLs one by one for later scraping
        yield detail_url

def scrape_detail(url):
    return scrape_page(url)

# Go one step further: parse a detail page
def parse_detail(html):
    # Extract the fields with regular expressions
    cover_pattern = re.compile(
        'class="el-col.*?<img.*?src="(.*?)".*?class="cover">', re.S)

    # cover_pattern = re.compile(
    #     '<img.*?src="(.*?)".*?class="cover">', re.S)
    name_pattern = re.compile('<h2.*?>(.*?)</h2>')
    categories_pattern = re.compile(
        '<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
    published_at_pattern = re.compile(r'(\d{4}-\d{2}-\d{2})\s?上映')  # "上映" = "released" in the page text
    drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
    score_pattern = re.compile('<p.*?score.*?>(.*?)</p>', re.S)

    cover_match = re.search(cover_pattern, html)
    cover = cover_match.group(1).strip() if cover_match else None
    name_match = re.search(name_pattern, html)
    name = name_match.group(1).strip() if name_match else None
    categories = re.findall(categories_pattern, html)
    published_at_match = re.search(published_at_pattern, html)
    published_at = published_at_match.group(1) if published_at_match else None
    drama_match = re.search(drama_pattern, html)
    drama = drama_match.group(1).strip() if drama_match else None
    score_match = re.search(score_pattern, html)
    score = float(score_match.group(1).strip()) if score_match else None
    # print(type(cover))
    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }

# Save one record to MongoDB
def save_data(data):
    collection.insert_one(data)
    logger.info("Data saved successfully")


# Crawl one index page and all of the detail pages it links to
def main(page):
    index_html = scrape_index(page)
    if not index_html:
        return
    detail_urls = parse_index(index_html)
    for detail_url in detail_urls:
        detail_html = scrape_detail(detail_url)
        if not detail_html:
            continue
        data = parse_detail(detail_html)
        logger.info('Got detail data %s', data)
        save_data(data=data)

def run_main(page):
    main(page)

if __name__ == '__main__':
    # Multiprocessing: one worker process per CPU core
    num_process = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(num_process)
    page_to_scrape = list(range(1, PAGE + 1))
    pool.map(run_main, page_to_scrape)  # run the pages across the process pool
    pool.close()
    pool.join()

Crawl target: https://ssr1.scrape.center

Things that could still be improved:

Switch to the Scrapy framework, spoof the User-Agent, and add proxy support (a sketch of the last two is shown below).
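
A User-Agent header and a proxy can be bolted onto the existing scrape_page without switching frameworks yet. Below is a minimal sketch that reuses the script's logger; the browser User-Agent string and the proxy address http://127.0.0.1:7890 are placeholder assumptions, not values from the original post:

import requests

HEADERS = {
    # Pretend to be a normal desktop browser instead of python-requests
    'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                   'AppleWebKit/537.36 (KHTML, like Gecko) '
                   'Chrome/120.0 Safari/537.36')
}

PROXIES = {
    # Hypothetical local HTTP proxy; replace with a real proxy endpoint
    'http': 'http://127.0.0.1:7890',
    'https': 'http://127.0.0.1:7890',
}

def scrape_page(url):
    logger.info("Scraping %s ...", url)
    try:
        response = requests.get(url, headers=HEADERS, proxies=PROXIES, timeout=10)
        if response.status_code == 200:
            return response.text
        logger.error("Failed to scrape %s, status code %s", url, response.status_code)
    except requests.RequestException:
        logger.error("Exception occurred while scraping %s", url, exc_info=True)
    return None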

Ultimate goals:

Distributed crawling, rotating IPs via VPS dial-up or VPN, decrypting obfuscated JavaScript, bypassing anti-debugger traps, and so on (a distributed-queue sketch follows).
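
For the distributed-crawler goal, one common pattern is to push page URLs into a shared queue (Redis, for example) and let workers on several machines pop from it. A minimal sketch, assuming a Redis server reachable at 127.0.0.1:6379 and reusing this script's scrape_page; the queue name page_queue and the host are illustrative assumptions only:

import redis

r = redis.Redis(host='127.0.0.1', port=6379)

# Producer: run once on a single machine to enqueue every index page
for page in range(1, PAGE + 1):
    r.lpush('page_queue', f'{URL1}/page/{page}')

# Worker: run this loop on any number of machines pointing at the same Redis
while True:
    raw_url = r.rpop('page_queue')
    if raw_url is None:
        break  # queue is empty, nothing left to do
    html = scrape_page(raw_url.decode('utf-8'))
    # ...parse and save the results as in main()...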

Download link: http://8.134.70.73:12345/爬虫.py