欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python3 系列之 并行编程

程序员文章站 2023-04-02 12:15:26
进程和线程 进程是程序运行的实例。一个进程里面可以包含多个线程,因此同一进程下的多个线程之间可以共享线程内的所有资源,它是操作系统动态运行的基本单元;每一个线程是进程下的一个实例,可以动态调度和独立运行,由于线程和进程有很多类似的特点,因此,线程又被称为轻量级的进程。线程的运行在进程之下,进程的存在 ......

进程和线程

进程是程序运行的实例。一个进程里面可以包含多个线程,因此同一进程下的多个线程之间可以共享线程内的所有资源,它是操作系统动态运行的基本单元;每一个线程是进程下的一个实例,可以动态调度和独立运行,由于线程和进程有很多类似的特点,因此,线程又被称为轻量级的进程。线程的运行在进程之下,进程的存在依赖于线程;

开胃菜

基于 python3 创建一个简单的进程示例

from threading import thread
from time import sleep


class cookbook(thread):
    def __init__(self):
        thread.__init__(self)
        self.message = "hello parallel python cookbook!!\n"

    def print_message(self):
        print(self.message)

    def run(self):
        print("thread starting\n")
        x = 0
        while x < 10:
            self.print_message()
            sleep(2)
            x += 1
        print("thread ended!\n")


print("process started")
hello_python = cookbook()

hello_python.start()
print("process ended")

需要注意的是,永远不要让线程在后台默默执行,当其执行完毕后要及时释放资源。

基于线程的并行

多线程编程一般使用共享内存空间进行线程间的通信,这就使管理内存空间成为多线程编程的关键。python 通过标准库 threading 模块来管理线程,具有以下的组件:

  • 线程对象
  • lock 对象
  • rlock 对象
  • 信号对象
  • 条件对象
  • 事件对象

定义一个线程

基本语法

示例代码如下所示

import threading


def function(i):
    print("function called by thread: {0}".format(i))
    return


threads = []
for i in range(5):
    t = threading.thread(target=function, args=(i,))
    threads.append(t)
    t.start()

lambda t, threads: t.join()

需要注意的是,线程创建后并不会自动运行,需要主动调用 start() 方法来启动线程,join() 会让调用它的线程被阻塞直到执行结束。(ps:可通过调用 t.setdaemon(true) 使其为后台线程避免主线程被阻塞)

线程定位

示例代码如下所示

import threading
import time


def first_function():
    print("{0} is starting".format(threading.currentthread().getname()))
    time.sleep(2)
    print("{0} is exiting".format(threading.currentthread().getname()))


def second_function():
    print("{0} is starting".format(threading.currentthread().getname()))
    time.sleep(2)
    print("{0} is exiting".format(threading.currentthread().getname()))


def third_function():
    print("{0} is starting".format(threading.currentthread().getname()))
    time.sleep(2)
    print("{0} is exiting".format(threading.currentthread().getname()))

if __name__ == "__main__":
    t1 = threading.thread(target=first_function,name="first")
    t2 = threading.thread(target=second_function,name="second")
    t3 = threading.thread(target=third_function,name="third")

    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

通过设置 threading.thread() 函数的 name 参数来设置线程名称,通过 threading.currentthread().getname() 来获取当前线程名称;线程的默认名称会以 thread-{i} 格式来定义

自定义一个线程对象

示例代码如下所示

import threading
import time

exitflag = 0


class mythread(threading.thread):
    def __init__(self, threadid, name, counter):
        threading.thread.__init__(self)
        self.threadid = threadid
        self.name = name
        self.counter = counter

    def run(self):
        print("starting:{0}".format(self.name))
        print_time(self.name, self.counter, 5)
        print("exiting:{0}".format(self.name))


def print_time(threadname, delay, counter):
    while counter:
        if exitflag:
            thread.exit()
        time.sleep(delay)
        print("{0} {1}".format(threadname, time.ctime(time.time())))
        counter -= 1


t1 = mythread(1, "thread-1", 1)
t2 = mythread(2, "thread-2", 1)

t1.start()
t2.start()

t1.join()
t2.join()

print("exiting main thread.")

如果想自定义一个线程对象,首先就是要定义一个继承 threading.thread 类的子类,实现构造函数, 并重写 run() 方法即可。

线程同步

lock

示例代码如下所示

import threading

shared_resource_with_lock = 0
shared_resource_with_no_lock = 0
count = 100000
shared_resource_lock = threading.lock()


def increment_with_lock():
    global shared_resource_with_lock
    for i in range(count):
        shared_resource_lock.acquire()
        shared_resource_with_lock += 1
        shared_resource_lock.release()


def decrement_with_lock():
    global shared_resource_with_lock
    for i in range(count):
        shared_resource_lock.acquire()
        shared_resource_with_lock -= 1
        shared_resource_lock.release()


def increment_without_lock():
    global shared_resource_with_no_lock
    for i in range(count):
        shared_resource_with_no_lock += 1


def decrement_wthout_lock():
    global shared_resource_with_no_lock
    for i in range(count):
        shared_resource_with_no_lock -= 1


if __name__ == "__main__":
    t1 = threading.thread(target=increment_with_lock)
    t2 = threading.thread(target=decrement_with_lock)
    t3 = threading.thread(target=increment_without_lock)
    t4 = threading.thread(target=decrement_wthout_lock)
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    print("the value of shared variable with lock management is :{0}".format(
        shared_resource_with_lock))
    print("the value of shared variable with race condition is :{0}".format(
        shared_resource_with_no_lock))

通过 threading.lock() 方法我们可以拿到线程锁,一般有两种操作方式:acquire()release() 在两者之间是加锁状态,如果释放失败的话会显示 runtimerror() 的异常。

rlock

rlock 也叫递归锁,和 lock 的区别在于:谁拿到谁释放,是通过 threading.rlock() 来拿到的;

示例代码如下所示

import threading
import time


class box(object):
    lock = threading.rlock()

    def __init__(self):
        self.total_items = 0

    def execute(self, n):
        box.lock.acquire()
        self.total_items += n
        box.lock.release()

    def add(self):
        box.lock.acquire()
        self.execute(1)
        box.lock.release()

    def remove(self):
        box.lock.acquire()
        self.execute(-1)
        box.lock.release()


def adder(box, items):
    while items > 0:
        print("adding 1 item in the box")
        box.add()
        time.sleep(1)
        items -= 1


def remover(box, items):
    while items > 0:
        print("removing 1 item in the box")
        box.remove()
        time.sleep(1)
        items -= 1


if __name__ == "__main__":
    items = 5
    print("putting {0} items in the box".format(items))
    box = box()
    t1 = threading.thread(target=adder, args=(box, items))
    t2 = threading.thread(target=remover, args=(box, items))

    t1.start()
    t2.start()

    t1.join()
    t2.join()
    print("{0} items still remain in the box".format(box.total_items))

信号量

示例代码如下所示

import threading
import time
import random

semaphore = threading.semaphore(0)


def consumer():
    print("consumer is waiting.")
    semaphore.acquire()
    print("consumer notify:consumed item numbers {0}".format(item))


def producer():
    global item
    time.sleep(10)
    item = random.randint(0, 10000)
    print("producer notify:produced item number {0}".format(item))
    semaphore.release()


if __name__ == "__main__":
    for i in range(0, 5):
        t1 = threading.thread(target=producer)
        t2 = threading.thread(target=consumer)
        t1.start()
        t2.start()
        t1.join()
        t2.join()

    print("program terminated.")

信号量初始化为 0 ,然后在两个并行线程中,通过调用 semaphore.acquire() 函数会阻塞消费者线程,直到 semaphore.release() 在生产者中被调用,这里模拟了生产者-消费者 模式来进行了测试;如果信号量的计数器到了0,就会阻塞 acquire() 方法,直到得到另一个线程的通知。如果信号量的计数器大于0,就会对这个值-1然后分配资源。

使用条件进行线程同步

解释条件机制最好的例子还是生产者-消费者问题。在本例中,只要缓存不满,生产者一直向缓存生产;只要缓存不空,消费者一直从缓存取出(之后销毁)。当缓冲队列不为空的时候,生产者将通知消费者;当缓冲队列不满的时候,消费者将通知生产者。

示例代码如下所示

from threading import thread, condition
import time

items = []
condition = condition()


class consumer(thread):
    def __init__(self):
        thread.__init__(self)

    def consume(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 0:
            condition.wait()
            print("consumer notify:no item to consum")
        items.pop()
        print("consumer notify: consumed 1 item")
        print("consumer notify: item to consume are:{0}".format(len(items)))

        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 20):
            time.sleep(2)
            self.consume()


class producer(thread):
    def __init__(self):
        thread.__init__(self)

    def produce(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 10:
            condition.wait()
            print("producer notify:items producted are:{0}".format(len(items)))
            print("producer notify:stop the production!!")
        items.append(1)
        print("producer notify:total items producted:{0}".format(len(items)))
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 20):
            time.sleep(1)
            self.produce()


if __name__ == "__main__":
    producer = producer()
    consumer = consumer()
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

通过 condition.acquire() 来获取锁对象,condition.wait() 会使当前线程进入阻塞状态,直到收到 condition.notify() 信号,同时,调用信号的通知的对象也要及时调用 condition.release() 来释放资源;

使用事件进行线程同步

事件是线程之间用于通信的对。有的线程等待信号,有的线程发出信号。

示例代码如下所示

import time
from threading import thread, event
import random

items = []
event = event()


class consumer(thread):
    def __init__(self, items, event):
        thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        while true:
            time.sleep(2)
            self.event.wait()
            item = self.items.pop()
            print('consumer notify:{0} popped from list by {1}'.format(
                item, self.name))


class producer(thread):
    def __init__(self, integers, event):
        thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        global item
        for i in range(100):
            time.sleep(2)
            item = random.randint(0, 256)
            self.items.append(item)
            print('producer notify: item  n° %d appended to list by %s' %
                  (item, self.name))
            print('producer notify: event set by %s' % self.name)
            self.event.set()
            print('produce notify: event cleared by %s ' % self.name)
            self.event.clear()


if __name__ == "__main__":
    t1 = producer(items, event)
    t2 = consumer(items, event)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

使用 with 语法简化代码

import threading
import logging

logging.basicconfig(level=logging.debug,
                    format='(%(threadname)-10s) %(message)s')


def threading_with(statement):
    with statement:
        logging.debug("%s acquired via with" % statement)


def threading_not_with(statement):
    statement.acquire()
    try:
        logging.debug("%s acquired directly " % statement)
    finally:
        statement.release()


if __name__ == "__main__":
    lock = threading.lock()
    rlock = threading.rlock()
    condition = threading.condition()
    mutex = threading.semaphore(1)
    threading_synchronization_list = [lock, rlock, condition, mutex]

    for statement in threading_synchronization_list:
        t1 = threading.thread(target=threading_with, args=(statement,))
        t2 = threading.thread(target=threading_not_with, args=(statement,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()

使用 queue 进行线程通信

queue 常用的方法有以下四个:

  • put():往 queue 中添加一个元素
  • get():从 queue 中删除一个元素,并返回该元素
  • task_done():每次元素被处理的时候都需要调用这个方法
  • join():所有元素都被处理之前一直阻塞
from threading import thread, event
from queue import queue
import time
import random


class producer(thread):
    def __init__(self, queue):
        thread.__init__(self)
        self.queue = queue

    def run(self):
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print("producer notify: item item n° %d appended to queue by %s" %
                  (item, self.name))
            time.sleep(1)


class consumer(thread):
    def __init__(self, queue):
        thread.__init__(self)
        self.queue = queue

    def run(self):
        while true:
            item = self.queue.get()
            print('consumer notify : %d popped from queue by %s' %
                  (item, self.name))
            self.queue.task_done()


if __name__ == "__main__":
    queue = queue()
    t1 = producer(queue)
    t2 = consumer(queue)
    t3 = consumer(queue)
    t4 = consumer(queue)
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t1.join()
    t2.join()
    t3.join()
    t4.join()

基于进程的并行

multiprocessing 是 python 标准库中的模块,实现了共享内存机制。

异步编程

使用 concurrent.futures 模块

该模块具有线程池和进程池,管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能;此模块由以下部分组成

  • concurrent.futures.executor: 这是一个虚拟基类,提供了异步执行的方法。
  • submit(function, argument): 调度函数(可调用的对象)的执行,将 argument 作为参数传入。
  • map(function, argument): 将 argument 作为参数执行函数,以 异步 的方式。
  • shutdown(wait=true): 发出让执行者释放所有资源的信号。
  • concurrent.futures.future: 其中包括函数的异步执行。future对象是submit任务(即带有参数的functions)到executor的实例。

示例代码如下所示

import concurrent.futures
import time

number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


def evaluate_item(x):
    result_item = count(x)
    return result_item


def count(number):
    for i in range(0, 1000000):
        i = i + 1
    return i * number


if __name__ == "__main__":
    # 顺序执行
    start_time = time.time()
    for item in number_list:
        print(evaluate_item(item))
    print("sequential execution in " + str(time.time() - start_time), "seconds")
    # 线程池执行
    start_time_1 = time.time()
    with concurrent.futures.threadpoolexecutor(max_workers=5) as executor:
        futures = [executor.submit(evaluate_item, item)
                   for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("thread pool execution in " +
          str(time.time() - start_time_1), "seconds")
    # 线程池执行
    start_time_2 = time.time()
    with concurrent.futures.processpoolexecutor(max_workers=5) as executor:
        futures = [executor.submit(evaluate_item, item)
                   for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("process pool execution in " +
          str(time.time() - start_time_2), "seconds")

使用 asyncio 管理事件循环

python 的 asyncio 模块提供了管理事件、协程、任务和线程的方法,以及编写并发代码的原语。此模块的主要组件和概念包括:

  • 事件循环: 在asyncio模块中,每一个进程都有一个事件循环。
  • 协程: 这是子程序的泛化概念。协程可以在执行期间暂停,这样就可以等待外部的处理(例如io)完成之后,从之前暂停的地方恢复执行。
  • futures: 定义了 future 对象,和 concurrent.futures 模块一样,表示尚未完成的计算。
  • tasks: 这是asyncio的子类,用于封装和管理并行模式下的协程。

asyncio 提供了以下方法来管理事件循环:

  • loop = get_event_loop(): 得到当前上下文的事件循环。
  • loop.call_later(time_delay, callback, argument): 延后 time_delay 秒再执行 callback 方法。
  • loop.call_soon(callback, argument): 尽可能快调用 callback, call_soon() 函数结束,主线程回到事件循环之后就会马上调用 callback 。
  • loop.time(): 以float类型返回当前时间循环的内部时间。
  • asyncio.set_event_loop(): 为当前上下文设置事件循环。
  • asyncio.new_event_loop(): 根据此策略创建一个新的时间循环并返回。
  • loop.run_forever(): 在调用 stop() 之前将一直运行。

示例代码如下所示

import asyncio
import datetime
import time


def fuction_1(end_time, loop):
    print("function_1 called")
    if(loop.time() + 1.0) < end_time:
        loop.call_later(1, fuction_2, end_time, loop)
    else:
        loop.stop()


def fuction_2(end_time, loop):
    print("function_2 called")
    if(loop.time() + 1.0) < end_time:
        loop.call_later(1, function_3, end_time, loop)
    else:
        loop.stop()


def function_3(end_time, loop):
    print("function_3 called")
    if(loop.time() + 1.0) < end_time:
        loop.call_later(1, fuction_1, end_time, loop)
    else:
        loop.stop()


def function_4(end_time, loop):
    print("function_4 called")
    if(loop.time() + 1.0) < end_time:
        loop.call_later(1, function_4, end_time, loop)
    else:
        loop.stop()


loop = asyncio.get_event_loop()

end_loop = loop.time() + 9.0
loop.call_soon(fuction_1, end_loop, loop)
loop.run_forever()
loop.close()

使用 asyncio 管理协程

示例代码如下所示

import asyncio
import time
from random import randint


@asyncio.coroutine
def startstate():
    print("start state called \n")
    input_val = randint(0, 1)
    time.sleep(1)
    if input_val == 0:
        result = yield from state2(input_val)
    else:
        result = yield from state1(input_val)
    print("resume of the transition:\nstart state calling" + result)


@asyncio.coroutine
def state1(transition_value):
    outputval = str("state 1 with transition value=%s \n" % (transition_value))
    input_val = randint(0, 1)
    time.sleep(1)
    print("...evaluating...")
    if input_val == 0:
        result = yield from state3(input_val)
    else:
        result = yield from state2(input_val)


@asyncio.coroutine
def state2(transition_value):
    outputval = str("state 2 with transition value= %s \n" %
                    (transition_value))
    input_val = randint(0, 1)
    time.sleep(1)
    print("...evaluating...")
    if (input_val == 0):
        result = yield from state1(input_val)
    else:
        result = yield from state3(input_val)
    result = "state 2 calling " + result
    return outputval + str(result)


@asyncio.coroutine
def state3(transition_value):
    outputval = str("state 3 with transition value = %s \n" %
                    (transition_value))
    input_val = randint(0, 1)
    time.sleep(1)
    print("...evaluating...")
    if(input_val == 0):
        result = yield from state1(input_val)
    else:
        result = yield from state2(input_val)
    result = "state 3 calling " + result
    return outputval + str(result)


@asyncio.coroutine
def endstate(transition_value):
    outputval = str("end state with transition value = %s \n" %
                    (transition_value))
    print("...stop computation...")
    return outputval


if __name__ == "__main__":
    print("finites state machine simulation with asyncio coroutine")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(startstate())

使用 asyncio 控制任务

示例代码如下所示

import asyncio


@asyncio.coroutine
def factorial(number):
    f = 1
    for i in range(2, number + 1):
        print("asyncio.task:compute factorial(%s)" % (i))
        yield from asyncio.sleep(1)
        f *= i
    print("asyncio.task - factorial(%s) = %s" % (number, f))


@asyncio.coroutine
def fibonacci(number):
    a, b = 0, 1
    for i in range(number):
        print("asyncio.task:complete fibonacci (%s)" % (i))
        yield from asyncio.sleep(1)
        a, b = b, a+b
    print("asyncio.task - fibonaci (%s)= %s" % (number, a))


@asyncio.coroutine
def binomialcoeff(n, k):
    result = 1
    for i in range(1, k+1):
        result = result * (n-i+1) / i
        print("asyncio.task:compute binomialcoeff (%s)" % (i))
        yield from asyncio.sleep(1)
    print("asyncio.task - binomialcoeff (%s,%s) = %s" % (n, k, result))


if __name__ == "__main__":
    tasks = [asyncio.task(factorial(10)), asyncio.task(
        fibonacci(10)), asyncio.task(binomialcoeff(20, 10))]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

使用asyncio和futures

示例代码如下所示

import asyncio
import sys


@asyncio.coroutine
def first_coroutine(future, n):
    count = 0
    for i in range(1, n + 1):
        count = count + i
    yield from asyncio.sleep(4)
    future.set_result(
        "first coroutine (sum of n integers) result = " + str(count))


@asyncio.coroutine
def second_coroutine(future, n):
    count = 1
    for i in range(2, n + 1):
        count *= i
    yield from asyncio.sleep(3)
    future.set_result("second coroutine (factorial) result = " + str(count))


def got_result(future):
    print(future.result())


if __name__ == "__main__":
    n1 = 1
    n2 = 1
    loop = asyncio.get_event_loop()
    future1 = asyncio.future()
    future2 = asyncio.future()
    tasks = [
        first_coroutine(future1, n1),
        second_coroutine(future2, n2)
    ]
    future1.add_done_callback(got_result)
    future2.add_done_callback(got_result)
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

分布式编程

gpu 编程

相关参考