有哪些形式?
多进程
fork:
Unix-like下有一个fork,可以用来创建一个子进程。Python的os模块中封装了fork模块。os.fork()可以返回一个新创建的子进程的PID。fork这是就在两个进程中被调用了。所以会有两个返回值:
- 主进程:返回子进程的PID
- 子进程:返回0
- 如果出现错误,fork返回一个负值。
然后开始执行下一条语句。注意1)系统先给新的进程分配资源,例如存储数据和代码的空间。2)fork只能在Unix-like的平台上使用。
# coding=gb2312 import os """ print('Process (%s) start...' % os.getpid()) pid = os.fork() if pid == 0: print('I am child progress (%s) and my parent is %s' % (os.getpid(), pid)) else: print('I (%s) just created a child process (%s)' % (os.getpid(), pid)) """ for i in range(1, 3): fid = os.fork() if fid == 0: print('child') else: print('father') """输出是 代码输出是: father child father child father child """
- os.getppid():获取当前进程的PID
- os.getppid():可以返回当前进程的父进程。
第一步:创建了father和child。第一个father输出,child复制了当前的循环过程,所以判断是child,输出。截至father和child都需要第二次循环。第一创建的子进程变成了相对的父进程,所以他会再创建一个子进程并自我输出。原先的父进程成也会有相同的行为。所以*2。最终会输出3组。
multiprocessing模块和进程池:
multiprocessing进程:
from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print('Run child process %s (%s)...' % (name, os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test',)) print('Child process will start.') p.start() p.join() print('Child process end.')
- start():启动一个进程
- join():等待子进程结束,然后继续执行
Pool的方法:
# coding=utf-8 # 创建进程池 from multiprocessing import Pool import os import time import random def long_time_task(name): print('RUn task %s (%s)' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) def run_proc(name): print('RUn child process %s (%s)' % (name, os.getpid())) if __name__ == '__main__': print('Parent process %s' % os.getpid()) p = Pool(10) for i in range(1, 11): p.apply_async(long_time_task, args=(i,)) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') """ 输出 Task 4 runs 0.75 seconds. Task 7 runs 0.99 seconds. Task 3 runs 1.04 seconds. Task 6 runs 1.20 seconds. Task 2 runs 1.23 seconds. Task 9 runs 1.36 seconds. Task 5 runs 1.40 seconds. Task 10 runs 1.66 seconds. Task 1 runs 1.76 seconds. Task 8 runs 2.38 seconds. All subprocesses done. """
- join():表示等待当前的进程结束后,继续执行。
- Pool(n)的构造:表示一次可以同时运行n个进程。
- close():不允许添加进程。close必须要在join()之前调用。
子进程
可以控制他们的输入输出。
# coding=utf-8 # subprocess 管理 import subprocess p = subprocess.Popen(['python3.5'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'import os\nprint(os.environ)\nquit()\n') print(output.decode('utf-8')) print('Exit code:', p.returncode)
subprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0):
- args : 是传递的参数,可以使tuple,list等
- stdin、stdout和stderr可以分别表示程序的标准输入、输出、错误句柄。如果为None则表示从主进程继承。subprocess.PIPE可以与进程的输入输出进行通信。
- communicate(b'str')方法:可以将字节流str发送到子进程中。
- err/output.decode('utf-8'):将子进程的错误输出/输出解码为UTF-8的str。
进程间通信
# coding=utf-8 # 进程通信 from multiprocessing import Process, Queue import os import time import random #写入数据 def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) # 读取 def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) print('Get %s from queue.' % value) if __name__ == '__main__': q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() pr.terminate() # pr进程里是死循环,无法等待其结束,只能强行终止:
这个使用了队列,一个队列用于输入一个用于输出。
多线程
因为balance等价于temp = balance + n、temp复制给balance,而多个子线程会切换又可能没有正确地赋值从而导致没有上“锁”全局变量造成破坏(访问冲突)。上锁之后一个线程会锁住资源知道结束,所以可以保证数据的完整性。
# coding=utf-8 # 线程的变量共享的问题 # import time import threading balance = 0 lock = threading.Lock() def change_it(n): global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(1, 10000001): lock.acquire() try: change_it(n) finally: lock.release() t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance)
Lock的方法:
- acquire():获取是否可以访问。如果上锁了,就等待解锁否则就会上锁然后访问。
- release():释锁
这样(包含了锁的并发任务)也就阻止了多线程并发地执行,也就降低了性能。
因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。
所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。
不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。
ThreadLocal
如果我们可以在一个进程中将每个子线程所使用的变量按照子线程名进行管理就可以防止访问变量的冲突。
# coding=utf-8 # 多线程的局部变量 import threading import time local_school = threading.local() class Student(object): def __init__(name): pass def process_stu(): std = local_school.student time.sleep(local_school.intval) print('Hello, %s (in %s)' % (std, threading.current_thread().name)) def process_thread(name, intval): local_school.student = name local_school.intval = intval process_stu() t1 = threading.Thread(target=process_thread, args=('Alice', 1), name='Thread-A') t2 = threading.Thread(target=process_thread, args=('Bob', 2), name='Thread-B') t1.start() t2.start() print('666') """ output : 666 Hello, Alice (in Thread-A) Hello, Bob (in Thread-B) A和B是同时等待 所以会相差1s输出 """
所以创建一个线程需要指定:
- name :作为Thread构造的参数,便是线程名字
- target :便是可调用对象
threading.current_thread()可以返回当前子线程的对象,他含有的属性就是构造时的参数。这里的local_school就是一个类似于dict的可以返回动态绑定的属性student的对应线程的值。
多线程与多进程的优缺点
- 线程的切换开销巨大,需要保存当前的CPU寄存器状态、内存页等。然后,把新任务的执行环境准备好(恢复上次的寄存器状态,切换内存页等),才能开始执行。这个切换过程虽然很快,但是也需要耗费时间。如果有几千个任务同时进行,操作系统可能就主要忙着切换任务,根本没有多少时间去执行任务了,这种情况最常见的就是硬盘狂响,点窗口无反应,系统处于假死状态。
- 计算密集型 vs. IO密集型:C语言等计算的效率高,但是处理IO人物的效率不是很高。Python进行大量的计算也不是很好。所以将Python与其他的在运行效率高的语言结合起来就可以取长补短。
- 异步IO:现代操作系统对IO操作已经做了巨大的改进,最大的特点就是支持异步IO。如果充分利用操作系统提供的异步IO支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型,Nginx就是支持异步IO的Web服务器,它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上,可以运行多个进程(数量与CPU核心数相同),充分利用多核CPU。由于系统总的进程数量十分有限,因此操作系统调度非常高效。用异步IO编程模型来实现多任务是一个主要的趋势。
分布式进程
发送方:
# coding=utf-8 # 分布式进程 这个是发送方 import random, time, queue from multiprocessing.managers import BaseManager task_queue = queue.Queue() # 创建一个任务队列 result_queue = queue.Queue() # 结果队列 class QueueManager(BaseManager): pass # 自定义的管理类 QueueManager.register('get_task_queue', callable=lambda: task_queue) # 注册接受任务可调用对象(lambda) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 注册获取结果的可调用对象(lambda) manager = QueueManager(address=('', 5000), authkey=b'abc') # 访问IP 127.0.0.1,端口为5000,验证码是abc的远程进程 manager.start() # 启动服务 task = manager.get_task_queue() result = manager.get_result_queue() for i in range(10): n = random.randint(0, 10000) print('Put task %d' % n) task.put(n) print('Try get result...') for i in range(10): r = result.get(timeout=10) # 允许延迟10s print('Result: %s' % r) manager.shutdown() # okay print('master exit.')
任务接收方:
# coding=utf-8 # 分布式进程 这个是接受方 import random, time, queue from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') server_addr = '127.0.0.1' print('Connecting to server %s...' % server_addr) m = QueueManager(address=(server_addr, 5000), authkey=b'abc') m.connect() task = m.get_task_queue() result = m.get_result_queue() for i in range(10): try: n = task.get(timeout=1) # 接收任务,然后把获取任务的信息。 print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) # 记得把结果输出给网络上的结果接收方 except queue.Empty: # 如果是空队列,会出错 print('task queue is empty') print('worker exit.')
发送方输出:
$ python3 task_worker.py Connect to server 127.0.0.1... run task 3411 * 3411... run task 1605 * 1605... run task 1398 * 1398... run task 4729 * 4729... run task 5300 * 5300... run task 7471 * 7471... run task 68 * 68... run task 4219 * 4219... run task 339 * 339... run task 7866 * 7866... worker exit.
任务接收方输出(发送了结果):
Result: 3411 * 3411 = 11634921 Result: 1605 * 1605 = 2576025 Result: 1398 * 1398 = 1954404 Result: 4729 * 4729 = 22363441 Result: 5300 * 5300 = 28090000 Result: 7471 * 7471 = 55815841 Result: 68 * 68 = 4624 Result: 4219 * 4219 = 17799961 Result: 339 * 339 = 114921 Result: 7866 * 7866 = 61873956
请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。
注意
- 在注册接口是时,不要用lambda
- 创建manager时加上IP地址。
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
- 一定要在main中使用分布式计算。
需要注意的是,在windows下,如果子进程序不是在__main__中创建的,那么就会出错。因为windows在创建子进程的时候,会将创建它的py文件import进去。import进去机会执行,那么就会不断地创建子进程,所以会出错。
因此在windows下,需要将其包含在__main__中。
- 主函数调用之前使用(目前还不知道是干嘛的,参考最后一个引用连接):freeze_support()
针对Windows的优化
- 一定要放在主函数调用、而且不用lambda。使用1080的端口(更具情况需要):
# coding=gb2312 # windows 可以使用的分布式 from multiprocessing.managers import BaseManager from multiprocessing import freeze_support import queue import logging import random import sys from threading import Thread class QueueManager(BaseManager): pass task_queue = queue.Queue() result_queue = queue.Queue() def get_task_queue(): global task_queue return task_queue def get_result_queue(): global result_queue return result_queue def main(): QueueManager.register('get_result_queue', callable=get_result_queue) QueueManager.register('get_task_queue', callable=get_task_queue) iManager = QueueManager(address=('127.0.0.1', 1080), authkey=b'abc') logging.info('服务器创建完成') iManager.start() logging.info('服务器启动') taskQueue = iManager.get_task_queue() resultQueue = iManager.get_result_queue() for i in range(1, 11): print('Put task %d...' % i) taskQueue.put(random.randint(0, 10000)) for i in range(1, 11): r = resultQueue.get(timeout=10) print('Result: %s' % r) iManager.shutdown() if __name__ == '__main__': print(sys.argv) freeze_support() # 测试 https://hg.python.org/cpython/file/d9893d13c628/Lib/multiprocessing/forking.py#l302 main()
- 接收任务方:(也需要同上的修改)
# coding=gb2312 from multiprocessing.managers import BaseManager import queue import logging import sys import time logging.basicConfig(level=logging.DEBUG) class QueueManager(BaseManager): pass def main(): # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 连接到服务器,也就是运行task_master.py的机器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与task_master.py设置的完全一致: m = QueueManager(address=(server_addr, 1080), authkey=b'abc') # 从网络连接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(1, 11): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except task.Empty: print('task queue is empty.') # 处理结束: print('worker exit.') if __name__ == '__main__': main()
参考&引用
- 廖雪峰的博客&网友的评论:http://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431929340191970154d52b9d484b88a7b343708fcc60000#0
- Python doc 子进程:https://docs.python.org/3/library/subprocess.html
- subprocess的参数:http://blog.csdn.net/imzoer/article/details/8678029
- 在main函数中创建子进程:https://segmentfault.com/a/1190000008118997
- 多进程使用freeze_support():http://stackoverflow.com/questions/13922597/multiprocessing-freeze-support
- freeze_support()的实现:https://hg.python.org/cpython/file/d9893d13c628/Lib/multiprocessing/forking.py#l302