进程、线程和协程——python

 

协程

  • 协程,又称微线程,纤程
  • 协程是一种用户态的轻量级线程(协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用时的状态,即每次过程重入时进入上一次离开时所处逻辑流的位置。)
  • 优点
    • 高并发
  • 缺点
    • 无法利用多核资源(协程需要和进程配合才能运行在多CPU上)
    • 进行阻塞操作时会阻塞掉整个程序
  • import gevent python关于协程的第三方库(注意模块名为小写 gevent;新版本已提供对 Windows 的支持)

迭代器

1
2
3
4
5
6
7
8
9
10
11
# Iterator basics: iter() turns a list into a list_iterator; each
# next() call pulls exactly one element off it.
list_ = [1, 2, 3]
it = iter(list_)
print(it)               # the iterator object itself
for _ in range(2):      # two explicit next() calls
    print(next(it))


# result
# <list_iterator object at 0x1073b8320>
# 1
# 2
1
2
3
4
5
6
7
8
# Iterating the iterator directly: a for loop repeatedly calls next()
# under the hood and stops on StopIteration.
list_ = [1, 2, 3]
it = iter(list_)
while True:
    try:
        i = next(it)
    except StopIteration:
        break
    print(i, end='\t')


# result
# 1 2 3

生成器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import sys


# Drain an iterator by hand: keep calling next() until StopIteration
# is raised, then terminate the process.
numbers = [1, 2, 3]
iterator = iter(numbers)
while True:
    try:
        item = next(iterator)
    except StopIteration:
        sys.exit()  # exhausting the iterator ends the script
    print(item)


# result
# 1
# 2
# 3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# coding=utf-8


import sys


def triangles():
    """Yield successive rows of Pascal's triangle, starting with [1].

    Each yielded row is a fresh list.  The original version mutated one
    shared list and re-yielded it, so every previously yielded row was
    retroactively overwritten by the newest one; printing looked right,
    but collecting the rows did not.
    """
    row = [1]
    while True:
        yield row
        # Next row = 1, pairwise sums of the current row, 1.
        row = [1] + [a + b for a, b in zip(row, row[1:])] + [1]


if __name__ == '__main__':
    # Print the first six rows of Pascal's triangle.
    gen = triangles()
    for _ in range(6):
        print(next(gen))


# result
# [1]
# [1, 1]
# [1, 2, 1]
# [1, 3, 3, 1]
# [1, 4, 6, 4, 1]
# [1, 5, 10, 10, 5, 1]

多线程

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading  # thread support
import time


def func_add():
    """Target function: announce that a thread ran."""
    print('this is a thread')


# Inspect the interpreter's current thread state.
print(threading.active_count())    # number of live threads
print(threading.enumerate())       # list of all live Thread objects
print(threading.current_thread())  # the thread executing this line

# Create a worker thread; target is the callable the thread will run.
worker = threading.Thread(target=func_add)
print(worker)
worker.start()  # begin executing func_add in the new thread

join功能

  • 阻塞主线程,即在子线程未返回的时候,主线程等待其返回然后再继续执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import threading
import time


print('no join')


# Without join: the main thread does not wait for the worker, so the
# main thread's print(3) runs while the worker is still sleeping.
def func_job_1():
    print(1)
    time.sleep(1)
    print(2)


worker = threading.Thread(target=func_job_1)
worker.start()
print(3)

# result
# 1
# 3
# 2


time.sleep(2)  # let the worker finish before the next demo starts
print('join')


# 加join的结果
def func_job_3():
    """Slow worker: print start, block for one second, print end."""
    print('3 start')
    time.sleep(1)
    print('3 end')


def func_job_4():
    """Fast worker: print start and end back to back (no blocking work)."""
    print('4 start')
    print('4 end')


if __name__ == '__main__':
    # With join: the main thread blocks until both workers finish,
    # so 'all end' is always printed last.
    slow = threading.Thread(target=func_job_3)
    fast = threading.Thread(target=func_job_4)
    slow.start()
    fast.start()
    fast.join()  # wait for the quick worker first
    slow.join()  # then for the slow one
    print('all end')

存储线程结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading
import time
from queue import Queue # 队列


def func_add(list_, queue_):
    """Sum list_ and put the total on queue_.

    A thread target cannot hand a value back via return, so the result
    is communicated through a shared Queue instead.
    """
    queue_.put(sum(list_))  # sum() replaces the manual enumerate loop


queue_ = Queue()  # holds each thread's result in place of a return value
threads_ = []
data = [[1, 1], [2, 2], [3, 3]]
for i in range(3):  # spawn three worker threads
    t_ = threading.Thread(target=func_add, args=(data[i], queue_))
    t_.start()
    threads_.append(t_)  # remember each thread so we can join it
for t_ in threads_:  # wait for every worker to finish
    t_.join()
result = []
for i in range(3):
    result.append(queue_.get())  # drain one result per thread
print(result)

GIL效率分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import threading
import time


def func_add(list_):
    """CPU-bound target: accumulate the elements of list_ (result discarded).

    The original iterated with enumerate() but never used the index.
    """
    total = 0
    for val in list_:
        total += val


def mutithreading_add(list_, threads, add_num, flag_join):
    """Run func_add on `threads` threads, repeated add_num times.

    flag_join=True joins each thread immediately after starting it,
    which serializes the work.  (The original shadowed the same loop
    variable `i` in both nested loops; neither value was used.)
    """
    for _ in range(add_num):
        for _ in range(threads):
            worker = threading.Thread(target=func_add, args=(list_, ))
            worker.start()
            if flag_join is True:
                worker.join()


def func_print(str_, n):
    """I/O-bound target: print str_ n times on one line, then a newline."""
    for _ in range(n):
        print(str_, end='')
    print()


def mutithreading_print(str_, n, threads, flag_join):
    """Split n prints of str_ across `threads` threads.

    flag_join=True joins each thread right after starting it,
    serializing the workers.
    """
    num = n // threads  # prints per thread; loop-invariant, hoisted out
    for _ in range(threads):
        worker = threading.Thread(target=func_print, args=(str_, num, ))
        worker.start()
        if flag_join is True:
            worker.join()


if __name__ == '__main__':
    def _timed(fn, *args):
        """Run fn(*args) and return the elapsed wall-clock seconds."""
        start = time.time()
        fn(*args)
        return time.time() - start

    # CPU-bound comparison: sum a small list 500 times.
    print('计算密集型')
    # single thread, with and without join
    t_join = _timed(mutithreading_add, [1, 2, 3, 4, 5], 1, 500, True)
    t_nojoin = _timed(mutithreading_add, [1, 2, 3, 4, 5], 1, 500, False)
    print('单线程 加join', '\t', t_join)
    print('单线程 不加join', '\t', t_nojoin)
    # ten threads, with and without join
    t_join = _timed(mutithreading_add, [1, 2, 3, 4, 5], 10, 500, True)
    t_nojoin = _timed(mutithreading_add, [1, 2, 3, 4, 5], 10, 500, False)
    print('多线程 加join', '\t', t_join)
    print('多线程 不加join', '\t', t_nojoin)

    # I/O-bound comparison: print 'hello world' 5000 times.
    print('I/O密集型')
    # single thread, with and without join
    t_join = _timed(mutithreading_print, 'hello world', 5000, 1, True)
    t_nojoin = _timed(mutithreading_print, 'hello world', 5000, 1, False)
    print('单线程 加join', '\t', t_join)
    print('单线程 不加join', '\t', t_nojoin)
    # ten threads, with and without join
    t_join = _timed(mutithreading_print, 'hello world', 5000, 10, True)
    t_nojoin = _timed(mutithreading_print, 'hello world', 5000, 10, False)
    print('多线程 加join', '\t', t_join)
    print('多线程 不加join', '\t', t_nojoin)


# result
# 计算密集型
# 单线程 加join 0.0544428825378418
# 单线程 不加join 0.02800607681274414
# 多线程 加join 0.37989211082458496
# 多线程 不加join 0.462191104888916
# I/O密集型
# 单线程 加join 0.004912853240966797
# 单线程 不加join 0.005605220794677734
# 多线程 加join 0.0066182613372802734
# 多线程 不加join 0.005144834518432617
  • 受 GIL 影响,python 的多线程在计算密集型任务上通常不比单线程快,甚至更慢;但在 I/O 密集型任务(如网络、磁盘等待)上多线程仍然有效

线程锁

  • lock = threading.Lock() 定义一个锁对象
  • lock.acquire() 将共享内存上锁
  • lock.release() 将锁打开
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# coding=utf-8


# 线程锁Lock
import threading
import time


# 不使用lock
def func_print_1():
    """Add 1 to the module-global counter `number` five times, unlocked."""
    global number
    for _ in range(5):
        time.sleep(0.5)
        number += 1
        print('func 1', '\t', number)


def func_print_2():
    """Add 10 to the module-global counter `number` five times, unlocked."""
    global number
    for _ in range(5):
        time.sleep(0.5)
        number += 10
        print('func 2', '\t', number)


def no_use_lock():
    """Run func_print_1 and func_print_2 concurrently without any lock."""
    workers = [
        threading.Thread(target=func_print_1),
        threading.Thread(target=func_print_2),
    ]
    for w in workers:
        w.start()
    for w in reversed(workers):  # original joined t_2 first, then t_1
        w.join()


# 使用lock
def func_print_3(lock):
    """Add 1 to the shared counter five times, holding lock around each update."""
    global number
    for _ in range(5):
        lock.acquire()  # lock the shared state
        time.sleep(0.5)
        number += 1
        print('func 3', '\t', number)
        lock.release()  # open the lock again


def func_print_4(lock):
    """Add 10 to the shared counter five times, holding lock around each update."""
    global number
    for _ in range(5):
        lock.acquire()
        time.sleep(0.5)
        number += 10
        print('func 4', '\t', number)
        lock.release()


def use_lock():
    """Run the locked counter functions on two threads sharing one Lock."""
    lock = threading.Lock()  # one lock shared by both workers
    workers = [
        threading.Thread(target=func_print_3, args=(lock, )),
        threading.Thread(target=func_print_4, args=(lock, )),
    ]
    for w in workers:
        w.start()
    for w in reversed(workers):
        w.join()


if __name__ == '__main__':
    # Without a lock the two threads interleave their updates freely.
    number = 0
    print('no use lock')
    start = time.time()
    no_use_lock()
    print('time:', '\t', time.time() - start)

    # With a lock every update is serialized: ordered, but slower.
    number = 0
    print('use lock')
    start = time.time()
    use_lock()
    print('time:', '\t', time.time() - start)


# result
# no use lock
# func 1 1
# func 2 11
# func 1 12
# func 2 22
# func 1 23
# func 2 33
# func 1 34
# func 2 44
# func 2 54
# func 1 55
# time: 2.514285087585449
# use lock
# func 3 1
# func 3 2
# func 4 12
# func 4 22
# func 4 32
# func 4 42
# func 4 52
# func 3 53
# func 3 54
# func 3 55
# time: 5.021925926208496
  • 线程锁会把临界区串行化,因此整体耗时通常会变长——这是用性能换取数据正确性

多进程

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# coding=utf-8


import multiprocessing as mp # 引入进程


def func(str_1, str_2):
    """Target function: print the two strings separated by a space."""
    print(str_1, str_2)


if __name__ == '__main__':
    worker = mp.Process(target=func, args=('hello', 'world'))  # create a process
    worker.start()  # launch it
    worker.join()   # wait for it to finish

储存进程结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# coding=utf-8


import multiprocessing as mp # 引入进程
from queue import Queue


def func(list_, queue_):
    """Sum list_, printing the running total, then put the result on queue_.

    The queue is now passed in explicitly: the original read a
    module-level `queue_` created under the __main__ guard, which only
    works with the 'fork' start method — under 'spawn' (the Windows and
    modern macOS default) the child process would raise NameError.
    (Note: the file's `from queue import Queue` import is unused; this
    demo needs multiprocessing.Queue, not queue.Queue.)
    """
    res = 0
    for val in list_:  # the original enumerate index was never used
        res += val
        print(res)
    queue_.put(res)


if __name__ == '__main__':
    queue_ = mp.Queue()
    data_ = [1, 2, 3, 4, 5]
    p_ = mp.Process(target=func, args=(data_, queue_))  # queue passed as argument
    p_.start()  # launch the worker process
    p_.join()   # wait for it (payload is tiny, so get-after-join is safe)
    print('process', '\t', p_)
    print('result', '\t', queue_.get())


# result
# 1
# 3
# 6
# 10
# 15
# process <Process(Process-1, stopped)>
# result 15

进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# coding=utf-8


import multiprocessing as mp


def job(x):
    """Return the pair (x squared, half of x)."""
    return x*x, x/2


if __name__ == '__main__':
    # processes= sets the worker count; default is the machine's core count.
    pool = mp.Pool(processes=10)
    # map distributes the iterable across the workers and gathers results.
    res = pool.map(job, range(10))
    print(res)

    # apply_async submits one call (single argument tuple); get() fetches it.
    res = pool.apply_async(job, (2,))
    print(res.get())

    # For many inputs, issue one apply_async per value and collect each result.
    multi_res = [pool.apply_async(job, (i, )) for i in range(10)]
    print([res.get() for res in multi_res])

共享内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# coding=utf-8


import multiprocessing as mp
import time


value1 = mp.Value('d', 3.14) # Value puts one typed scalar into shared memory; it holds a single value and requires a type code up front
array = mp.Array('i', [1, 2, 3, 4]) # Array puts a homogeneous 1-D sequence into shared memory; all elements share the declared type code


# 参数表类型
# | Type code | C Type | Python Type | Minimum size in bytes |
# | --------- | ------------------ | ----------------- | --------------------- |
# | `'b'` | signed char | int | 1 |
# | `'B'` | unsigned char | int | 1 |
# | `'u'` | Py_UNICODE | Unicode character | 2 |
# | `'h'` | signed short | int | 2 |
# | `'H'` | unsigned short | int | 2 |
# | `'i'` | signed int | int | 2 |
# | `'I'` | unsigned int | int | 2 |
# | `'l'` | signed long | int | 4 |
# | `'L'` | unsigned long | int | 4 |
# | `'q'` | signed long long | int | 8 |
# | `'Q'` | unsigned long long | int | 8 |
# | `'f'` | float | float | 4 |
# | `'d'` | double | float | 8 |

# 样例
def add_1(n, sleep_time):
    """Increment the shared counter by 1, n times, pausing between steps.

    NOTE(review): reads the module-global `number` (an mp.Value) from a
    child process — this relies on the 'fork' start method; confirm on
    Windows/spawn.
    """
    for step in range(n):
        number.value += 1
        print('add_1', '\t', step, '\t', number.value)
        time.sleep(sleep_time)


def add_2(n, sleep_time):
    """Increment the shared counter by 2, n times, pausing between steps."""
    for step in range(n):
        number.value += 2
        print('add_2', '\t', step, '\t', number.value)
        time.sleep(sleep_time)


if __name__ == '__main__':
    # 'b' = signed char; fine here since the total never exceeds 127.
    number = mp.Value('b', 0)  # counter shared between the processes

    workers = [
        mp.Process(target=add_1, args=(5, 0.01)),
        mp.Process(target=add_2, args=(5, 0.01)),
    ]
    for w in workers:
        w.start()
    for w in reversed(workers):  # original joined p_2 first, then p_1
        w.join()

    print(number.value)


# result
# 正确情况
#
# add_1 0 1
# add_2 0 3
# add_1 1 4
# add_2 1 6
# add_1 2 7
# add_2 2 9
# add_1 3 10
# add_2 3 12
# add_1 4 13
# add_2 4 15
# 15
#
# 坏情况
#
# add_1 0 1
# add_2 0 3
# add_1 1 4
# add_2 1 6
# add_1 2 8
# add_2 2 8
# add_2 3 9
# add_1 3 9
# add_1 4 11
# add_2 4 11
# 11
  • 共享内存时,内存中的数据同时被多个进程操作的话就会出现错误

进程锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# coding=utf-8


import multiprocessing as mp
import time


def no_lock_add(n, sleep_time, add_n):
    """Add add_n to the shared counter `number` n times without locking."""
    for step in range(n):
        number.value += add_n
        print('add_1', '\t', step, '\t', number.value)
        time.sleep(sleep_time)


def use_lock_add(n, sleep_time, add_n):
    """Add add_n to the shared counter n times, locking each increment."""
    for step in range(n):
        lock.acquire()
        number.value += add_n
        lock.release()
        # the read below happens outside the lock, so it may print a
        # value another process has already advanced past
        print('add_1', '\t', step, '\t', number.value)
        time.sleep(sleep_time)


if __name__ == '__main__':
    # Without a lock: concurrent read-modify-write on number.value can
    # lose updates, so the final total is sometimes below 15.
    number = mp.Value('b', 0)  # shared-memory counter
    workers = [
        mp.Process(target=no_lock_add, args=(5, 0.01, 1)),
        mp.Process(target=no_lock_add, args=(5, 0.01, 2)),
    ]
    for w in workers:
        w.start()
    for w in reversed(workers):
        w.join()
    print(number.value)  # final total

    # With a lock: each increment is atomic, so the total is always 15.
    number = mp.Value('b', 0)
    lock = mp.Lock()  # process lock, shared via module global (fork-only)
    workers = [
        mp.Process(target=use_lock_add, args=(5, 0.01, 1)),
        mp.Process(target=use_lock_add, args=(5, 0.01, 2)),
    ]
    for w in workers:
        w.start()
    for w in reversed(workers):
        w.join()
    print(number.value)  # final total


# result
#
# add_1 0 1
# add_1 0 3
# add_1 1 4
# add_1 1 4
# add_1 2 5
# add_1 2 5
# add_1 3 6
# add_1 3 6
# add_1 4 8
# add_1 4 8
# 8
# add_1 0 1
# add_1 0 3
# add_1 1 4
# add_1 1 6
# add_1 2 8
# add_1 2 9
# add_1 3 11
# add_1 3 12
# add_1 4 13
# add_1 4 15
# 15

多线程、多进程、单线程性能对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# coding=utf-8


import multiprocessing as mp # 引入进程
import threading as td # 引入线程
import time
import os


def func_add(n, sleep_time):
    """Busy-work target: accumulate 0..n-1, sleeping after each step."""
    total = 0
    for k in range(n):
        total += k
        time.sleep(sleep_time)


def process_(n, sleep_time):
    """Run func_add on four separate processes and wait for all of them."""
    procs = [mp.Process(target=func_add, args=(n, sleep_time)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in reversed(procs):  # original joined p_4 down to p_1
        p.join()


def threading_(n, sleep_time):
    """Run func_add on four threads and wait for all of them."""
    workers = [td.Thread(target=func_add, args=(n, sleep_time)) for _ in range(4)]
    for t in workers:
        t.start()
    for t in reversed(workers):  # original joined t_4 down to t_1
        t.join()


def thread_process(n, sleep_time):
    """Two processes, each running func_add on two threads."""
    def two_thread(n, sleep_time):
        # two threads computing side by side inside one process
        pair = [td.Thread(target=func_add, args=(n, sleep_time)) for _ in range(2)]
        for t in pair:
            t.start()
        for t in reversed(pair):
            t.join()

    procs = [mp.Process(target=two_thread, args=(n, sleep_time)) for _ in range(2)]
    for p in procs:
        p.start()
    for p in reversed(procs):
        p.join()


def usual(n, sleep_time):
    """Single-threaded baseline: do the same accumulation four times in a row."""
    for _ in range(4):
        total = 0
        for k in range(n):
            total += k
            time.sleep(sleep_time)


def func_one():
    """Case where parallel execution beats the single-threaded baseline.

    The work is sleep-dominated (sleep_time=0.1), so threads/processes
    overlap the waits while the single-threaded version pays them
    serially.  (Removed the unused locals thread_n/process_n; the
    original comment said "product" but the target computes a sum.)
    """
    # Sum the integers below n, four times over.
    n = 10            # upper bound of the summation
    sleep_time = 0.1  # per-step sleep; makes the task behave I/O-bound
    print('thread', '\t', 'process', '\t', 'time')

    # four processes in parallel
    time_1 = time.time()
    process_(n, sleep_time)
    time_2 = time.time()
    print(' 1 \t 4 \t', time_2 - time_1)

    # four threads in parallel
    time_1 = time.time()
    threading_(n, sleep_time)
    time_2 = time.time()
    print(' 4 \t 1 \t', time_2 - time_1)

    # two processes x two threads each
    time_1 = time.time()
    thread_process(n, sleep_time)
    time_2 = time.time()
    print(' 4 \t 2 \t', time_2 - time_1)

    # plain single-threaded baseline
    time_1 = time.time()
    usual(n, sleep_time)
    time_2 = time.time()
    print(' 1 \t 1 \t', time_2 - time_1)


def func_two():
    """Case where parallel execution loses to the single-threaded baseline.

    Pure CPU work with no sleeping: thread/process creation and context
    switching cost more than the computation saves.  (Removed the
    unused locals thread_n/process_n; the original comment said
    "product" but the target computes a sum.)
    """
    # Sum the integers below n, four times over.
    n = 50000       # upper bound of the summation
    sleep_time = 0  # no sleeping: the task is purely CPU-bound
    print('thread', '\t', 'process', '\t', 'time')

    # four processes in parallel
    time_1 = time.time()
    process_(n, sleep_time)
    time_2 = time.time()
    print(' 1 \t 4 \t', time_2 - time_1)

    # four threads in parallel
    time_1 = time.time()
    threading_(n, sleep_time)
    time_2 = time.time()
    print(' 4 \t 1 \t', time_2 - time_1)

    # two processes x two threads each
    time_1 = time.time()
    thread_process(n, sleep_time)
    time_2 = time.time()
    print(' 4 \t 2 \t', time_2 - time_1)

    # plain single-threaded baseline
    time_1 = time.time()
    usual(n, sleep_time)
    time_2 = time.time()
    print(' 1 \t 1 \t', time_2 - time_1)


if __name__ == '__main__':
    # Run the parallel-wins case first, then the parallel-loses case.
    func_one()
    print()
    func_two()


# result
# thread process time
# 1 4 1.0103631019592285
# 4 1 1.0256071090698242
# 4 2 1.0340228080749512
# 1 1 4.0948076248168945
#
# thread process time
# 1 4 0.05113697052001953
# 4 1 1.1482949256896973
# 4 2 0.2586557865142822
# 1 1 0.1494588851928711
  • 结论
    • 多线程和多进程在并行的时候未必就比单线程快,首先自己要衡量线程、进程之间切换的时间和计算的时间大小关系。
      • 如果切换时间远大于计算时间,那么单线程是最好的选择
      • 如果切换时间远小于计算时间,那么多线程、多进程是最好的选择(前提是这里的处理步骤是可以并行的)
    • 如果是非并行运算,那么直接使用单线程就好
    • 要创建多个进程或者多个线程的时候,不要使用循环,那样的话就是假并行计算,还会加大处理的时间。
    • 没有进程间、线程间通信的话,多线程和多进程进行并行计算的时间是相近的
    • 提高性能时,建议直接使用多进程+协程,抛弃多线程
Donate comment here