欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Event事件、进程池与线程池、协程

程序员文章站 2022-06-04 11:45:39
[TOC] Event事件 用来控制线程的执行 出现 ,就会把这个线程设置为False,就不能执行这个任务; 只要有一个线程出现 ,就会告诉Event对象,把有 的用户全部改为True,剩余的任务就会立马去执行。由一些线程去控制另一些线程,中间通过Event。 进程池与线程池 1. 进程池与线程池是 ......

event事件

用来控制线程的执行

出现e.wait(),就会把这个线程设置为false,就不能执行这个任务;

只要有一个线程出现e.set(),就会告诉event对象,把有e.wait的用户全部改为true,剩余的任务就会立马去执行。由一些线程去控制另一些线程,中间通过event。

from threading import event
from threading import thread
import time
# 调用event实例化出对象
e = event()
#
# # 若该方法出现在任务中,则为false,阻塞
# e.wait()  # false
# # 若该方法出现在任务中,则将其他线程的false改为true,进入就绪态和运行态
# e.set()  # true


def light():
    print('红灯亮...')
    time.sleep(5)
    # 应该发出信号,告诉其他线程准备执行
    e.set()   # 将car中的false变为true
    print('绿灯亮...')


def car(name):
    print('正在等红灯...')
    # 让所有汽车任务进入阻塞态
    e.wait()  # false
    print(f'{name}正在加速飘逸...')


# 让一个light线程控制多个car线程
t = thread(target=light)
t.start()

for i in range(10):
    t = thread(target=car, args=(f'汽车{i}号', ))
    t.start()

进程池与线程池

  1. 进程池与线程池是用来控制当前程序允许创建(进程/线程)的数量

  2. 作用:保证在硬件允许的范围内创建(进程/线程)的数量

线程池使用一:

from concurrent.futures import threadpoolexecutor
import time

pool = threadpoolexecutor(5) # 5代表只能开启5个进程, 不加默认使用cpu的进程数

# threadpoolexecutor(5) # 5代表只能开启5个线程

# pool.submit() #异步提交任务, 括号里传函数地址


def task():
    print('线程任务开始了...')
    time.sleep(1)
    print('线程任务结束了...')


for line in range(5):
    pool.submit(task)

使用二:

from concurrent.futures import threadpoolexecutor
import time

pool = threadpoolexecutor(5) # 5代表只能开启5个进程, 不加默认使用cpu的进程数

# threadpoolexecutor(5) # 5代表只能开启5个线程

# pool.submit() #异步提交任务, 括号里传函数地址


def task():
    print('线程任务开始了...')
    time.sleep(1)
    print('线程任务结束了...')
    return 123


# 回调函数
def call_back(res):
    print(type(res))
    res2 = res.result()  # 注意:赋值操作不要与接收的res同名
    print(res2)


for line in range(5):
    pool.submit(task).add_done_callback(call_back)

pool.shutdown() 会让所有线程池的任务结束后,才往下执行代码

多线程爬取梨视频

利用requests模块,封装底层socket套接字

  1. 主页中获取所有视频id号,拼接视频详情页url
  2. 在视频详情页中获取真实视频url srcurl=
  3. 往真实视频url地址发送请求获取 视频 二进制数据
  4. 最后把视频二进制数据保存到本地

协程

  • 进程: 资源单位
  • 线程: 执行单位
  • 协程: 在单线程下实现并发

注意: 协程不是操作系统资源,目的是让单线程实现并发

协程目的

  • 操作系统:使用多道技术,切换 + 保存状态,一个是遇到io, 另一个是cpu执行时间过长
  • 协程:通过手动模拟操作系统 “多道计数”, 实现 切换 + 保存状态
    • 手动实现,遇到io切换,欺骗操作系统误以为没有io操作
    • 单线程时,遇到io,就切换 + 保存状态
    • 单线程时,对于计算密集型,来回切换 + 保存状态反而效率更低

优点:在io密集型的情况下,会提高效率

缺点:若在计算密集型的情况下,来回切换,反而效率更低

import time

def func1():
    for i in range(10000000):
        i+1

def func2():
    for i in range(10000000):
        i+1


start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)   # 1.0312113761901855


# 基于yield实现并发 在计算密集型的情况下效率更低

def func1():
    while true:
        10000000+1
        yield

def func2():
    g = func1()
    for i in range(10000000):
        i+1
        next(g)   # 每次执行next相当于切换到func1下面


start = time.time()
func2()
stop = time.time()
print(stop - start)  # 1.3294126987457275

gevent

gevent是一个第三方模块,可以帮你监听io操作,并切换

使用gevent的目的:在单线程下实现,遇到io就会 保存状态 + 切换

import time
from gevent import monkey

monkey.patch_all()  # 可以监听该程序下所有的io操作
from gevent import spawn, joinall # 用于做切换 + 保存状态


def func1():
    print('1')
    time.sleep(1)  # io操作


def func2():
    print('2')
    time.sleep(3)


def func3():
    print('3')
    time.sleep(5)

start = time.time()

s1 = spawn(func1)
s2 = spawn(func2)
s3 = spawn(func3)

s1.join()  # 发送信号,相当于等待自己(在单线程的情况下)
s2.join()
s3.join()

# joinall((s1, s2, s3))  #  一个个执行很麻烦,可以用joinall把这些全部装进去

end = time.time()
print(end - start)  # 5.006161451339722

tcp服务端socket套接字实现协程

服务端:

from gevent import monkey
from gevent import spawn
import socket

monkey.patch_all()
server = socket.socket()
server.bind(('127.0.0.1', 9999))
server.listen(5)

def task(conn):
    while true:
        try:
            data = conn.recv(1024)
            if len(data) == 0:
                break
            print(data.decode('utf-8'))
            send_data = data.upper()
            conn.send(send_data)

        except exception:
            break

    conn.close()


def server2():
    while true:
        conn, addr = server.accept()
        print(addr)
        spawn(task, conn)


if __name__ == '__main__':
    s = spawn(server2)
    s.join()

客户端:

import socket
from threading import thread, current_thread

def client():
    client = socket.socket()
    client.connect(('127.0.0.1', 9999))

    number = 0
    while true:
        send_data = f'{current_thread().name} {number}'
        client.send(send_data.encode('utf-8'))

        data = client.recv(1024)
        print(data.decode('utf-8'))
        number += 1


for i in range(400):
    t = thread(target=client)
    t.start()