消息队列


我们之前学习过内置方法里的队列,队列:先进先出(使用频率很高),堆栈:先进后出(特定常见下用)。消息队列和队列一样也是先进先出。

from multiprocessing import Queue

q = Queue(5)  # 自定义队列的长度
# 朝队列中存放数据
q.put(111)
q.put(222)
q.put(333)
print(q.full())  # False  判断队列是否满了
q.put(444)
q.put(555)
print(q.full())  # True
# q.put(666)  # 超出最大长度 原地阻塞等待队列中出现空位
print(q.get())
print(q.get())
print(q.empty())  # False  判断队列是否空了
print(q.get())
print(q.get())
print(q.get())
print(q.empty())  # True
# print(q.get())  # 队列中没有值 继续获取则阻塞等待队列中给值
print(q.get_nowait())  # 队列中如果没有值 直接报错

full()判断队列是否是满状态

empty()判断队列是否是空状态

get_nowait()出队列里获取值,队列中如果没有值,直接报错

上述方法不能在并发的情况下使用,因为并发情况下可能前脚刚判断出结果,后脚就添加了数据或取出来数据。消息队列是支持进程间数据通信。

img

IPC机制(进程间通信)


1.主进程与子进程数据交互

from multiprocessing import Process, Queue

def producer(q):
    print('子进程producer从队列中取值>>>:', q.get())

def consumer(q):
    print('子进程consumer从队列中取值>>>:', q.get())
    
if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q, ))
    p1 = Process(target=consumer, args=(q,))
    p.start()
    p1.start()
    q.put(123)  # 主进程往队列中存放数据123
    print('主进程')
# 子进程producer从队列中取值>>>: 123

2.两个子进程进行数据交互

from multiprocessing import Process, Queue


def producer(q):
    q.put('子进程producer往队列中添加值')

def consumer(q):
    print('子进程consumer从队列中取值>>>:', q.get())


if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q, ))
    p1 = Process(target=consumer, args=(q,))
    p.start()
    p1.start()
    print('主进程')
# 子进程consumer从队列中取值>>>: 子进程producer往队列中添加值

本质:不同内存空间中的进程数据交互

生产者消费者模型


生产者,负责生产也就是制作数据。消费者,负责消费也就是处理数据。

比如在爬虫领域中,会先通过代码爬取网页数据(爬取网页的代码就可以称之为是生产者),之后针对网页数据做筛选处理(处理网页的代码就可以称之为消费者)。如果使用进程来演示,除了有至少两个进程之外 还需要一个媒介(消息队列)。

from multiprocessing import Process, Queue, JoinableQueue
import time
import random


def producer(name, food, q):
    for i in range(5):
        data = f'{name}生产了{food}{i}'
        print(data)
        time.sleep(random.randint(1, 3))  # 模拟产生过程
        q.put(data)


def consumer(name, q):
    while True:
        food = q.get()
        # if food == None:
        #     print('完蛋了 没得吃了 要饿死人了')
        #     break
        time.sleep(random.random())
        print(f'{name}吃了{food}')
        q.task_done()  # 每次去完数据必须给队列一个反馈


if __name__ == '__main__':
    # q = Queue()
    q = JoinableQueue()
    p1 = Process(target=producer, args=('大厨jason', '韭菜炒蛋', q))
    p2 = Process(target=producer, args=('老板kevin', '秘制小汉堡', q))
    c1 = Process(target=consumer, args=('涛涛', q))
    c2 = Process(target=consumer, args=('龙龙', q))
    c1.daemon = True
    c2.daemon = True
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    # 生产者生产完所有数据之后 往队列中添加结束的信号
    p1.join()
    p2.join()
    # q.put(None)  # 结束信号的个数要跟消费者个数一致才可以
    # q.put(None)
    """队列中其实已经自己加了锁 所以多进程取值也不会冲突 并且取走了就没了"""
    q.join()  # 等待队列中数据全部被取出(一定要让生产者全部结束才能判断正确)
    """执行完上述的join方法表示消费者也已经消费完数据了"""

img

线程


线程理论

进程是资源单位,线程是执行单位。进程相当于车间(一个个空间),线程相当于车间里面的流水线(真正干活的)。一个进程中至少有一个线程。

进程仅仅是在内存中开辟一块空间(提供线程工作所需的资源),线程真正被CPU执行,线程向进程所有资源。

为什么要有线程?开设线程的消耗远远小于进程,开进程需要申请内存空间或是拷贝代码。开线程无需申请内存空间、拷贝代码。一个进程内可以开设多个线程,多个线程之间数据是共享的。比如说开发了一个文本编辑器,获取用户输入并实时展示在屏幕上,并保存到硬盘中,这种多功能的应该开设多个线程而不是进程。

开设线程的两种方式

进程与线程的代码实操几乎是一样的。

from threading import Thread
import time

# 第一种
def task(name):
    print(f'{name} is running')
    time.sleep(3)
    print(f'{name} is over')

# 创建线程无需在__main__下面编写 但是为了统一 还是习惯在子代码中写
t = Thread(target=task, args=('jason', ))
t.start()  # 创建线程的开销极小 几乎是一瞬间就可以创建
print('主线程')

# 第二种
class MyThread(Thread):
    def __init__(self, username):
        super().__init__()
        self.username = username
    def run(self):
        print(f'{self.username} jason is running')
        time.sleep(3)
        print(f'{self.username} is over')

t = MyThread('jasonNB')
t.start()
print('主线程')

img

线程实现TCP服务端的并发

# 服务端
import socket
from threading import Thread

server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen()

def talk(sock):
    while True:
        data = sock.recv(1024)
        print(data.decode('utf8'))
        sock.send(data.upper())
       
while True:
    sock, addr = server.accept()
    # 每类一个客户端就创建一个线程做数据交互
    t = Thread(target=talk, args=(sock,))
    t.start()
    
# 客户端
import socket

client = socket.socket()
client.connect(('127.0.0.1', 8080))

while True:
    client.send(b'hello baby')
    data = client.recv(1024)
    print(data.decode('utf8'))

线程join方法

主线程结束就标志着整个进程的结束,是因为要确保子线程运行过程中所需的各项资源,主线程为什么要等着子线程结束才会结束整个进程。

from threading import Thread
import time

def task(name):
    print(f'{name} is running')
    time.sleep(3)
    print(f'{name} is over')

t = Thread(target=task, args=('jason', ))
t.start()
t.join()  # 主线程代码等待子线程代码运行完毕之后再往下执行
print('主线程')

一个进程内的多个线程数据共享

from threading import Thread

money = 10000000000
def task():
    global money
    money = 1

t = Thread(target=task)
t.start()
t.join()
print(money) # 1

img

线程对象属性和方法

1.一个进程下的多个线程是属于的同一个进程的,通过验证线程的pid发现是进程的pid即可确认。

2.统计进程下活跃的线程数

​ active_count() 注意主线程也算一个。

3.获取线程的名字

current_thread().name

​ MainThread 主线程

​ Thread-1、Thread-2、… 子线程

self.name

from threading import Thread, active_count, current_thread
import os
import time


class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name
    def run(self):
        print(self.name)

t1 = MyThread('jason')
t2 = MyThread('kevin')
t1.start()
t2.start()

def task():
    time.sleep(1)
    # print('子线程获取进程号>>>:',os.getpid())
    print(current_thread().name)

t = Thread(target=task)
t1 = Thread(target=task)
t.start()
t1.start()
print(active_count())
print(current_thread().name)
print('主线程获取进程号>>>:', os.getpid())

守护线程

from threading import Thread
import time


def task(name):
    print(f'{name} is running')
    time.sleep(3)
    print(f'{name} is over')

t1 = Thread(target=task, args=('jason',))
t2 = Thread(target=task, args=('kevin',))
# t1.daemon = True
t2.daemon = True
t1.start()
time.sleep(1)
t2.start()
print('主线程')
"""
主线程要等待所有非守护线程结束才可以结束
"""

img

GIL—全局解释器锁

官方文档

"""纯理论 不影响编程 只不过面试的时候可能会被问到"""
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

# 翻译
'''
在CPython中,全局解释器锁(GIL)是一个互斥锁,它可以防止多个本机线程同时执行Python字节码。作用就是,限制多线程同时执行,保证同一时间内只有一个线程在执行。这个锁是必要的,主要是因为CPython的内存管理不是线程安全的。(然而,自从GIL存在以来,其他特性已经成长为依赖于它所实施的保证。)
'''

理解

GIL的存在是必要的,如果不存在会产生垃圾回收机制与正常线程之间数据错乱,GIL是加在CPython解释器上面的互斥锁。同一个进程下的多个线程要想执行必须先抢GIL锁,所以同一个进程下多个线程肯定不能同时运行,也就是无法利用多核优势。

结论

  1. 同一个进程下的多个线程不能同时执行即不能利用多核优势。

  2. 如果多个任务都是IO密集型的,那么多线程更有优势(消耗的资源更少);如果多个任务都是计算密集型,那么多线程确实没有优势 但是可以用多进程。

  3. 所有的解释型语言都无法做到同一个进程下多个线程利用多核优势,GIL在实际编程中其实不用考虑。

版权声明:本文为早安原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/zaoan1207/p/16171653.html