python进程
1、进程概念
进程是一个执行中的程序,资源分配的最小单位。每个进程都拥有自己的地址空间、内存、数据栈以及其他用于跟踪执行的辅助数据。在单核CPU系统中的多进程,内存中可以有许多程序,但在给定一个时刻只有一个程序在运行;就是说,可能这一秒在运行进程A,下一秒在运行进程B,虽然两者都在内存中,都没有真正同时运行。
2、获取进程号方法
""" ps -aux 查看进程号 ps -aux | grep 2860 kill -9 2860 杀死进程 """ # 获取当前进程id (子进程) res = os.getpid() print(res) # 获取当前进程的父进程 res2 = os.getppid() print(res2)
3、进程基本使用
from multiprocessing import Process #1、创建基本进程 def func(): print("1.子进程id>>{},2父进程id>>{}".format(os.getpid(),os.getppid())) # 为了解决windows 和 linux 系统的兼容问题,下面这句话必须加上,否则报错 if __name__ == "__main__": # 创建子进程,返回进程对象,执行func这个任务 p = Process(target=func) # 调用子进程 p.start() #2、创建带有参数的进程 def func(n): for i in range(1,n+1): print("1.子进程id>>{},2父进程id>>{}".format(os.getpid(),os.getppid())) if __name__ == "__main__": n = 5 #创建子进程 p = Process(target=func,args=(n,)) #调用子进程 p.start() for i in range(1,n+1): print("*" * i) #3、进程间的数据彼此隔离 count = 10 def func(): global count count += 1 print("我是子进程count={}".format(count)) if __name__ == "__main__": p=Process(target=func) p.start() time.sleep(1) print(count)
4、相关信息
多个进程之间是异步并发的程序,因为cpu的调度策略问题,不一定哪个任务先执行,哪个任务后执行.
整体而言,主进程比子进程创建的速度要快,cpu遇到阻塞会立刻切换任务,等到阻塞态的任务变成了就绪态,cpu再回来执行
主程序会默认等到所有的子程序执行结束之后,在统一关闭程序,释放资源.
若不等待,有可能在后台存有多个未执行结束的子进程,会变成僵尸进程,不停的占用cpu,内存
增加系统的压力,所有方便于对进程的管理,主进程默认等待子进程.
5、join等待当前子进程全部执行完毕之后,主进程在执行(用来同步子父进程的)
from multiprocessing import Process import time,os # (1) join 基本语法 def func(): print("发送第一封邮箱,要求张工资") if __name__ == "__main__": p = Process(target=func) p.start() # 必须等待子进程全部执行结束之后,在执行主进程中的代码,用join来同步子父进程. p.join() # time.sleep(1) print("发送第二封邮箱,涨到一个月6万") # (2) 多个子进程的场景中使用join def func(i): time.sleep(1) print("发送第%s封邮箱,要求升职加薪" % (i)) if __name__ == "__main__": lst = [] for i in range(10): p = Process(target=func,args=(i,)) p.start() lst.append(p) for i in lst: i.join() print("主进程发最后一封邮件:此致敬礼~")
6、使用自定义方法创建进程(必须继承Process这个父类,所有进程执行任务的逻辑必须写在run方法里面)
# 1.基本语法 class MyProcess(Process): def run(self): print("1.子进程id>>{},2父进程id>>{}".format(os.getpid(),os.getppid())) if __name__ == "__main__": p = MyProcess() p.start() print("3.子进程id>>{},4父进程id>>{}".format(os.getpid(),os.getppid())) # 2.带有参数自定类的方法 class MyProcess(Process): def __init__(self,arg): # 手动调用一下父类的构造方法(最终实现进程的创建) super().__init__() self.arg = arg def run(self): print("1.子进程id>>{},2父进程id>>{}".format(os.getpid(),os.getppid())) print(self.arg) if __name__ == "__main__": p = MyProcess("我是传进来的参数") p.start() print("3.子进程id>>{},4父进程id>>{}".format(os.getpid(),os.getppid()))
7、守护进程
守护进程守护的是主进程,如果主进程中的所有代码执行完毕了,
当前这个守护进程会被立刻杀死,立刻终止.
语法:
进程.daemon = True 设置当前这个进程为守护进程
必须写在start()调用进程之前进行设置
默认:主进程会默认等待所有子进程执行结束之后,在关闭程序,释放资源
from multiprocessing import Process import time # (1) 基本使用 def func(): print("start当前子进程") time.sleep(1) print("end当前子进程") if __name__ == "__main__": p = Process(target = func) p.daemon = True p.start() print("主进程执行结束 ... ") # (2) 多个子进程的场景 def func1(): count = 1 while True: print("*" * count) time.sleep(0.5) count += 1 def func2(): print("start func2 当前子进程任务") time.sleep(3) print("end func2 当前子进程任务") if __name__ == "__main__": p1 = Process(target=func1) p2 = Process(target=func2) # 设置p1这个进程对象为守护进程 p1.daemon = True p1.start() p2.start() time.sleep(1) print("主进程执行结束 ... ")
8、锁
上锁和解锁是一对,只上锁不解锁会发生死锁现象(代码阻塞,不往下执行了)
互斥锁 : 互斥锁是进程之间的互相排斥,谁先抢到这个锁资源就先使用,后抢到后使用
from multiprocessing import Process,Lock # 创建一把锁 lock = Lock() # 上锁 lock.acquire() # 连续上锁不解锁是死锁 # lock.acquire() error print("运行中...") # 解锁 lock.release() print("执行程序 ... ")
9、信号量
信号量 Semaphore 本质上就是锁,只不过可以控制上锁的数量
Semaphore 可以设置上锁的数量
同一时间最多允许几个进程上锁
创建进程的时候,是异步并发
执行任务的时候,遇到锁会变成同步程序
from multiprocessing import Semaphore,Process import time,random sem = Semaphore(4) sem.acquire() # sem.acquire() # sem.acquire() # sem.acquire() # sem.acquire() # 上第五把锁出现死锁状态. print("执行响应的操作") sem.release() def ktv(person,sem): sem.acquire() print("%s进入了ktv,正在唱歌" % (person)) # 开始唱歌,唱一段时间 time.sleep(random.randrange(3,7)) # 3 4 5 6 print("%s离开了ktv,唱完了" % (person)) sem.release() if __name__ == "__main__": sem = Semaphore(4) lst = ["ss","bb","dd"] for i in lst: p = Process(target=ktv,args=(i,sem)) p.start()
10、事件(Event)
阻塞事件 :
e = Event()生成事件对象e
e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是False]
如果是True 不加阻塞
如果是False 加阻塞
控制这个属性的值
set()方法 将这个属性的值改成True
clear()方法 将这个属性的值改成False
is_set()方法 判断当前的属性是否为True (默认上来是False)
from multiprocessing import Process,Event import time,random # 1基本使用 e = Event() print(e.is_set()) e.wait() print("程序运行中 ... ") #2 e = Event() e.set() # 将内部成员属性值由False -> True print(e.is_set()) e.wait() print("程序运行中 ... ") e.clear() # 将内部成员属性值由True => False e.wait() print("程序运行中2 ... ") #3 e = Event() # wait参数 可以写时间 wait(3) 代表最多等待3秒钟 e.wait(3) print("程序运行中3 ... ")
11、进程队列(可实现数据共享)
from multiprocessing import Process,Queue import queue """队列特点: 先进先出,后进后出""" q = Queue() # 1.put 往队列中放值 q.put(100) q.put(101) q.put(102) # 2.get 从队列中取值 res = q.get() print(res) res = q.get() print(res) # res = q.get() # print(res) # 3.队列中如果已经没有数据了,在调用get会发生阻塞. res = q.get() print(res) # 4.get_nowait 存在系统兼容性问题[windows]好用 [linux]不好用 不推荐 res = q.get_nowait() print(res) try: res = q.get_nowait() print(res) except queue.Empty: pass # 5.设置队列的长度 """设置队列长度最多存放4个元素""" print("<======>") q2 = Queue(4) q2.put(200) q2.put(201) q2.put(202) # q2.put(203) # 如果超过了队列的指定长度,在继续存值会出现阻塞现象 # q2.put(204) # 6.put_nowait() 非阻塞版本的put,超出长度后,直接报错 q2.put_nowait(204) try: q2.put_nowait(205) except queue.Full: pass # ### 2.进程之间的数据共享 def func(q3): # 2.子进程获取数据 res = q3.get() print(res) # 3.子进程存数据 q3.put("马生平") if __name__ == "__main__": q3 = Queue() p = Process(target=func,args=(q3,)) p.start() # 1.主进程添加数据 q3.put("王凡") # 为了等待子进程把数据放到队列中,需要加join p.join() # 4.主进程获取数据 res = q3.get() print(res) print("主程序结束 ... ")
12、JoinableQueue
from multiprocessing import Process, JoinableQueue import time,random """ put 存储 get 获取 task_done join task_done 和 join 配合使用的 队列中 1 2 3 4 5 put 一次 内部的队列计数器加1 get 一次 通过task_done让队列计数器减1 join函数,会根据队列计数器来判断是阻塞还是放行 队列计数器 = 0 , 意味着放行 队列计数器 != 0 , 意味着阻塞 """ # 基本语法 """ jq =JoinableQueue() jq.put("a") print(jq.get()) # 通过task_done让队列计数器减1 jq.task_done() jq.join() print("finish") """
13、 Manager ( list 列表 , dict 字典) 进程之间的共享数据(列表或者字典等)
rom multiprocessing import Process,Manager,Lock def work(data,lock): # 1.正常写法 """ # 上锁 lock.acquire() # 修改数据 data["count"] -= 1 # 解锁 lock.release() """ # 2.使用with语法可以简化上锁和解锁两步操作 with lock: data[0] += 1 if __name__ == "__main__": lst = [] lock = Lock() m = Manager() data = m.dict( {"count":20000} ) data = m.list( [1,2,3] ) for i in range(50): p = Process(target=work,args=(data,lock)) p.start() lst.append(p) # 确保所有进程执行完毕之后,在向下执行,打印数据,否则报错. for i in lst: i.join() print(data)
14、实现进程间通讯多种方式
管道,消息队列,共享内存
一、python中进程间通过管道的方式进行通信。创建一个管道,在进程的一端发送消息,在进程的另一端接收消息,通过这个内置的方法实现通信。一端发出消息,另一端接收消息。写法也比较简单,在上一篇文章中有对应示例。这里不再多做示例。
二、消息队列。创建一个中间容器,一端向中间容器中写入数据,另一端则去获取容器中数据,通过设置中间容器的方式实现通信。在这里我没有说明队列,而是用容器,而是因为其他容器也可以替代队列,只是队列在这里有一个极大的优势,就是队列中的数据有且只能取用一次,用完就没有了,所以不会出现重复。在这里就变的有优势。其实如果只是为了通信,也为了常规使用方便,或者控制更加精细,可以选择自己熟悉的容器都是可以的。如列表、元组、字典等其实都是可以的。就只是一端把数据放到中间容器中,另一商从中间容器中去获取即可。(在这里也不做演示,原理理解了,实现就只是技术上一的个小问题而已)。
三、共享内存。创建一个内存空间,然后向共享内存中写入数据,一端写入一端读取,通过这样的方式来实现内存共享。