1、进程概念

进程是一个执行中的程序,资源分配的最小单位。每个进程都拥有自己的地址空间、内存、数据栈以及其他用于跟踪执行的辅助数据。在单核CPU系统中的多进程,内存中可以有许多程序,但在给定一个时刻只有一个程序在运行;就是说,可能这一秒在运行进程A,下一秒在运行进程B,虽然两者都在内存中,都没有真正同时运行。

2、获取进程号方法

 

"""
ps -aux 查看进程号
ps -aux | grep 2860
kill -9 2860  杀死进程
"""
# 获取当前进程id (子进程)
res = os.getpid()
print(res)

# 获取当前进程的父进程
res2 = os.getppid()
print(res2)

 

3、进程基本使用

from multiprocessing import Process
#1、创建基本进程
def func():
    print("1.子进程id>>{},2父进程id>>{}".format(os.getpid(),os.getppid()))

# 为了解决windows 和 linux 系统的兼容问题,下面这句话必须加上,否则报错
if __name__ == "__main__":
    # 创建子进程,返回进程对象,执行func这个任务
    p = Process(target=func)
    # 调用子进程
    p.start()
#2、创建带有参数的进程
def func(n):
    for i in range(1,n+1):
        print("1.子进程id>>{},2父进程id>>{}".format(os.getpid(),os.getppid()))

if __name__ == "__main__":
    n = 5
    #创建子进程
    p = Process(target=func,args=(n,))
    #调用子进程
    p.start()

    for i in range(1,n+1):        
        print("*" * i)
#3、进程间的数据彼此隔离
count = 10
def func():
    global count
    count += 1
    print("我是子进程count={}".format(count))

if __name__ == "__main__":
    p=Process(target=func)
    p.start()
    time.sleep(1)
    print(count)

4、相关信息

多个进程之间是异步并发的程序,因为cpu的调度策略问题,不一定哪个任务先执行,哪个任务后执行.
整体而言,主进程比子进程创建的速度要快,cpu遇到阻塞会立刻切换任务,等到阻塞态的任务变成了就绪态,cpu再回来执行

主程序会默认等到所有的子程序执行结束之后,在统一关闭程序,释放资源.
若不等待,有可能在后台存有多个未执行结束的子进程,会变成僵尸进程,不停的占用cpu,内存
增加系统的压力,所有方便于对进程的管理,主进程默认等待子进程.

5、join等待当前子进程全部执行完毕之后,主进程在执行(用来同步子父进程的)

from multiprocessing import Process
import time,os
# (1) join 基本语法
def func():
    print("发送第一封邮箱,要求张工资")
    
if __name__ == "__main__":
    p = Process(target=func)
    p.start()
    
    # 必须等待子进程全部执行结束之后,在执行主进程中的代码,用join来同步子父进程.
    p.join()
    # time.sleep(1)
    print("发送第二封邮箱,涨到一个月6万")
    
# (2) 多个子进程的场景中使用join
def func(i):
    time.sleep(1)
    print("发送第%s封邮箱,要求升职加薪" % (i))
    
if __name__ == "__main__":
    lst = []
    for i in range(10):
        p = Process(target=func,args=(i,))
        p.start()
        lst.append(p)

    for i in lst:
        i.join()
        
    print("主进程发最后一封邮件:此致敬礼~")

6、使用自定义方法创建进程(必须继承Process这个父类,所有进程执行任务的逻辑必须写在run方法里面)

# 1.基本语法
class MyProcess(Process):
    def run(self):    
        print("1.子进程id>>{},2父进程id>>{}".format(os.getpid(),os.getppid()))
        
if __name__ == "__main__":
    p = MyProcess()
    p.start()
    print("3.子进程id>>{},4父进程id>>{}".format(os.getpid(),os.getppid()))    

# 2.带有参数自定类的方法
class MyProcess(Process):
    def __init__(self,arg):
        # 手动调用一下父类的构造方法(最终实现进程的创建)
        super().__init__()
        self.arg = arg

    def run(self):    
        print("1.子进程id>>{},2父进程id>>{}".format(os.getpid(),os.getppid()))
        print(self.arg)
    
if __name__ == "__main__":
    p = MyProcess("我是传进来的参数")
    p.start()
    print("3.子进程id>>{},4父进程id>>{}".format(os.getpid(),os.getppid()))

7、守护进程

守护进程守护的是主进程,如果主进程中的所有代码执行完毕了,
当前这个守护进程会被立刻杀死,立刻终止.
语法:
进程.daemon = True 设置当前这个进程为守护进程
必须写在start()调用进程之前进行设置

默认:主进程会默认等待所有子进程执行结束之后,在关闭程序,释放资源

from multiprocessing import Process
import time
# (1) 基本使用
def func():
    print("start当前子进程")
    time.sleep(1)
    print("end当前子进程")
if __name__ == "__main__":
    p = Process(target = func)
    p.daemon = True
    p.start()
    
    print("主进程执行结束 ... ")

# (2) 多个子进程的场景
def func1():
    count = 1
    while True:
        print("*" * count)
        time.sleep(0.5)
        count += 1
        
def func2():
    print("start func2 当前子进程任务")
    time.sleep(3)
    print("end   func2 当前子进程任务")

if __name__ == "__main__":
    p1 = Process(target=func1)
    p2 = Process(target=func2)
    
    # 设置p1这个进程对象为守护进程
    p1.daemon = True
    
    p1.start()
    p2.start()
    
    time.sleep(1)
    
    print("主进程执行结束 ... ")

8、锁

上锁和解锁是一对,只上锁不解锁会发生死锁现象(代码阻塞,不往下执行了)
互斥锁 : 互斥锁是进程之间的互相排斥,谁先抢到这个锁资源就先使用,后抢到后使用

from multiprocessing import Process,Lock
# 创建一把锁
lock = Lock()
# 上锁
lock.acquire()
# 连续上锁不解锁是死锁
# lock.acquire() error

print("运行中...")

# 解锁
lock.release()
print("执行程序 ... ")

9、信号量

信号量 Semaphore 本质上就是锁,只不过可以控制上锁的数量

Semaphore 可以设置上锁的数量
同一时间最多允许几个进程上锁
创建进程的时候,是异步并发
执行任务的时候,遇到锁会变成同步程序

from multiprocessing import Semaphore,Process
import time,random
sem = Semaphore(4)
sem.acquire()
# sem.acquire()
# sem.acquire()
# sem.acquire()
# sem.acquire() # 上第五把锁出现死锁状态.
print("执行响应的操作")
sem.release()

def ktv(person,sem):    
    sem.acquire()
    print("%s进入了ktv,正在唱歌" % (person))
    # 开始唱歌,唱一段时间
    time.sleep(random.randrange(3,7)) # 3 4 5 6 
    print("%s离开了ktv,唱完了" % (person))
    sem.release()
    
if __name__ == "__main__":
    sem = Semaphore(4)
    lst = ["ss","bb","dd"]
    for i in lst:
        p = Process(target=ktv,args=(i,sem))
        p.start()

10、事件(Event)

阻塞事件 :
e = Event()生成事件对象e
e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是False]
如果是True 不加阻塞
如果是False 加阻塞

控制这个属性的值
set()方法 将这个属性的值改成True
clear()方法 将这个属性的值改成False
 is_set()方法 判断当前的属性是否为True (默认上来是False)

from multiprocessing import Process,Event
import time,random
# 1基本使用

e = Event()
print(e.is_set())
e.wait()
print("程序运行中 ... ")

#2
e = Event()
e.set() # 将内部成员属性值由False -> True
print(e.is_set())
e.wait()
print("程序运行中 ... ")

e.clear() # 将内部成员属性值由True => False 
e.wait()
print("程序运行中2 ... ")

#3
e = Event()
# wait参数 可以写时间 wait(3) 代表最多等待3秒钟
e.wait(3) 
print("程序运行中3 ... ")

11、进程队列(可实现数据共享)

from multiprocessing import Process,Queue
import queue
"""队列特点: 先进先出,后进后出"""

q = Queue()
# 1.put 往队列中放值
q.put(100)
q.put(101)
q.put(102)

# 2.get 从队列中取值
res = q.get()
print(res)
res = q.get()
print(res)
# res = q.get()
# print(res)

# 3.队列中如果已经没有数据了,在调用get会发生阻塞.

res = q.get()
print(res)


# 4.get_nowait 存在系统兼容性问题[windows]好用 [linux]不好用 不推荐

res = q.get_nowait()
print(res)
try:
    res = q.get_nowait()
    print(res)
except queue.Empty:
    pass

# 5.设置队列的长度
"""设置队列长度最多存放4个元素"""

print("<======>")
q2 = Queue(4)
q2.put(200)
q2.put(201)
q2.put(202)
# q2.put(203)
# 如果超过了队列的指定长度,在继续存值会出现阻塞现象
# q2.put(204)

# 6.put_nowait() 非阻塞版本的put,超出长度后,直接报错
q2.put_nowait(204)
try:
    q2.put_nowait(205)
except queue.Full:
    pass

# ### 2.进程之间的数据共享
def func(q3):
    # 2.子进程获取数据
    res = q3.get()
    print(res)
    
    # 3.子进程存数据
    q3.put("马生平")

if __name__ == "__main__":
    q3 = Queue()
    p = Process(target=func,args=(q3,))
    p.start()
    
    # 1.主进程添加数据
    q3.put("王凡")
    
    # 为了等待子进程把数据放到队列中,需要加join
    p.join()
    
    # 4.主进程获取数据
    res = q3.get()
    print(res)
    
    print("主程序结束 ... ")

12、JoinableQueue

from multiprocessing import Process, JoinableQueue
import time,random
"""
put 存储
get 获取
task_done 
join

task_done 和 join 配合使用的
队列中 1 2 3 4 5
put 一次 内部的队列计数器加1
get 一次 通过task_done让队列计数器减1
join函数,会根据队列计数器来判断是阻塞还是放行
    队列计数器  = 0 , 意味着放行
    队列计数器 != 0 , 意味着阻塞
"""

# 基本语法
"""
jq =JoinableQueue()
jq.put("a")
print(jq.get())
# 通过task_done让队列计数器减1
jq.task_done()
jq.join()
print("finish")
"""

13、 Manager ( list 列表 , dict 字典) 进程之间的共享数据(列表或者字典等)

rom multiprocessing import Process,Manager,Lock

def work(data,lock):

    # 1.正常写法
    """
    # 上锁
    lock.acquire()
    # 修改数据
    data["count"] -= 1
    # 解锁
    lock.release()
    """
    
    # 2.使用with语法可以简化上锁和解锁两步操作
    with lock:
        data[0] += 1

if __name__ == "__main__":
    lst = []
    lock = Lock()
    m = Manager()
    data = m.dict( {"count":20000} )
    data = m.list( [1,2,3] )
    for i in range(50):
        p = Process(target=work,args=(data,lock))
        p.start()
        lst.append(p)
        
    # 确保所有进程执行完毕之后,在向下执行,打印数据,否则报错.
    for i in lst:
        i.join()
        
    print(data)
        

 14、实现进程间通讯多种方式

管道,消息队列,共享内存

一、python中进程间通过管道的方式进行通信。创建一个管道,在进程的一端发送消息,在进程的另一端接收消息,通过这个内置的方法实现通信。一端发出消息,另一端接收消息。写法也比较简单,在上一篇文章中有对应示例。这里不再多做示例。

二、消息队列。创建一个中间容器,一端向中间容器中写入数据,另一端则去获取容器中数据,通过设置中间容器的方式实现通信。在这里我没有说明队列,而是用容器,而是因为其他容器也可以替代队列,只是队列在这里有一个极大的优势,就是队列中的数据有且只能取用一次,用完就没有了,所以不会出现重复。在这里就变的有优势。其实如果只是为了通信,也为了常规使用方便,或者控制更加精细,可以选择自己熟悉的容器都是可以的。如列表、元组、字典等其实都是可以的。就只是一端把数据放到中间容器中,另一商从中间容器中去获取即可。(在这里也不做演示,原理理解了,实现就只是技术上一的个小问题而已)。

三、共享内存。创建一个内存空间,然后向共享内存中写入数据,一端写入一端读取,通过这样的方式来实现内存共享。

版权声明:本文为菩提叶子原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/songyunjie/p/16829465.html