9.1. 进程简介
- 进程(任务):
- 在计算机中,其实进程就是一个任务。
- 在操作系统中,进程是程序执行和资源分配的基本单元。
- 单核CPU实现多任务
- 只是将CPU的时间快速的切换和分配到不同的任务上。
- 主频足够高,切换足够快,人的肉眼无法分辨而已。
- 多核CPU实现多任务
- 如果任务的数量不超过CPU的核心数,完全可以实现一个核心只做一个任务。
- 在操作系统中几乎是不可能的,任务量往往远远大于核心数。
- 同样采用轮训的方式,轮流执行不同的任务,只是做任务的’人’有多个而已。
Unix/Linux
操作系统提供了一个fork()
系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()
调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。
子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork
出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()
就可以拿到父进程的ID。
1 | import os |
由于Windows
没有fork
调用,上面的代码在Windows
上无法运行。有了fork
调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache
服务器就是由父进程监听端口,每当有新的http
请求时,就fork
出子进程来处理新的http
请求。
9.2. 进程的并发与并行
- 并发:并发是指在资源有限的情况下,两者交替轮流使用资源,比如一段路(单核
CPU
资源)同时只能过一个人,A走一段后,让给B,B用完继续给A,交替使用,目的是提高效率。 - 并行:并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑(资源够用,比如三个线程,四核的
CPU
); - 区别:并发是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个
session
。并行是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。
9.3. 进程管理
1 | from multiprocessing import Process |
os.getpid()
:获取当前的进程idos.getppid()
:获取主进程idProcess([group [, target [, name [, args [, kwargs]]]]])
:由该类实例化得到的对象,表示一个子进程中的任务(尚未启动),需要使用关键字的方式来指定参数,args
指定的为传给target
函数的位置参数,是一个元组形式,必须有逗号-
group
参数未使用,值始终为None
-target
表示调用对象,即子进程要执行的任务
-name
为子进程的名称
-args
表示调用对象的位置参数元组,args=(1, 2, 'egon',)
-kwargs
表示调用对象的字典,kwargs={'name': 'egon', 'age': 18}
p.daemone
:默认值为False
,如果设为True
,代表p
为后台运行的守护进程,当p
的父进程终止时,p
也随之终止,并且设定为True
后,p
不能创建自己的新进程,必须在p.start()
之前设置p.name
:进程的名称p.pid
:进程pid
p.exitcode
:进程在运行时为None
、如果为–N
,表示被信号N
结束(了解即可)p.authkey
:进程的身份验证键,默认是由os.urandom()
随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)p.start()
:启动进程,并调用该子进程中的p.run()
p.run()
:进程启动时运行的方法,正是它去调用target
指定的函数,我们自定义类的类中一定要实现该方法p.terminate()
:强制终止进程p
,不会进行任何清理操作,如果p
创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p
还保存了一个锁那么也将不会被释放,进而导致死锁p.is_alive()
:如果p仍然运行,返回True
p.join[timeout])
:主线程等待p
终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout
是可选的超时时间,需要强调的是,p.join
只能join
住start
开启的进程,而不能join
住run
开启的进程
9.4. 进程锁
当多个进程操作同一资源时,可能会造成混乱,甚至错误(如写文件),通常采用加锁的方式进行解决
1 | from multiprocessing import Process, Lock |
9.5. 自定义进程
采用继承Process
类开启进程的方式
1 | from multiprocessing import Process, Lock |
9.6. 进程池
创建少量的进程可以通过创建Process
对象完成,如果需要大量的进程和管理时就比较费劲。这时,可以通过进程池加以解决,而且可以通过参数控制进程池中进程的并发数,提高CPU利用率(创建进程池–>添加进程–>关闭进程池–>等待进程池结束–>设置回调)
1 | import multiprocessing |
Pool([numprocess[, initializer[, initargs]]])
:创建进程池,numprocess
为要创建的进程数,默认使用multiprocessing.cpu_count()
的CPU核心数;initializer
是每个工作进程启动时要执行的可调用对象,默认为None
;initargs
是要传给initializer
的参数组。pool.apply(func[, args[, kwargs]])
同步执行(阻塞式):在一个池工作进程中执行func(*args,**kwargs)
,然后返回结果,同步运行,阻塞,直到本次任务执行完毕后返回结果。此操作并不会在所有池工作进程中并执行func
函数。如果要通过不同参数并发地执行func
函数,必须从不同线程调用pool.apply()
函数或者使用pool.apply_async()
。pool.map(func, iterable[, chunksize=None])
:和pool.apply()
类似,只不过需要接收一个可迭代的参数对象。pool.apply_async(func[, args[, kwargs[, callback=None[, error_callback=None]]]])
异步执行(非阻塞):在一个池工作进程中执行func(*args,**kwargs)
,然后返回结果。此方法的结果是AsyncResult
类的实例,callback是可调用对象,接收输入参数。当func
的结果变为可用时,将理解传给callback
。callback
禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。pool.map_async(func, iterable[, chunksize=None[, callback=None[, error_callback=None]]]])
:和pool.apply_async()
类似,只不过需要接收一个可迭代的参数对象。
9.7. 数据共享
Process
之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python
的multiprocessing
模块包装了底层的机制,提供了Queue
、Pipes
等多种方式来交换数据。但要注意的是,全局变量不能共享。
1 | import multiprocessing |
管道(Pipe):创建管道时,得到两个连接;默认
dublex
为True
,表示全双工,两边都可以收发;若dublex
为False
,表示半双工,p_a
只能收,p_b
只能发。- 全双工通信:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30import multiprocessing
def run(p_a):
# 子进程接收主进程发的数据
recv = p_a.recv()
print('子进程收到:', recv)
print('子进程发送:', ['a', 'b', 'c', 'd'])
# 子进程给主进程发数据
p_a.send(['a', 'b', 'c', 'd'])
if __name__ == "__main__":
# 创建管道, 默认为全双工
p_a, p_b = multiprocessing.Pipe()
p = multiprocessing.Process(target=run, args=(p_a,))
p.start()
print('主进程发送:', [1, 2, 3, 4, 5])
# 主进程向子进程发数据
p_b.send([1, 2, 3, 4, 5])
p.join()
# 主进程接收子进程的数据
recv = p_b.recv()
print('主进程收到:', recv)
print('主进程结束')
# 运行结果如下:
1, 2, 3, 4, 5] 主进程发送: [
子进程收到: [1, 2, 3, 4, 5]
子进程发送: ['a', 'b', 'c', 'd']
主进程收到: ['a', 'b', 'c', 'd']
主进程结束- 半双工通信:
dublex
为False
,表示半双工,p_a
只能收,p_b
只能发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import multiprocessing
def run(p_a):
# 子进程接收主进程发的数据
recv = p_a.recv()
print('子进程收到(p_a):', recv)
if __name__ == "__main__":
# 创建管道, 默认为全双工
p_a, p_b = multiprocessing.Pipe(duplex=False)
p = multiprocessing.Process(target=run, args=(p_a,))
p.start()
print('主进程发送(p_b):', [1, 2, 3, 4, 5])
# 主进程向子进程发数据
p_b.send([1, 2, 3, 4, 5])
p.join()
# 主进程接收子进程的数据
print('主进程结束')
# 运行结果如下:
>>> 主进程发送(p_b): [1, 2, 3, 4, 5]
子进程收到(p_a): [1, 2, 3, 4, 5]
主进程结束队列(Queue)
Queue
本身就是一个消息队列程序,常用的方法有:
Queue.qsize()
:返回当前消息队列的消息数量。Queue.empty()
:如果队列为空,返回True
否则返回False
。Queue.full()
:如果队列满了,返回True
,否则False
。Queue.get()
:获取队列中的一条消息,然后将其从队列中移除。队列为空时,获取的时候也会阻塞。Queue.put('xxx')
:把内容存放进消息队列, 默认为True
数据会阻塞,设为False
时,如果队列已满会报错。Queue.close()
:关闭队列Queue.get_nowait()
相当于Queue.get(False)
。Queue.put_nowait()
相当于Queue.put(False)
。
1 | import multiprocessing |
如果采用进程池创建进程,使用队列进行通讯,可以这样:
1 | def put_data(queue): |
9.8. 共享内存
Python
中提供了强大的Manager
类,专门用于实现多进程之间的数据共享;Mangaer
类支持的类型非常多,如:Value
, Array
, List
, Dict
, Queue
, Lock
等;Manager
提供的数据不安全,需要通过Lock
作处理。
1 | import multiprocessing |
9.9. 多进程应用举例
使用多进程实现拷贝文件夹,注意直接使用copy()
效率比较低
1 | import multiprocessing |
- 本文作者: Lajos
- 本文链接: https://www.lajos.top/2020/05/04/No-9-Python语言基础-进程/
- 版权声明: 本博客所有文章除特别声明外,均采用 MIT 许可协议。转载请注明出处!