Tutorialspoint Python 并发编程教程
来源:易百教程
Python并发编程教程™
并发这是一种常见的自然现象,是同时发生两个或更多事件。 对于专业人员来说,创建并发应用程序并充分利用计算机硬件是一项具有挑战性的任务。
面向读者
本教程将对毕业生,研究生和研究生非常有用,他们对这个主题内容感兴趣或者将此主题作为课程的一部分。 读者可以是初学者或高级学习者。
前提条件
读者必须具备有关操作系统的并发性,多处理,多线程和进程等概念的基本知识。还应该了解OS中使用的基本术语以及Python编程概念。
问题反馈
我们不能保证您在此Python并发编程教程中不会遇到任何问题。本教程中的讲解,示例和代码等只是根据作者的理解来概括写出。由于作者水平和能力有限,因此不能保证所有的编写文章和示例均能准确无误。但是如果有遇到任何错误或问题,请反馈给我们,我们会及时纠正以方便后继读者阅读。
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
Python并发简介 - Python并发编程教程™
在本章中,我们将理解Python中的并发概念,并了解线程和进程的区别。
什么是并发?
简而言之,并发是指同时发生两个或多个事件。 并发是一种自然现象,因为许多事件在任何给定的时间同时发生。
就编程而言,并发是两个任务在执行过程中重叠的时候。 通过并发编程,我们的应用程序和软件系统的性能可以得到提高,因为我们可以同时处理请求,而不是等待前一个完成再处理下一个。
并发性的历史回顾
以下几点将给我们简要的并发历史回顾 -
铁路概念
并发性与铁路概念密切相关。 有了铁路,就需要在同一条铁路系统上处理多列火车,以便每列火车都能安全到达目的地。
并行计算在学术界
对计算机科学并发性的兴趣始于1965年由Edsger W. Dijkstra发表的研究论文。在本文中,他确定并解决了互斥问题,即并发控制的性质。
高级并发基元
最近,由于引入了高级并发基元,程序员正在改进并发解决方案。
改进了编程语言的并发性
Google的Golang,Rust和Python等编程语言在帮助我们获得更好的并发解决方案的领域取得了令人难以置信的发展。
什么是线程和多线程?
线程是可以在操作系统中执行的最小执行单元。 它本身不是一个程序,而是在程序中运行。 换句话说,线程并不相互独立。 每个线程与其他线程共享代码段,数据段等。 他们也被称为轻量级流程。
一个线程由以下组件组成 -
- 程序计数器由一个可执行指令的地址组成
- 堆
- 寄存器组
- 唯一的ID
多线程,在另一方面,是CPU的通过同时执行多个线程管理使用操作系统的能力。 多线程的主要思想是通过将进程分成多个线程来实现并行性。 以下示例的帮助理解多线程的概念。
示例
假设我们正在运行一个特殊的过程,在这个过程中我们打开MS Word来输入内容。 一个线程将被分配以打开MS Word,另一个线程将需要在其中输入内容。 而现在,如果要编辑现有的文档内容,那么将需要另一个线程来执行编辑任务等等。
什么是进程和多进程?
进程被定义为一个实体,它代表了系统中要实施的基本工作单元。 简而言之,我们将计算机程序编写成文本文件,当我们执行这个程序时,它就成为执行程序中提到的所有任务的过程。 在进程生命周期中,它经历了不同的阶段 - 开始,准备,运行,等待和终止。
下图显示了一个过程的不同阶段 -
一个进程只能有一个线程,称为主线程,或者多线程拥有自己的一组寄存器,程序计数器和堆栈。 以下图表显示了它们的区别 -
另一方面,多进程是在一个计算机系统内使用两个或多个CPU单元。 我们的主要目标是充分利用我们的硬件。 为了实现这一点,我们需要利用我们的计算机系统中可用的全部CPU核心。 多多进程是最好的方法。
Python是一种最流行的编程语言。 以下是一些适合并发应用的原因 -
语法糖
语法糖是一种编程语言中的语法,旨在使事情更容易阅读或表达。 它使语言“更甜”供人类使用:事物可以更清晰,更简洁地表达,或以偏好为基础的另一种风格表达。 Python带有Magic方法,可以将其定义为对对象起作用。 这些Magic方法被用作语法糖,并绑定到更容易理解的关键字。
社区庞大
Python语言在AI,机器学习,深度学习和定量分析领域的数据科学家和数学家之间见证了大量采用率。
并发编程的API
Python 2和3有大量专用于并行/并发编程的API。 其中最受欢迎的是:threading,multiprocessing,asyncio,gevent和greenlets等。
Python在实现并发应用程序中的局限性
Python对并发应用程序有一个限制。 这种限制称为GIL(全局解释器锁),它存在于Python中。 GIL从来不允许我们利用CPU的多个内核,因此可以说Python中没有真正的线程。 我们可以理解GIL的概念如下 -
GIL(全局解释器锁)
这是Python世界中最具争议的话题之一。 在CPython中,GIL是互斥锁 - 互斥锁,它使线程安全。 换句话说,我们可以说GIL阻止了多个线程并行执行Python代码。 锁一次只能由一个线程保存,如果想执行一个线程,那么它必须先获取锁。下面的图表将帮助您了解GIL的工作。
但是,Python中有一些库和实现,如:Numpy,Jpython和IronPytbhon。 这些库与GIL没有任何交互。
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
并发vs并行 - Python并发编程教程™
并发性和并行性都用于多线程程序,但它们之间的相似性和差异存在很多混淆。 在这方面的一个大问题是:并发是不是就是并行? 虽然这两个术语看起来很相似,但对上述问题的答案是否定的,并发性和并行性并不相同。 现在,如果它们不一样,它们之间的基本区别是什么?
简而言之,并发处理的是处理来自不同线程的共享状态访问,而并行处理利用多个CPU或其内核来提高硬件性能。
并发简介
并发是两个任务在执行过程中重叠的时候。 这可能是一个应用程序正在同时处理多个任务的情况。 我们可以用图解理解它; 多项任务正在同时取得进展,如下所示 -
并发级别
在本节中,我们将讨论编程方面的三个重要级别的并发性 -
1. 低级并发
在这种并发级别中,显式使用了原子操作。 我们不能在构建应用程序时使用这种并发性,因为它非常容易出错并且很难调试。 即使Python不支持这种并发性。
2. 中级并发
在这种并发中,没有使用显式的原子操作。 它使用显式锁。 Python和其他编程语言支持这种并发性。 大多数应用程序员使用这种并发性。
3. 高级并发
在这种并发中,不使用显式原子操作也不使用显式锁。 Python有concurrent.futures模块来支持这种并发。
并发系统的性质
要使程序或并发系统正确,一些属性必须由它来满足。 与终止系统相关的属性如下 -
正确性属性
正确性属性意味着程序或系统必须提供所需的正确答案。 为了简单起见,可以说系统必须正确地将启动程序状态映射到最终状态。
安全属性
安全属性意味着程序或系统必须保持“良好”或“安全”状态,并且从不做任何“坏”的事情。
活跃度属性
这个属性意味着一个程序或系统必须“取得进展”,并且会达到一个理想的状态。
并发系统的行为者
这是并发系统的一个常见属性,其中可以有多个进程和线程,它们同时运行以在他们自己的任务上取得进展。 这些进程和线程称为并发系统的角色。
并发系统的资源
行为者必须利用内存,磁盘,打印机等资源来执行任务。
某些规则集
每个并发系统必须拥有一套规则来定义执行者要执行的任务类型和每个任务的执行时间。 任务可能是获取锁,共享内存,修改状态等。
并发系统的障碍
在实现并发系统时,程序员必须考虑以下两个重要问题,这可能是并发系统的障碍 -
共享数据
实现并发系统时的一个重要问题是在多个线程或进程间共享数据。 实际上,程序员必须确保锁保护共享数据,以便所有对它的访问都被序列化,并且一次只有一个线程或进程可以访问共享数据。 如果多个线程或进程都试图访问相同的共享数据,那么除了其中至少一个以外,其他所有进程都将被阻塞并保持空闲状态。 换句话说,在锁定生效时,我们只能使用一个进程或线程。 可以有一些简单的解决方案来消除上述障碍 -
数据共享限制
最简单的解决方案是不共享任何可变数据。 在这种情况下,我们不需要使用显式锁定,并且可以解决由于相互数据而导致的并发障碍。
数据结构协助
很多时候并发进程需要同时访问相同的数据。 与使用显式锁相比,另一种解决方案是使用支持并发访问的数据结构。 例如,可以使用提供线程安全队列的队列模块。 也可以使用multiprocessing.JoinableQueue类来实现基于多处理的并发。
不可变的数据传输
有时,我们使用的数据结构(比如说并发队列)不适合,那么可以传递不可变数据而不锁定它。
可变数据传输
继续上面的解决方案,假设如果它只需要传递可变数据而不是不可变数据,那么可以传递只读的可变数据。
共享I/O资源
实现并发系统的另一个重要问题是线程或进程使用I/O资源。 当一个线程或进程使用I/O很长时间而另一线程或进程闲置时会出现问题。 在处理I/O大量应用程序时,我们可以看到这种障碍。 可以通过一个例子来理解,从Web浏览器请求页面。 这是一个沉重的应用程序。 在这里,如果数据请求的速率比它消耗的速率慢,那么在并发系统中就会有I/O障碍。
以下Python脚本用于请求网页并获取网络用于获取请求页面的时间 -
import urllib.request
import time
ts = time.time()
req = urllib.request.urlopen('http://www.yiibai.com')
pageHtml = req.read()
te = time.time()
print("Page Fetching Time : {} Seconds".format (te-ts))
执行上述脚本后,可以获取页面获取时间,如下所示。
Page Fetching Time: 0.999139881798332 Seconds
可以看到,获取该页面的时间差不多是一秒钟。 现在,如果我们想要访问数千个不同的网页,您可以大概知道访问网络需要多少时间。
什么是并行性?
并行可定义为将任务分解为可同时处理的子任务的技术。 如上所述,它与并发性相反,其中两个或更多事件同时发生。 我们可以用图解理解它; 一个任务被分解成可以并行处理的多个子任务,如下所示 -
并行但不平行
应用程序可以是并行的,但不是并行的,意味着它可以同时处理多个任务,但任务不会分解为子任务。
并行但不并发
一个应用程序可以是并行的,但不是并行的,意味着它一次只能在一个任务上工作,并且分解为子任务的任务可以并行处理。
既不平行也不并发
应用程序既不能并行也不能并发。 这意味着它一次只能处理一项任务,并且任务不会被分解为子任务。
并行和并发
应用程序既可以是并行的,也可以是并行的,这意味着它既可以同时在多个任务上工作,也可以将任务分解为子任务并行执行。
并行的必要性
我们可以通过在单CPU的不同内核之间或网络内连接的多台计算机之间分配子任务来实现并行。
考虑以下要点来理解为什么有必要实现并行性 -
有效的代码执行
借助并行性,我们可以高效地运行代码。 它将节省时间,因为部分中的相同代码并行运行。
比顺序计算更快速
顺序计算受到物理和实际因素的限制,因此无法获得更快的计算结果。 另一方面,这个问题可以通过并行计算来解决,并且比顺序计算提供更快的计算结果。
执行时间更短
并行处理减少了程序代码的执行时间。
如果要谈论真实生活中并行性的例子,我们计算机的图形卡就是一个例子,它强调了并行处理的真正能力,因为它拥有数百个独立工作的独立处理内核,并且可以同时执行。 由于这个原因,我们也能够运行高端应用程序和游戏。
理解处理器的实现
我们知道并发性,并行性以及它们之间的差异,但是它将如何实现。 理解将要实施的系统是非常必要的,因为它使我们在设计软件时能够做出明智的决定。有以下两种处理器 -
单核处理器
单核处理器能够在任何给定时间执行一个线程。 这些处理器使用上下文切换来在特定时间存储线程的所有必要信息,然后再恢复信息。 上下文切换机制有助于我们在给定秒内的多个线程上取得进展,并且看起来好像系统正在处理多种事情。
单核处理器具有许多优点。 这些处理器需要更少的功率,并且多个内核之间没有复杂的通信协议。 另一方面,单核处理器的速度有限,不适合更大的应用。
多核处理器
多核处理器具有多个独立处理单元,也称为核心。
这种处理器不需要上下文切换机制,因为每个核心都包含执行一系列存储指令所需的所有内容。
读取 - 解码 - 执行的周期
多核处理器的内核遵循一个执行周期。 这个周期被称为读取 - 解码 - 执行周期。 它涉及以下步骤 -
读取
这是循环的第一步,它涉及从程序存储器读取指令。
解码
最近读取的指令将被转换为一系列触发CPU其他部分的信号。
执行
这是获取和解码指令将被执行的最后一步。 执行结果将存储在CPU寄存器中。
这里的一个优势是多核处理器的执行速度比单核处理器的执行速度快。 它适用于更大的应用程序。 另一方面,多核之间的复杂通信协议是一个问题。 多核需要比单核处理器需要更多的功率。
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
系统和内存架构 - Python并发编程教程™
在设计程序或并发系统时,需要考虑不同的系统和内存架构风格。 这是非常必要的,因为一个系统和内存风格可能适合于一项任务,但可能会出错以适应其他任务。
支持并发的计算机系统体系结构
1972年,迈克尔弗林(Michael Flynn)给出了用于分类不同风格计算机系统体系结构的分类法。 该分类法定义了以下四种不同的样式 -
- 单指令流,单数据流(SISD)
- 单指令流,多数据流(SIMD)
- 多指令流,单数据流(MISD)
- 多指令流,多数据流(MIMD)。
1. 单指令流,单数据流(SISD)
顾名思义,这种类型的系统将有一个连续的输入数据流和一个处理单元来执行数据流。 它们就像具有并行计算体系结构的单处理器系统一样。 以下是SISD的架构 -
SISD的优点
SISD架构的优点如下 -
- 它需要更少的电力。
- 多核之间没有复杂的通信协议问题。
SISD的缺点
SISD架构的缺点如下 -
- SISD架构的速度与单核处理器一样有限。
- 它不适合更大的应用程序。
2. 单指令流,多数据流(SIMD)
顾名思义,这种类型的系统会有多个输入数据流和多个处理单元,这些单元可以在任何给定时间作用于单条指令。 它们就像具有并行计算架构的多处理器系统。 以下是SIMD的架构 -
SIMD最好的例子就是显卡。显卡拥有数百个单独的处理单元。 如果要谈论SISD和SIMD之间的计算差异,那么对于添加数组[5,15,20]和[15,25,10],SISD架构将不得不执行三种不同的添加操作。 另一方面,通过SIMD体系结构,我们可以添加一个单独的添加操作。
SIMD的优点
SIMD架构的优点如下 -
- 多个元素的相同操作只能使用一条指令执行。
- 通过增加处理器的核心数量可以增加系统的吞吐量。
- 处理速度高于SISD架构。
SIMD的缺点
SIMD架构的缺点如下 -
- 处理器的核心数量之间存在复杂的通信。
- 成本高于SISD架构。
3. 多指令单数据(MISD)流
具有MISD流的系统通过在同一数据集上执行不同指令来执行不同操作的处理单元数量。 以下是MISD的架构 -
MISD体系结构尚未商业化。
4. 多指令多数据(MIMD)流
在使用MIMD体系结构的系统中,多处理器系统中的每个处理器可以并行地在不同的一组数据集上独立地执行不同的指令集。 与SIMD体系结构相反,单一操作在多个数据集上执行。 以下是MIMD的架构 -
普通的多处理器使用MIMD体系结构。 这些架构基本上应用于许多应用领域,如计算机辅助设计/计算机辅助制造,仿真,建模,通信交换机等。
内存架构支持并发
在处理并发性和并行性等概念的同时,始终需要加速这些程序。 计算机设计人员发现的一个解决方案是创建共享内存多计算机,即具有单个物理地址空间的计算机,该计算机可以被处理器拥有的所有内核访问。 在这种情况下,可以有多种不同风格的架构,但以下是三种重要的架构风格 -
UMA(统一内存访问)
在此模型中,所有处理器均匀分享物理内存。 所有处理器对所有内存字都有相同的访问时间。 每个处理器可以有一个私有缓存存储器。 外围设备遵循一套规则。
当所有的处理器对所有的外围设备有相同的访问权限时,系统被称为对称多处理器。 当只有一个或几个处理器可以访问外围设备时,系统称为非对称多处理器。
非均匀内存访问(NUMA)
在NUMA多处理器模型中,访问时间随内存字的位置而变化。 这里,共享存储器物理地分布在所有处理器中,称为本地存储器。 所有本地存储器的集合形成一个全局地址空间,所有处理器都可以访问它。
仅缓存内存体系结构(COMA)
COMA模型是NUMA模型的专用版本。 这里,所有分配的主存储器都被转换成高速缓冲存储器。
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
线程 - Python并发编程教程™
一般来说,线是一种非常细的丝线,通常是棉或丝绸织物,用于缝制衣服等。 同样的术语线程也被用于计算机编程领域。 现在,我们如何将用于缝制衣服的线与用于计算机编程的线相关联? 这两个线程执行的角色类似。 在衣服中,线程将布料放在一起,在另一侧,在计算机编程中,线程持有计算机程序,并允许程序一次执行顺序操作或许多操作。
线程是操作系统中最小的执行单元。 它本身不是一个程序,而是在程序中运行。 换句话说,线程并不相互独立,并与其他线程共享代码段,数据段等。 这些线程也被称为轻量级进程。
线程状态
为了深入理解线程的功能,我们需要了解线程的生命周期或不同的线程状态。 通常,线程可以以五种不同的状态存在。 下面显示了不同的状态 -
新线程
新线程在新状态下开始其生命周期。 但是,在这个阶段,它尚未开始,也没有分配任何资源。 我们可以说它只是一个对象的实例。
可运行
当新出生的线程启动时,线程变为可运行的,即等待运行。 在这种状态下,它拥有所有资源,但仍然有任务调度程序没有安排它运行。
运行
在此状态下,线程进行并执行已由任务计划程序选择运行的任务。 现在,线程可以进入死态或不可运行/等待状态。
非运行/待机
在这种状态下,线程暂停,因为它要么等待某个I/O请求的响应,要么等待其他线程执行完成。
死锁
可运行线程在完成其任务或以其他方式终止时进入终止状态。
下图显示了线程的完整生命周期 -
线程类型
在本节中,我们将看到不同类型的线程。 类型如下所述 -
用户级线程
这些是用户管理的线程。
在这种情况下,线程管理内核不知道线程的存在。 线程库包含用于创建和销毁线程的代码,用于在线程之间传递消息和数据,用于调度线程执行以及用于保存和恢复线程上下文。 应用程序以单个线程开始。
用户级线程的例子是 -
- Java线程
- POSIX线程
用户级线程的优点
以下是用户级别线程的不同优势 -
- 线程切换不需要内核模式特权。
- 用户级线程可以在任何操作系统上运行。
- 调度可以是用户级别线程中特定于应用程序的。
- 用户级线程可以快速创建和管理。
用户级别线程的缺点
以下是用户级别线程的不同缺点 -
- 在典型的操作系统中,大多数系统调用都是阻塞的。
- 多线程应用程序无法利用多处理。
内核级线程
操作系统管理的线程在内核上运行,内核是操作系统的核心。
在这种情况下,内核执行线程管理。 应用程序区域中没有线程管理代码。 内核线程直接由操作系统支持。 任何应用程序都可以编程为多线程。 应用程序中的所有线程都在单个进程中受支持。
内核维护整个流程和流程内的各个线程的上下文信息。 内核调度是基于线程完成的。 内核在内核空间中执行线程创建,调度和管理。 内核线程创建和管理通常比用户线程要慢。 内核级线程的例子是Windows,Solaris。
内核级线程的优点
以下是内核级线程的不同优点 -
- 内核可以同时在多个进程上安排来自同一进程的多个线程。
- 如果进程中的一个线程被阻塞,则内核可以调度同一进程的另一个线程。
- 内核例程本身可以是多线程的。
内核级线程的缺点
- 内核线程创建和管理通常比用户线程要慢。
- 在同一进程内将控制从一个线程转移到另一个线程需要将模式切换到内核。
线程控制块 - TCB
线程控制块(TCB)可以定义为操作系统内核中的数据结构,主要包含有关线程的信息。 存储在TCB中的线程特定信息将突出显示有关每个进程的一些重要信息。
考虑与TCB中包含的线程相关的以下几点 -
- 线程标识 - 它是分配给每个新线程的唯一线程标识(tid)。
- 线程状态 - 它包含与线程状态(运行,可运行,不运行,死机)有关的信息。
- 程序计数器(PC) - 它指向线程的当前程序指令。
- 寄存器组 - 它包含分配给它们的线程的寄存器值用于计算。
- 堆栈指针 - 它指向进程中线程的堆栈。 它包含线程范围内的局部变量。
- 指向PCB的指针 - 它包含指向创建该线程的进程的指针。
进程和线程之间的关系
在多线程中,进程和线程是两个非常紧密相关的术语,具有相同的目标,使计算机能够一次完成多个任务。 一个进程可以包含一个或多个线程,但相反,线程不能包含进程。 但是,它们仍然是执行的两个基本单位。 一个执行一系列指令的程序启动进程,并启动两者。
下表显示了进程和线程之间的比较 -
编号 | 进程 | 线程 |
1 | 进程是重量或资源密集型的。 | 线程是轻量级的,比进程占用更少的资源。 |
2 | 进程切换需要与操作系统进行交互。 | 线程切换不需要与操作系统交互。 |
3 | 在多个处理环境中,每个进程执行相同的代码,但拥有自己的内存和文件资源。 | 所有线程都可以共享相同的一组打开文件,子进程。 |
4 | 如果一个进程被阻塞,那么在第一个进程解除阻塞之前,不能执行其他进程。 | 当一个线程被阻塞并等待时,同一个任务中的第二个线程可以运行。 |
5 | 不使用线程的多进程使用更多的资源。 | 多线程进程使用更少的资源。 |
6 | 在多个进程中,每个进程独立于其他流程运作。 | 一个线程可以读取,写入或更改另一个线程的数据。 |
7 | 如果父进程中有任何更改,则不会影响子进程。 | 如果主线程中有任何更改,则可能会影响该进程的其他线程的行为。 |
8 | 要与兄弟进程进行通信,进程必须使用进程间通信。 | 线程可以直接与该进程的其他线程通信。 |
多线程的概念
正如我们前面讨论过的,多线程是CPU通过并发执行多个线程来管理操作系统使用的能力。 多线程的主要思想是通过将进程分成多个线程来实现并行性。 以更简单的方式,多线程是通过使用线程的概念来实现多任务的方式。
以下示例的帮助可以理解多线程的概念。
示例
假设我们正在运行一个进程。 这个过程可能是为了写一些东西而打开MS词。 在这样的过程中,一个线程将被分配以打开MS字,另一个线程将被要求写入。 现在,假设想要编辑某些东西,那么将需要另一个线程来执行编辑任务等等。
下图解释内存中是否存在多个线程 -
我们可以在上面的图中看到,一个进程中可以存在多个线程,其中每个线程都包含自己的寄存器集和局部变量。 除此之外,进程中的所有线程共享全局变量。
多线程的优点
现在让我们看看多线程的一些优点。 优点如下 -
- 通信速度 - 多线程技术提高了计算速度,因为每个核心或处理器可以同时处理单独的线程。
- 程序保持响应 - 它允许程序保持响应,因为一个线程等待输入,另一个线程同时运行GUI。
- 访问全局变量 - 在多线程中,特定进程的所有线程都可以访问全局变量,如果全局变量有任何变化,那么其他线程也可以看到它。
- 资源的利用 - 在每个程序中运行多个线程可以更好地使用CPU,并且CPU的空闲时间会变少。
- 共享数据 - 每个线程不需要额外的空间,因为程序中的线程可以共享相同的数据。
多线程的缺点
现在让我们看看多线程的一些缺点。 缺点如下 -
- 不适用于单处理器系统 - 与多处理器系统的性能相比,多线程技术难以实现单处理器系统的计算速度。
- 安全性问题 - 由于程序中的所有线程共享相同的数据,因此总是存在安全问题,因为任何未知线程都可以更改数据。
- 复杂性增加 - 多线程会增加程序的复杂性,调试变得困难。
- 导致死锁状态 - 多线程会导致程序潜在的达到死锁状态的风险。
- 需要同步 - 需要同步以避免互斥。这导致更多的内存和CPU利用率。
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
线程的实现 - Python并发编程教程™
在本章中,我们将学习如何在Python中实现线程。
用于线程实现的Python模块
Python线程有时称为轻量级进程,因为线程比进程占用的内存少得多。 线程允许一次执行多个任务。 在Python中,以下两个模块在一个程序中实现线程 -
- _thread 模块
- threading 模块
这两个模块之间的主要区别在于_thread模块将线程视为一个函数,而threading模块将每个线程视为一个对象并以面向对象的方式实现它。 此外,_thread模块在低级线程中有效并且比threading模块具有更少的功能。
_thread 模块
在Python的早期版本中,拥有thread模块,但在相当长的一段时间里它已被视为“已弃用”。 鼓励用户改用threading模块。 因此,在Python 3中,thread模块不再可用。 它已被重命名为_thread,用于Python3中的向后不兼容。
为了在_thread模块的帮助下生成新的线程,我们需要调用它的start_new_thread方法。 这种方法的工作可以通过以下语法来理解 -
_thread.start_new_thread ( function, args[, kwargs] )
这里,
- args是一个参数的元组
- kwargs是关键字参数的可选字典
如果想在不传递参数的情况下调用函数,那么需要在args中使用一个空的参数元组。
此方法调用立即返回,子线程启动,并调用与传递的列表(如果有的话)args的函数。 线程在函数返回时终止。
示例
以下是使用_thread模块生成新线程的示例。在这里使用start_new_thread()方法。
import _thread
import time
def print_time( threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print ("%s: %s" % ( threadName, time.ctime(time.time()) ))
try:
_thread.start_new_thread( print_time, ("Thread-1", 2, ) )
_thread.start_new_thread( print_time, ("Thread-2", 4, ) )
except:
print ("Error: unable to start thread")
while 1:
pass
在_thread模块的帮助下理解新线程的生成。
Thread-1: Mon Apr 23 10:03:33 2018
Thread-2: Mon Apr 23 10:03:35 2018
Thread-1: Mon Apr 23 10:03:35 2018
Thread-1: Mon Apr 23 10:03:37 2018
Thread-2: Mon Apr 23 10:03:39 2018
Thread-1: Mon Apr 23 10:03:39 2018
Thread-1: Mon Apr 23 10:03:41 2018
Thread-2: Mon Apr 23 10:03:43 2018
Thread-2: Mon Apr 23 10:03:47 2018
Thread-2: Mon Apr 23 10:03:51 2018
threading模块
threading模块以面向对象的方式实现,并将每个线程视为一个对象。 因此,它为线程提供了比_thread模块更强大,更高层次的支持。该模块包含在Python 2.4中。
threading 模块中的其他方法
threading模块包含_thread模块的所有方法,但它也提供了其他方法。 其他方法如下 -
- threading.activeCount() - 此方法返回处于活动状态的线程对象的数量
- threading.currentThread() - 此方法返回调用者线程控制中的线程对象数。
- threading.enumerate() - 此方法返回当前活动的所有线程对象的列表。
为了实现线程,threading模块具有提供以下方法的Thread类 -
- run() - run()方法是线程的入口点。
- start() - start()方法通过调用run方法来启动线程。
- join([time]) - join()等待线程终止。
- isAlive() - isAlive()方法检查线程是否仍在执行。
- getName() - getName()方法返回线程的名称。
- setName() - setName()方法设置线程的名称。
如何使用 threading 模块创建线程?
在本节中,我们将学习如何使用threading模块创建线程。 按照以下步骤使用threading模块创建一个新线程 -
第1步 - 在这一步中,需要定义Thread类的新子类。
第2步 - 然后为了添加额外的参数,需要重写__init __(self [,args])方法。
第3步 - 在这一步中,需要重写run(self [,args])方法来实现线程在启动时应该执行的操作。
现在,在创建新的Thread子类后,可以创建它的一个实例,然后通过调用start()来启动一个新线程,start()又调用run()方法。
示例
下面这个例子演示如何使用threading模块生成一个新的线程。
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print ("Starting " + self.name)
print_time(self.name, self.counter, 5)
print ("Exiting " + self.name)
def print_time(threadName, delay, counter):
while counter:
if exitFlag:
threadName.exit()
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print ("Exiting Main Thread")
Starting Thread-1
Starting Thread-2
执行上面示例代码,得到以下结果 -
Thread-1: Mon Apr 23 10:52:09 2018
Thread-1: Mon Apr 23 10:52:10 2018
Thread-2: Mon Apr 23 10:52:10 2018
Thread-1: Mon Apr 23 10:52:11 2018
Thread-1: Mon Apr 23 10:52:12 2018
Thread-2: Mon Apr 23 10:52:12 2018
Thread-1: Mon Apr 23 10:52:13 2018
Exiting Thread-1
Thread-2: Mon Apr 23 10:52:14 2018
Thread-2: Mon Apr 23 10:52:16 2018
Thread-2: Mon Apr 23 10:52:18 2018
Exiting Thread-2
Exiting Main Thread
带有线程状态的Python程序
有五种线程状态 - 新的,可运行,运行,等待和死亡。 在这五个中,我们将主要关注三个状态 - 运行,等待和死亡。 一个线程获取处于运行状态的资源,等待处于等待状态的资源; 如果执行和获取的资源的最终版本处于死亡状态。
下面的Python程序在start(),sleep()和join()方法的帮助下将分别显示线程是如何进入运行,等待和死亡状态的。
第1步 - 导入必要的模块,threading和time
import threading
import time
第2步 - 定义一个函数,它将在创建线程时调用。
def thread_states():
print("Thread entered in running state")
第3步 - 使用time模块的sleep()方法让线程等待2秒钟。
time.sleep(2)
第4步 - 现在,创建一个名为T1的线程,它接受上面定义的函数的参数。
T1 = threading.Thread(target=thread_states)
第5步 - 现在,使用start()函数,可以开始启动线程。 它会产生这个信息,这个信息是在定义函数时设定的。
T1.start()
# Thread entered in running state
第6步 - 现在,最后可以在完成执行后使用join()方法终止线程。
T1.join()
在Python中启动一个线程
在python中,可以通过不同的方式启动一个新的线程,但其中最简单的一个就是将其定义为一个单一的函数。 在定义函数之后,可以将它作为新线程的目标。线程对象等等。 执行下面的Python代码来理解函数的工作原理 -
import threading
import time
import random
def Thread_execution(i):
print("Execution of Thread {} started\n".format(i))
sleepTime = random.randint(1,4)
time.sleep(sleepTime)
print("Execution of Thread {} finished".format(i))
for i in range(4):
thread = threading.Thread(target=Thread_execution, args=(i,))
thread.start()
print("Active Threads:" , threading.enumerate())
执行上面代码,得到以下结果 -
Execution of Thread 0 started
Active Threads:
[<_MainThread(MainThread, started 6040)>,
<HistorySavingThread(IPythonHistorySavingThread, started 5968)>,
<Thread(Thread-3576, started 3932)>]
Execution of Thread 1 started
Active Threads:
[<_MainThread(MainThread, started 6040)>,
<HistorySavingThread(IPythonHistorySavingThread, started 5968)>,
<Thread(Thread-3576, started 3932)>,
<Thread(Thread-3577, started 3080)>]
Execution of Thread 2 started
Active Threads:
[<_MainThread(MainThread, started 6040)>,
<HistorySavingThread(IPythonHistorySavingThread, started 5968)>,
<Thread(Thread-3576, started 3932)>,
<Thread(Thread-3577, started 3080)>,
<Thread(Thread-3578, started 2268)>]
Execution of Thread 3 started
Active Threads:
[<_MainThread(MainThread, started 6040)>,
<HistorySavingThread(IPythonHistorySavingThread, started 5968)>,
<Thread(Thread-3576, started 3932)>,
<Thread(Thread-3577, started 3080)>,
<Thread(Thread-3578, started 2268)>,
<Thread(Thread-3579, started 4520)>]
Execution of Thread 0 finished
Execution of Thread 1 finished
Execution of Thread 2 finished
Execution of Thread 3 finished
在Python中启动一个线程
在python中,可以通过不同的方式启动一个新的线程,但最简单的就是将其定义为一个单一的函数。 在定义函数之后,可以将它作为新线程的目标。线程对象等等。 执行下面的Python代码来理解函数的工作原理 -
import threading
import time
def nondaemonThread():
print("starting my thread")
time.sleep(8)
print("ending my thread")
def daemonThread():
while True:
print("Hello")
time.sleep(2)
if __name__ == '__main__':
nondaemonThread = threading.Thread(target = nondaemonThread)
daemonThread = threading.Thread(target = daemonThread)
daemonThread.setDaemon(True)
daemonThread.start()
nondaemonThread.start()
在上面的代码中,有两个函数,分别是- nondaemonThread()和daemonThread()。 第一个函数打印其状态并在8秒后休眠,而deamonThread()函数每2秒无限期地打印出Hello。 我们可以通过以下输出来了解nondaemon和daemon线程之间的区别 -
Hello
starting my thread
Hello
Hello
Hello
Hello
ending my thread
Hello
Hello
Hello
Hello
Hello
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
同步线程 - Python并发编程教程™
线程同步可以定义为一种方法,借助这种方法,可以确信两个或更多的并发线程不会同时访问被称为临界区的程序段。 另一方面,正如我们所知道的那样,临界区是共享资源被访问的程序的一部分。 因此,同步是通过同时访问资源来确保两个或更多线程不相互连接的过程。 下图显示了四个线程同时尝试访问程序的临界区。
为了使它更清楚,假设有两个或更多线程试图同时在列表中添加对象。 这种行为不能导致成功的结局,因为它会抛弃一个或所有的对象,或者它会完全破坏列表的状态。 这里同步的作用是每次只有一个线程可以访问列表。
线程同步的问题
在实现并发编程或应用同步基元时,可能会遇到问题。 在本节中,我们将讨论两个主要问题。 问题是 -
- 死锁
- 竞争条件
1. 竞争条件
这是并发编程的主要问题之一。 对共享资源的并发访问可能会导致竞争状态。 竞争条件可以定义为当两个或更多线程可以访问共享数据,然后尝试同时更改其值时发生的条件。 由此,变量的值可能是不可预知的,并且取决于进程的上下文切换的时间而变化。
示例
考虑这个例子来理解竞争条件的概念 -
第1步 - 在这一步中,需要导入线程模块 -
import threading
第2步 - 现在,定义一个全局变量,例如x,以及其值为0 -
x = 0
第3步 - 现在,需要定义increment_global()函数,该函数将在此全局函数中执行x递增1 -
def increment_global():
global x
x += 1
第4步 - 在这一步中,将定义taskofThread()函数,它将调用increment_global()函数达指定的次数; 在这个例子中,它是50000次 -
def taskofThread():
for _ in range(50000):
increment_global()
第5步 - 现在,定义创建线程t1和t2的main()函数。 两者都将在start()函数的帮助下启动,并等待join()函数完成作业。
def main():
global x
x = 0
t1 = threading.Thread(target= taskofThread)
t2 = threading.Thread(target= taskofThread)
t1.start()
t2.start()
t1.join()
t2.join()
第6步 - 现在,需要给出范围,如想要调用main()函数的迭代次数。 在这里,调用为5次。
if __name__ == "__main__":
for i in range(5):
main()
print("x = {1} after Iteration {0}".format(i,x))
在下面显示的输出中,我们可以看到竞争条件的影响,因为在每次迭代之后x的值预计为100000。但是,值有很大的变化。 这是由于线程对共享全局变量x的并发访问。
x = 100000 after Iteration 0
x = 54034 after Iteration 1
x = 80230 after Iteration 2
x = 93602 after Iteration 3
x = 93289 after Iteration 4
处理使用锁定的竞争条件
正如我们已经看到上述程序中竞争条件的影响,我们需要一个同步工具,它可以处理多个线程之间的竞争条件。 在Python中,threading模块提供了Lock类来处理竞争条件。 此外,Lock类提供了不同的方法,可以通过它帮助处理多个线程之间的竞争条件。 方法如下所述 -
acquire()方法
该方法用于获取,即阻止锁定。 锁可以是阻塞或非阻塞取决于以下真或假的值 -
- 将值设置为True - 如果使用默认参数True调用acquire()方法,则线程执行将被阻止,直到解锁锁。
- 将值设置为False - 如果acquire()方法使用False调用,而False不是默认参数,那么线程执行不会被阻塞,直到它被设置为true,即直到它被锁定。
release()方法
此方法用于释放锁。 以下是与此方法相关的一些重要任务 -
- 如果锁定被锁定,那么release()方法将解锁它。 它的工作是如果多个线程被阻塞并且等待锁被解锁,则只允许一个线程继续。
- 如果锁已经解锁,它将引发一个ThreadError错误。
现在,我们可以用锁类及其方法重写上述程序,以避免竞争条件。 我们需要使用lock参数定义taskofThread()方法,然后使用acquire()和release()方法来阻塞和非阻塞锁以避免竞争状况。
示例
以下是用于理解处理竞争条件的锁概念的python程序示例 -
import threading
x = 0
def increment_global():
global x
x += 1
def taskofThread(lock):
for _ in range(50000):
lock.acquire()
increment_global()
lock.release()
def main():
global x
x = 0
lock = threading.Lock()
t1 = threading.Thread(target = taskofThread, args = (lock,))
t2 = threading.Thread(target = taskofThread, args = (lock,))
t1.start()
t2.start()
t1.join()
t2.join()
if __name__ == "__main__":
for i in range(5):
main()
print("x = {1} after Iteration {0}".format(i,x))
以下输出显示竞争条件的影响被忽略; 在每次迭代之后,x的值现在是100000,这与该程序的期望值相同。
x = 100000 after Iteration 0
x = 100000 after Iteration 1
x = 100000 after Iteration 2
x = 100000 after Iteration 3
x = 100000 after Iteration 4
僵局 - 餐饮哲学家的问题
死锁是设计并发系统时可能遇到的麻烦问题。可以在餐饮哲学家问题的帮助下说明这个问题,如下所示 -
Edsger Dijkstra最初介绍了餐饮哲学家问题,这是着名的并发系统最大问题和死锁问题之一。
在这个问题中,有五位着名的哲学家坐在圆桌旁,从碗里吃着一些食物。 五种哲学家可以使用五种叉子来吃他们的食物。 然而,哲学家决定同时使用两把叉子来吃他们的食物。
现在,哲学家有两个主要条件。 首先,每个哲学家既可以进食也可以处于思维状态,其次,他们必须首先获得叉子,即左边和右边的叉子。 当五位哲学家中的每一位设法同时选择左叉时,问题就出现了。他们都在等待右叉是自由的,但他们在未吃完了食物之前永远不会放弃他们的叉子,并且永远不会有右叉。 因此,餐桌上会出现僵局。
并发系统中的死锁
现在如果我们看到,并发系统也会出现同样的问题。 上面例子中的叉子是系统资源,每个哲学家都可以表示这个竞争获取资源的过程。
Python程序的解决方案
通过将哲学家分为两种类型 - 贪婪的哲学家和慷慨的哲学家,可以找到解决这个问题的方法。 主要是一个贪婪的哲学家会尝试拿起左边的叉子,等到左边的叉出现。 然后,他会等待右叉子在那里,拿起来,吃,然后把它放下。 另一方面,一个慷慨的哲学家会尝试拿起左边的叉子,如果它不在那里,他会等一段时间再试一次。 如果他们拿到左边的叉子,他们会尝试找到右叉子。 如果他们也会得到正确的叉子,那么他们会吃饭并释放叉子。 但是,如果他们不能得到右叉子,那么他们也会释放左叉子。
示例
以下Python程序将帮助找到解决哲学家就餐问题的方案 -
import threading
import random
import time
class DiningPhilosopher(threading.Thread):
running = True
def __init__(self, xname, Leftfork, Rightfork):
threading.Thread.__init__(self)
self.name = xname
self.Leftfork = Leftfork
self.Rightfork = Rightfork
def run(self):
while(self.running):
time.sleep( random.uniform(3,13))
print ('%s is hungry.' % self.name)
self.dine()
def dine(self):
fork1, fork2 = self.Leftfork, self.Rightfork
while self.running:
fork1.acquire(True)
locked = fork2.acquire(False)
if locked: break
fork1.release()
print ('%s swaps forks' % self.name)
fork1, fork2 = fork2, fork1
else:
return
self.dining()
fork2.release()
fork1.release()
def dining(self):
print ('%s starts eating '% self.name)
time.sleep(random.uniform(1,10))
print ('%s finishes eating and now thinking.' % self.name)
def Dining_Philosophers():
forks = [threading.Lock() for n in range(5)]
philosopherNames = ('1st','2nd','3rd','4th', '5th')
philosophers= [DiningPhilosopher(philosopherNames[i], forks[i%5], forks[(i+1)%5]) \
for i in range(5)]
random.seed()
DiningPhilosopher.running = True
for p in philosophers: p.start()
time.sleep(30)
DiningPhilosopher.running = False
print (" It is finishing.")
Dining_Philosophers()
上面的程序使用了贪婪和慷慨的哲学家的概念。 该程序还使用了threading模块的Lock类的acquire()和release()方法。 我们可以在下面的输出中看到解决方案 -
4th is hungry.
4th starts eating
1st is hungry.
1st starts eating
2nd is hungry.
5th is hungry.
3rd is hungry.
1st finishes eating and now thinking.3rd swaps forks
2nd starts eating
4th finishes eating and now thinking.
3rd swaps forks5th starts eating
5th finishes eating and now thinking.
4th is hungry.
4th starts eating
2nd finishes eating and now thinking.
3rd swaps forks
1st is hungry.
1st starts eating
4th finishes eating and now thinking.
3rd starts eating
5th is hungry.
5th swaps forks
1st finishes eating and now thinking.
5th starts eating
2nd is hungry.
2nd swaps forks
4th is hungry.
5th finishes eating and now thinking.
3rd finishes eating and now thinking.
2nd starts eating 4th starts eating
It is finishing.
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
线程通信 - Python并发编程教程™
在现实生活中,如果一个人团队正在共同完成任务,那么他们之间应该有通信,以便正确完成任务。 同样的比喻也适用于线程。 在编程中,要减少处理器的理想时间,我们创建了多个线程,并为每个线程分配不同的子任务。 因此,必须有一个通信设施,他们应该互相沟通交流,以同步的方式完成工作。
考虑以下与线程通信相关的重要问题 -
- 没有性能增益 - 如果无法在线程和进程之间实现适当的通信,那么并发性和并行性的性能收益是没有用的。
- 完成任务 - 如果线程之间没有适当的相互通信机制,分配的任务将无法正常完成。
- 比进程间通信更高效 - 线程间通信比进程间通信更高效且更易于使用,因为进程内的所有线程共享相同的地址空间,并且不需要使用共享内存。
线程安全通信的Python数据结构
多线程代码出现了将信息从一个线程传递到另一个线程的问题。 标准的通信原语不能解决这个问题。 因此,需要实现我们自己的组合对象,以便在线程之间共享对象以使通信线程安全。 以下是一些数据结构,它们在进行一些更改后提供线程安全通信 -
1. Set
为了以线程安全的方式使用set数据结构,需要扩展set类来实现我们自己的锁定机制。
以下是一个扩展类的Python示例 -
class extend_class(set):
def __init__(self, *args, **kwargs):
self._lock = Lock()
super(extend_class, self).__init__(*args, **kwargs)
def add(self, elem):
self._lock.acquire()
try:
super(extend_class, self).add(elem)
finally:
self._lock.release()
def delete(self, elem):
self._lock.acquire()
try:
super(extend_class, self).delete(elem)
finally:
self._lock.release()
在上面的例子中,定义了一个名为extend_class的类对象,它继承自Python集合类。 在这个类的构造函数中创建一个锁对象。 现在有两个函数 - add()和delete()。 这些函数被定义并且是线程安全的。 它们都依赖于超类功能以及一个键异常。
修饰器
这是线程安全通信的另一个关键方法是使用装饰器。
示例
考虑一个Python示例,展示如何使用装饰器和mminus;
def lock_decorator(method):
def new_deco_method(self, *args, **kwargs):
with self._lock:
return method(self, *args, **kwargs)
return new_deco_method
class Decorator_class(set):
def __init__(self, *args, **kwargs):
self._lock = Lock()
super(Decorator_class, self).__init__(*args, **kwargs)
@lock_decorator
def add(self, *args, **kwargs):
return super(Decorator_class, self).add(elem)
@lock_decorator
def delete(self, *args, **kwargs):
return super(Decorator_class, self).delete(elem)
在上面的例子中,已经定义了一个名为lock_decorator的装饰器方法,该方法从Python方法类继承。 然后在这个类的构造函数中创建一个锁对象。 现在有两个函数 - add()和delete()。 这些函数被定义并且是线程安全的。 他们都依靠超类功能和一个键异常。
2. list
列表数据结构对于临时内存存储而言是线程安全,快速以及简单的结构。在Cpython中,GIL可以防止对它们的并发访问。当我们知道列表是线程安全的,但是数据在哪里呢。实际上,该列表的数据不受保护。例如,如果另一个线程试图做同样的事情,则L.append(x)不保证能够返回预期的结果。这是因为尽管append()是一个原子操作并且是线程安全的,但另一个线程试图以并发方式修改列表数据,因此可以看到竞争条件对输出的副作用。
为了解决这类问题并安全地修改数据,我们必须实现一个适当的锁定机制,这进一步确保多个线程不会潜在竞争条件。为了实现适当的锁定机制,可以像前面的例子那样扩展这个类。
列表上的其他一些原子操作如下所示 -
L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()
这里 -
- L,L1,L2都是列表
- D,D1,D2是字典
- x,y是对象
- i,j是整数
3. 队列
如果清单数据不受保护,我们可能不得不面对后果。 可能会得到或删除错误的数据项,竞争条件。 这就是为什么建议使用队列数据结构的原因。 一个真实世界的排队示例可以是单车道单向道路,车辆首先进入,首先退出。 售票窗口和公共汽车站的队列中可以看到更多真实世界的例子。
队列是默认的线程安全数据结构,我们不必担心实现复杂的锁定机制。 Python提供了应用程序中使用不同类型队列的模块。
队列类型
在本节中,我们将获得关于不同类型的队列的信息。 Python提供了三种从queue模块使用的队列选项 -
- 正常队列(FIFO,先进先出)
- 后进先出,后进先出
- 优先级
我们将在随后的章节中了解不同的队列。
正常队列(FIFO,先进先出)
它是Python提供的最常用的队列实现。 在这种先排队的机制中,首先得到服务。 FIFO也被称为正常队列。 FIFO队列可以表示如下 -
FIFO队列的Python实现
在python中,FIFO队列可以用单线程和多线程来实现。
具有单线程的FIFO队列
要实现单线程的FIFO队列,Queue类将实现一个基本的先进先出容器。 使用put()将元素添加到序列的一个“结尾”,并使用get()从另一端移除元素。
示例
以下是用单线程实现FIFO队列的Python程序 -
import queue
q = queue.Queue()
for i in range(8):
q.put("item-" + str(i))
while not q.empty():
print (q.get(), end = " ")
执行上面示例代码,得到以下结果 -
item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7
输出结果显示上面的程序使用单个线程来说明这些元素将按照它们插入的顺序从队列中移除。
具有多个线程的FIFO队列
为了实现多线程的FIFO,需要从queue模块扩展来定义myqueue()函数。 get()和put()方法的工作方式与上面讨论的一样,只用单线程实现FIFO队列。 然后为了使它成为多线程,我们需要声明和实例化线程。 这些线程将以FIFO方式使用队列。
示例
以下是用于实现具有多个线程的FIFO队列的Python程序 -
import threading
import queue
import random
import time
def myqueue(queue):
while not queue.empty():
item = queue.get()
if item is None:
break
print("{} removed {} from the queue".format(threading.current_thread(), item))
queue.task_done()
time.sleep(2)
q = queue.Queue()
for i in range(5):
q.put(i)
threads = []
for i in range(4):
thread = threading.Thread(target=myqueue, args=(q,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
执行上面示例代码,得到以下结果 -
<Thread(Thread-3654, started 5044)> removed 0 from the queue
<Thread(Thread-3655, started 3144)> removed 1 from the queue
<Thread(Thread-3656, started 6996)> removed 2 from the queue
<Thread(Thread-3657, started 2672)> removed 3 from the queue
<Thread(Thread-3654, started 5044)> removed 4 from the queue
4. LIFO,后进先出队列
队列使用与FIFO(先进先出)队列完全相反的类比。 在这个队列机制中,最后一个将首先获得服务。 这与实现堆栈数据结构相似。 LIFO队列在实施深度优先搜索时非常有用,如人工智能算法。
LIFO队列的Python实现
在python中,LIFO队列可以用单线程和多线程来实现。
单线程的LIFO队列
要用单线程实现LIFO队列,Queue类将使用结构Queue.LifoQueue来实现基本的后进先出容器。 现在,在调用put()时,将元素添加到容器的头部,并使用get()从头部移除。
示例
以下是用单线程实现LIFO队列的Python程序 -
import queue
q = queue.LifoQueue()
for i in range(8):
q.put("item-" + str(i))
while not q.empty():
print (q.get(), end=" ")
执行上面示例代码,得到以下结果 -
item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0
输出显示上述程序使用单个线程来说明元素将以插入的相反顺序从队列中移除。
带有多个线程的LIFO队列
这个实现与使用多线程实现FIFO队列相似。 唯一的区别是需要使用Queue类,该类将使用结构Queue.LifoQueue来实现基本的后进先出容器。
示例
以下是用于实现具有多个线程的LIFO队列的Python程序 -
import threading
import queue
import random
import time
def myqueue(queue):
while not queue.empty():
item = queue.get()
if item is None:
break
print("{} removed {} from the queue".format(threading.current_thread(), item))
queue.task_done()
time.sleep(2)
q = queue.LifoQueue()
for i in range(5):
q.put(i)
threads = []
for i in range(4):
thread = threading.Thread(target=myqueue, args=(q,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
执行上面示例代码,得到以下结果 -
<Thread(Thread-3882, started 4928)> removed 4 from the queue
<Thread(Thread-3883, started 4364)> removed 3 from the queue
<Thread(Thread-3884, started 6908)> removed 2 from the queue
<Thread(Thread-3885, started 3584)> removed 1 from the queue
<Thread(Thread-3882, started 4928)> removed 0 from the queue
优先队列
在FIFO和LIFO队列中,项目顺序与插入顺序有关。 但是,有很多情况下优先级比插入顺序更重要。 让我们考虑一个真实世界的例子。 假设机场的安保人员正在检查不同类别的人员。 VVIP的人员,航空公司工作人员,海关人员,类别可能会优先检查,而不是像到平民那样根据到达情况进行检查。
需要考虑优先队列的另一个重要方面是如何开发任务调度器。 一种常见的设计是在队列中优先处理最具代理性的任务。 该数据结构可用于根据队列的优先级值从队列中提取项目。
优先队列的Python实现
在python中,优先级队列可以用单线程和多线程来实现。
单线程优先队列
要实现单线程优先队列,Queue类将使用结构Queue.PriorityQueue在优先级容器上实现任务。 现在,在调用put()时,元素将添加一个值,其中最低值将具有最高优先级,并因此使用get()首先检索。
示例
考虑下面的Python程序来实现单线程的优先级队列 -
import queue as Q
p_queue = Q.PriorityQueue()
p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
prio_queue.put((5, 'Important'))
while not p_queue.empty():
item = p_queue.get()
print('%s - %s' % item)
执行上面示例代码,得到以下结果 -
1 – Most Urgent
2 - Urgent
5 - Important
10 – Nothing important
在上面的输出中,可以看到队列已经存储了基于优先级的项目 - 较少的值具有高优先级。
具有多线程的优先队列
该实现类似于具有多个线程的FIFO和LIFO队列的实现。 唯一的区别是需要使用Queue类通过使用结构Queue.PriorityQueue来初始化优先级。 另一个区别是队列的生成方式。 在下面给出的例子中,它将生成两个相同的数据集。
示例
以下Python程序有助于实现具有多个线程的优先级队列 -
import threading
import queue
import random
import time
def myqueue(queue):
while not queue.empty():
item = queue.get()
if item is None:
break
print("{} removed {} from the queue".format(threading.current_thread(), item))
queue.task_done()
time.sleep(1)
q = queue.PriorityQueue()
for i in range(5):
q.put(i,1)
for i in range(5):
q.put(i,1)
threads = []
for i in range(2):
thread = threading.Thread(target=myqueue, args=(q,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
执行上面示例代码,得到以下结果 -
<Thread(Thread-4939, started 2420)> removed 0 from the queue
<Thread(Thread-4940, started 3284)> removed 0 from the queue
<Thread(Thread-4939, started 2420)> removed 1 from the queue
<Thread(Thread-4940, started 3284)> removed 1 from the queue
<Thread(Thread-4939, started 2420)> removed 2 from the queue
<Thread(Thread-4940, started 3284)> removed 2 from the queue
<Thread(Thread-4939, started 2420)> removed 3 from the queue
<Thread(Thread-4940, started 3284)> removed 3 from the queue
<Thread(Thread-4939, started 2420)> removed 4 from the queue
<Thread(Thread-4940, started 3284)> removed 4 from the queue
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
测试线程应用程序 - Python并发编程教程™
在本章中,我们将学习线程应用程序的测试。也将学习测试的重要性。
为什么要测试?
在我们深入讨论测试的重要性之前,需要知道什么是测试。 一般来说,测试是一种发现某件事情如何发挥作用的技术。 另一方面,特别是如果谈论计算机程序或软件,那么测试就是访问软件程序功能的技术。
在本节中,我们将讨论软件测试的重要性。 在软件开发中,在向客户端发布软件之前必须进行双重检查。 这就是为什么经验丰富的测试团队测试软件非常重要。 从以下几点来理解软件测试的重要性 -
1. 提高软件质量
当然,没有公司想要提供低质量的软件,也没有客户想要购买低质量的软件。 测试通过查找并修复其中的错误来提高软件的质量。
2. 客户满意度
任何企业最重要的部分是客户的满意度。 通过提供无错误和高质量的软件,公司可以实现客户满意度。
3. 减少新功能的影响
假设我们已经制作了10000行的软件系统,并且我们需要添加一个新功能,那么开发团队就会担心这个新功能对整个软件的影响。 在这里,测试也起着至关重要的作用,因为如果测试团队已经完成了一套很好的测试,那么它可以帮助我们避免任何潜在的灾难性休息。
4. 用户体验
任何业务中另一个最重要的部分是该产品用户的体验。 只有测试才能确保最终用户发现使用该产品简单易用。
5. 削减开支
测试可以通过在测试开发的测试阶段找到并修复错误而不是在交付后修复软件来降低软件的总成本。 如果在交付软件后出现重大缺陷,那么就费用和无形成本而言,例如客户不满意度,公司负面声誉等方面,它会增加其有形成本。
要测试什么?
总是建议对要测试的内容有适当的知识。 在本节中,我们将首先了解测试任何软件时测试人员的主要动机。 应该避免使用代码覆盖率,即测试套件在测试时碰到多少行代码。 这是因为,在测试时,只关注代码行数量并不会增加系统的实际价值。 可能存在一些错误,即使在部署之后,稍后也会反映出来。
考虑以下与测试内容相关的重要问题 -
- 需要关注测试代码的功能而不是代码覆盖。
- 需要首先测试代码中最重要的部分,然后转向代码中不太重要的部分。这肯定会节省时间。
- 测试仪必须有多种不同的测试,可以将软件推到极限。
测试并行软件程序的方法
由于利用多核架构的真实能力,并行软件系统正在取代顺序系统。 最近,从手机到洗衣机,从汽车到飞机等,所有的并发系统程序都在使用。需要更加小心地测试并发软件程序,因为如果为单线程应用程序添加了多个线程, 已经是一个错误,那么最终会遇到多个错误。
并发软件程序的测试技术主要集中在选择交错方面,这些交错方式暴露了潜在的有害模式,如竞态条件,死锁和原子性违规。 以下是测试并发软件程序的两种方法 -
系统的探索
这种方法旨在尽可能广泛地探索交织的空间。 这些方法可以采用强力技术,而其他方法则采用部分降阶技术或启发式技术来探索交织的空间。
属性驱动
属性驱动方法依赖于观察到并发错误更有可能发生在交错之下,这些交错揭示了诸如可疑内存访问模式之类的特定属性。 不同的财产驱动方法针对不同的故障,如竞态条件,死锁和违反原子性,这进一步取决于一个或其他特定属性。
测试策略
测试策略也被称为测试方法。 该策略定义了如何进行测试。 测试方法有两种技术 -
主动
尽早开始测试设计过程以便在创建构建之前查找并修复缺陷的方法。
反应
直到完成开发过程才开始测试的方法。
在对python程序应用任何测试策略或方法之前,我们必须对软件程序可能存在的错误类型有一个基本的想法。 错误如下 -
语法错误
在程序开发过程中,可能会出现很多小错误。 错误主要是由于输入错误。 例如,缺少冒号或关键字的拼写错误等。这些错误是由于程序语法中的错误,而不是逻辑中的错误。 因此,这些错误被称为语法错误。
语义错误
语义错误也被称为逻辑错误。 如果软件程序中存在逻辑或语义错误,则该语句将编译并正确运行,但由于逻辑不正确,它不会给出所需的输出。
单元测试
这是测试python程序最常用的测试策略之一。 该策略用于测试代码的单元或组件。 我们指单位或组件代表代码的类别或功能。 单元测试通过测试“小”单元来简化大型编程系统的测试。 在上述概念的帮助下,单元测试可以被定义为一种方法,其中对源代码的各个单元进行测试以确定它们是否返回期望的输出。
在接下来的章节中,我们将学习单元测试的不同Python模块。
unittest模块
单元测试的第一个模块是unittest模块。 它受JUnit的启发,默认包含在Python3.6中。 它支持测试自动化,共享测试的设置和关闭代码,将测试集合到集合中,以及测试独立于报告框架。
以下是unittest模块支持的一些重要概念
文本夹具
它用于设置测试,以便在测试结束后可以在开始测试和拆卸之前运行测试。 它可能涉及在开始测试之前创建临时数据库,目录等。
测试用例
测试用例检查所需的响应是否来自特定的一组输入。 unittest模块包含一个名为TestCase的基类,可用于创建新的测试用例。 它包括两个默认方法 -
- setUp() - 在锻炼之前设置测试夹具的钩子方法。 这在调用实现的测试方法之前调用。
- tearDown() - 在类中运行所有测试之后解构类装置的钩子方法。
测试套件
它是测试套件,测试用例或两者的集合。
测试运行
它控制测试用例或套装的运行并向用户提供结果。 它可以使用GUI或简单的文本界面来提供结果。
以下Python程序使用unittest模块来测试名为Fibonacci的模块。 该计划有助于计算一个数字的斐波那契数列。 在这个例子中,我们创建了一个名为Fibo_test类,通过使用不同的方法来定义测试用例。 这些方法从unittest.TestCase继承。 我们使用两个默认方法 - setUp()和tearDown()。 也定义了testfibocal方法。 测试名称必须以字母测试开始。 在最后的块中,unittest.main()为测试脚本提供了一个命令行入口。
import unittest
def fibonacci(n):
a, b = 0, 1
for i in range(n):
a, b = b, a + b
return a
class Fibo_Test(unittest.TestCase):
def setUp(self):
print("This is run before our tests would be executed")
def tearDown(self):
print("This is run after the completion of execution of our tests")
def testfibocal(self):
self.assertEqual(fib(0), 0)
self.assertEqual(fib(1), 1)
self.assertEqual(fib(5), 5)
self.assertEqual(fib(10), 55)
self.assertEqual(fib(20), 6765)
if __name__ == "__main__":
unittest.main()
当从命令行运行时,上面的脚本产生一个看起来像这样的输出 -
This runs before our tests would be executed.
This runs after the completion of execution of our tests.
.
----------------------------------------------------------------------
Ran 1 test in 0.006s
OK
现在,为了更清楚一点,修改上面的代码,这有助于定义斐波那契模块。
以下面的代码块为例 -
def fibonacci(n):
a, b = 0, 1
for i in range(n):
a, b = b, a + b
return a
代码块的一些更改如下所示 -
def fibonacci(n):
a, b = 1, 1
for i in range(n):
a, b = b, a + b
return a
现在,使用更改后的代码运行脚本后,将得到以下输出 -
This runs before our tests would be executed.
This runs after the completion of execution of our tests.
F
======================================================================
FAIL: testCalculation (__main__.Fibo_Test)
----------------------------------------------------------------------
Traceback (most recent call last):
File "unitg.py", line 15, in testCalculation
self.assertEqual(fib(0), 0)
AssertionError: 1 != 0
----------------------------------------------------------------------
Ran 1 test in 0.007s
FAILED (failures = 1)
以上输出显示模块未能提供所需的输出。
Docktest模块
docktest模块也有助于单元测试。 它也预装python。 它比单元测试模块更容易使用。 unittest模块更适合于复杂的测试。要使用doctest模块,可以直接导入它。 相应函数的文档字符串必须具有交互式python会话及其输出。
如果代码中一切正常,那么docktest模块将不会有输出; 否则,它会提供输出。
示例
以下Python示例使用docktest模块来测试名为Fibonacci的模块,该模块有助于计算数字的斐波那契数列。
import doctest
def fibonacci(n):
"""
Calculates the Fibonacci number
>>> fibonacci(0)
0
>>> fibonacci(1)
1
>>> fibonacci(10)
55
>>> fibonacci(20)
6765
>>>
"""
a, b = 1, 1
for i in range(n):
a, b = b, a + b
return a
if __name__ == "__main__":
doctest.testmod()
可以看到名为fib的相应函数的文档字符串具有交互式python会话和输出。 如果代码没有问题,那么doctest模块将不会有输出。 但要了解它的工作原理,我们可以使用-v选项运行它。
(base) D:\ProgramData>python dock_test.py -v
Trying:
fibonacci(0)
Expecting:
0
ok
Trying:
fibonacci(1)
Expecting:
1
ok
Trying:
fibonacci(10)
Expecting:
55
ok
Trying:
fibonacci(20)
Expecting:
6765
ok
1 items had no tests:
__main__
1 items passed all tests:
4 tests in __main__.fibonacci
4 tests in 2 items.
4 passed and 0 failed.
Test passed.
现在,我们将更改帮助定义斐波那契模块的代码,以下面的代码块为例 -
def fibonacci(n):
a, b = 0, 1
for i in range(n):
a, b = b, a + b
return a
以下代码块有助于更改 -
def fibonacci(n):
a, b = 1, 1
for i in range(n):
a, b = b, a + b
return a
在运行脚本后,即使没有-v选项,使用更改后的代码,我们将得到如下所示的输出。
(base) D:\ProgramData>python dock_test.py
**********************************************************************
File "unitg.py", line 6, in __main__.fibonacci
Failed example:
fibonacci(0)
Expected:
0
Got:
1
**********************************************************************
File "unitg.py", line 10, in __main__.fibonacci
Failed example:
fibonacci(10)
Expected:
55
Got:
89
**********************************************************************
File "unitg.py", line 12, in __main__.fibonacci
Failed example:
fibonacci(20)
Expected:
6765
Got:
10946
**********************************************************************
1 items had failures:
3 of 4 in __main__.fibonacci
***Test Failed*** 3 failures.
我们可以在上面的输出中看到三个测试失败了。
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
调试线程应用程序 - Python并发编程教程™
在本章中,我们将学习如何调试线程应用程序。也将学习调试的重要性。
调试是什么?
在计算机编程中,调试是查找和消除计算机程序中的错误,错误和异常的过程。 这个过程在代码被写入后立即开始,并且随着代码与其他编程单元组合以形成软件产品而连续进行。 调试是软件测试过程的一部分,是整个软件开发生命周期的一个组成部分。
Python调试器
Python调试器(也叫pdb)是Python标准库的一部分。 这是一个很好的回退工具,用于追踪难以发现的错误,并允许我们快速可靠地修复错误的代码。 以下是 pdp 调试器的两个最重要的任务 -
- 它允许在运行时检查变量的值。
- 可以遍历代码并设置断点。
我们可以通过以下两种方式使用pdb:
- 通过命令行; 这也被称为事后调试。
- 通过交互式运行pdb。
使用pdb
要使用Python调试器,在想要进入调试器的位置使用以下代码 -
import pdb;
pdb.set_trace()
考虑以下命令通过命令行使用pdb。
- h(help)
- d(down)
- u(up)
- b(break)
- cl(clear)
- l(list))
- n(next))
- c(continue)
- s(step)
- r(return))
- b(break)
以下是Python调试器的h(帮助)命令演示 -
import pdb
pdb.set_trace()
--Call--
>d:\programdata\lib\site-packages\ipython\core\displayhook.py(247)__call__()
-> def __call__(self, result = None):
(Pdb) h
Documented commands (type help <topic>):
========================================
EOF c d h list q rv undisplay
a cl debug help ll quit s unt
alias clear disable ignore longlist r source until
args commands display interact n restart step up
b condition down j next return tbreak w
break cont enable jump p retval u whatis
bt continue exit l pp run unalias where
Miscellaneous help topics:
==========================
exec pdb
示例
在使用Python调试器时,我们可以通过使用以下几行来在脚本中的任何位置设置断点 -
import pdb;
pdb.set_trace()
设置断点后,我们可以正常运行脚本。 该脚本将执行到某个断点; 直到设置了一行。 考虑以下示例,我们将通过在脚本中的各个位置使用上述行来运行脚本 -
import pdb;
a = "aaa"
pdb.set_trace()
b = "bbb"
c = "ccc"
final = a + b + c
print (final)
当上面的脚本运行时,它会执行程序直到a = "aaa",可以在下面的输出中查看它。
--Return--
> <ipython-input-7-8a7d1b5cc854>(3)<module>()->None
-> pdb.set_trace()
(Pdb) p a
'aaa'
(Pdb) p b
*** NameError: name 'b' is not defined
(Pdb) p c
*** NameError: name 'c' is not defined
在pdb中使用命令'p(print)'后,此脚本仅打印'aaa'。 随后出现错误,因为我们已将断点设置为a ="aaa"。
同样,我们可以通过更改断点并查看输出中的差异来运行脚本 -
import pdb
a = "aaa"
b = "bbb"
c = "ccc"
pdb.set_trace()
final = a + b + c
print (final)
执行上面示例代码,得到以下结果 -
--Return--
> <ipython-input-9-a59ef5caf723>(5)<module>()->None
-> pdb.set_trace()
(Pdb) p a
'aaa'
(Pdb) p b
'bbb'
(Pdb) p c
'ccc'
(Pdb) p final
*** NameError: name 'final' is not defined
(Pdb) exit
在下面的脚本中,在程序的最后一行设置断点 -
import pdb
a = "aaa"
b = "bbb"
c = "ccc"
final = a + b + c
pdb.set_trace()
print (final)
执行上面代码,输出结果如下 -
--Return--
> <ipython-input-11-8019b029997d>(6)<module>()->None
-> pdb.set_trace()
(Pdb) p a
'aaa'
(Pdb) p b
'bbb'
(Pdb) p c
'ccc'
(Pdb) p final
'aaabbbccc'
(Pdb)
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
基准和性能分析 - Python并发编程教程™
在本章中,我们将学习基准测试和分析如何帮助解决性能问题。
假设我们已经编写了一个代码,并且它也给出了期望的结果,但是如果想要更快地运行此代码,因为需求已经发生了变化。 在这种情况下,需要找出代码的哪些部分正在减慢整个程序。 在这种情况下,基准测试和分析可能很有用。
基准测试是什么?
基准测试旨在通过与标准进行比较来评估某些事物。 然而,这里出现的问题是,什么是基准,以及为什么需要软件编程。 对代码进行基准测试意味着代码的执行速度以及瓶颈的位置。 基准测试的一个主要原因是它优化了代码。
基准是如何工作?
如果我们谈论基准测试的工作,需要首先将整个程序作为一个当前状态,然后可以将微基准结合起来,然后将程序分解成更小的程序。 找到程序中的瓶颈并优化它。 换句话说,我们可以把它理解为将大而难的问题分解为一系列较小和较容易的问题来优化它们。
Python模块进行基准测试
在Python中,我们有一个默认的基准测试模块,称为timeit。 在timeit模块的帮助下,我们可以在主程序中测量一小段Python代码的性能。
示例
在下面的Python脚本中,导入了timeit模块,它进一步测量执行两个函数所需的时间 - functionA和functionB -
import timeit
import time
def functionA():
print("Function A starts the execution:")
print("Function A completes the execution:")
def functionB():
print("Function B starts the execution")
print("Function B completes the execution")
start_time = timeit.default_timer()
functionA()
print(timeit.default_timer() - start_time)
start_time = timeit.default_timer()
functionB()
print(timeit.default_timer() - start_time)
运行上面的脚本之后,将得到两个函数的执行用时,如下所示。
Function A starts the execution:
Function A completes the execution:
0.0014599495514175942
Function B starts the execution
Function B completes the execution
0.0017024724827479076
使用装饰器函数编写计时器
在Python中,我们可以创建自己的计时器,它的行为就像timeit模块一样。 它可以在装饰器功能的帮助下完成。 以下是自定义计时器的示例 -
import random
import time
def timer_func(func):
def function_timer(*args, **kwargs):
start = time.time()
value = func(*args, **kwargs)
end = time.time()
runtime = end - start
msg = "{func} took {time} seconds to complete its execution."
print(msg.format(func = func.__name__,time = runtime))
return value
return function_timer
@timer_func
def Myfunction():
for x in range(5):
sleep_time = random.choice(range(1,3))
time.sleep(sleep_time)
if __name__ == '__main__':
Myfunction()
上面的python脚本有助于导入随机时间模块。 我们创建了timer_func()装饰器函数。 这里面有function_timer()函数。 现在,嵌套函数会在调用传入函数之前抓取时间。 然后它等待函数返回并抓取结束时间。 这样,我们可以最终使python脚本打印执行时间。 该脚本将生成如下所示的输出。
Myfunction took 8.000457763671875 seconds to complete its execution.
什么是性能分析?
有时程序员想要测量一些属性,如使用内存,时间复杂度或使用关于程序的特定指令来衡量程序的真实能力。 这种关于程序的测量称为分析。 分析使用动态程序分析来进行这种测量。
在随后的章节中,我们将学习用于分析的不同Python模块。
cProfile - 内置模块
cProfile是一个用于分析的Python内置模块。 该模块是一个具有合理开销的C扩展,适合分析长时间运行的程序。 运行后,它会记录所有的功能和执行时间。 这是非常强大的,但有时难以解释和操作。 在下面的例子中,我们在下面的代码中使用cProfile -
示例
def increment_global():
global x
x += 1
def taskofThread(lock):
for _ in range(50000):
lock.acquire()
increment_global()
lock.release()
def main():
global x
x = 0
lock = threading.Lock()
t1 = threading.Thread(target=taskofThread, args=(lock,))
t2 = threading.Thread(target= taskofThread, args=(lock,))
t1.start()
t2.start()
t1.join()
t2.join()
if __name__ == "__main__":
for i in range(5):
main()
print("x = {1} after Iteration {0}".format(i,x))
上面的代码保存在thread_increment.py文件中。 现在,在命令行上用cProfile执行代码如下 -
(base) D:\ProgramData>python -m cProfile thread_increment.py
x = 100000 after Iteration 0
x = 100000 after Iteration 1
x = 100000 after Iteration 2
x = 100000 after Iteration 3
x = 100000 after Iteration 4
3577 function calls (3522 primitive calls) in 1.688 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
5 0.000 0.000 0.000 0.000 <frozen importlib._bootstrap>:103(release)
5 0.000 0.000 0.000 0.000 <frozen importlib._bootstrap>:143(__init__)
5 0.000 0.000 0.000 0.000 <frozen importlib._bootstrap>:147(__enter__)
… … … …
从上面的输出中可以清楚地看到,cProfile打印出所有被调用的3577个函数,每个函数花费的时间和调用的次数。 以下是我们在输出中获得的列 -
- ncalls - 这是要调用的数字值。
- tottime - 这是在给定函数中花费的总时间。
- percall - 它指的是tottime除以ncalls的商。
- cumtime - 这是在这个和所有子功能中累计的时间。 递归函数甚至是准确的。
- percall - 它是cumtime除以原始调用的商。
- filename:lineno(function) - 它基本上提供了每个函数的相应数据。
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
线程池 - Python并发编程教程™
假设我们必须多线程任务创建大量线程。 由于线程太多,因此可能会有很多性能问题,这在计算上会是最昂贵的。 一个主要问题可能是吞吐量受限。 我们可以通过创建一个线程池来解决这个问题。 一个线程池可以被定义为一组预先实例化和空闲的线程,它们随时可以开始工作。 创建线程池比我们需要执行大量任务时为每个任务实例化新线程更受欢迎。 线程池可以管理大量线程的并发执行,如下所示 -
- 如果线程池中的线程完成其执行,那么该线程可以被重用。
- 如果一个线程被终止,另一个线程将被创建以替换该线程。
Python模块 - Concurrent.futures
Python标准库包含concurrent.futures模块。 这个模块是在Python 3.2中添加的,为开发人员提供了启动异步任务的高级接口。 它是Python的线程和多处理模块的顶层的一个抽象层,用于提供使用线程或进程池运行任务的接口。
在后面的章节中,我们将学习concurrent.futures模块中的类。
执行者类
Executor是一个 Python concurrent.futures模块的抽象类。 它不能直接使用,我们需要使用以下具体子类之一 -
- ThreadPoolExecutor
- ProcessPoolExecutor
ThreadPoolExecutor - 一个具体的子类
它是Executor类的具体子类之一。 子类使用多线程,我们得到一个提交任务的线程池。 该池将任务分配给可用线程并安排它们运行。
如何创建一个ThreadPoolExecutor?
在concurrent.futures模块及其具体子类Executor的帮助下,可以很容易地创建一个线程池。 需要使用我们想要的池中的线程数构造一个ThreadPoolExecutor。 默认情况下,数字是5。然后可以提交一个任务到线程池。 当submit()任务时,会返回Future对象。 Future对象有一个名为done()的方法,它告诉Future是否已经解决。 有了这个,为这个特定的Future对象设定了一个值。 当任务完成时,线程池执行器将该值设置为Future的对象。
示例代码 -
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
sleep(2)
return message
def main():
executor = ThreadPoolExecutor(5)
future = executor.submit(task, ("Completed"))
print(future.done())
sleep(2)
print(future.done())
print(future.result())
if __name__ == '__main__':
main()
执行上面示例代码,得到以下结果 -
False
True
Completed
在上面的例子中,一个ThreadPoolExecutor已经由5个线程构造而成。 然后,在提供消息之前等待2秒的任务被提交给线程池执行器。 从输出中可以看出,任务直到2秒才完成,所以第一次调用done()将返回False。 2秒后,任务完成,我们通过调用result()方法得到future的结果。
实例化ThreadPoolExecutor - 上下文管理器
另一种实例化ThreadPoolExecutor的方法是在上下文管理器的帮助下完成的。 它的工作方式与上例中使用的方法类似。 使用上下文管理器的主要优点是它在语法上看起来不错。 实例化可以在下面的代码的帮助下完成 -
with ThreadPoolExecutor(max_workers = 5) as executor
示例
以下示例是从Python文档借用的。 在这个例子中,首先必须导入concurrent.futures模块。 然后创建一个名为load_url()的函数,它将加载请求的url。 然后该函数用池中的5个线程创建ThreadPoolExecutor。 ThreadPoolExecutor已被用作上下文管理器。 我们可以通过调用result()方法来获得future的结果。
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'https://www.yiibai.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout = timeout) as conn:
return conn.read()
with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
以下将是上面的Python脚本的输出 -
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.yiibai.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes
使用Executor.map()函数
Python map()函数广泛用于许多任务。 一个这样的任务是对可迭代内的每个元素应用某个函数。 同样,可以将迭代器的所有元素映射到一个函数,并将这些作为独立作业提交到ThreadPoolExecutor之外。 考虑下面的Python脚本示例来理解函数的工作原理。
示例
在下面的示例中,map函数用于将square()函数应用于values数组中的每个值。
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
return n * n
def main():
with ThreadPoolExecutor(max_workers = 3) as executor:
results = executor.map(square, values)
for result in results:
print(result)
if __name__ == '__main__':
main()
以下将是上面的Python脚本的输出 -
4
9
16
25
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
进程池 - Python并发编程教程™
进程池可以像创建和使用线程池一样创建和使用。 进程池可以被定义为一组预先实例化和空闲的进程,它们随时可以开始工作。 当我们需要执行大量任务时,创建进程池优于为每个任务实例化新进程。
Python模块 - Concurrent.futures
Python标准库有一个叫做concurrent.futures的模块。 这个模块是在Python 3.2中添加的,为开发人员提供了启动异步任务的高级接口。 它是Python的线程和多处理模块的顶层的一个抽象层,用于提供使用线程或进程池运行任务的接口。
在后面的章节中,我们将要学习concurrent.futures模块的不同子类。
执行者类
Executor是 Python concurrent.futures模块的抽象类。 它不能直接使用,我们需要使用以下具体子类之一 -
- ThreadPoolExecutor
- ProcessPoolExecutor
ProcessPoolExecutor - 一个具体的子类
它是Executor类的具体子类之一。 它使用多重处理,并且我们获得提交任务的过程池。 此池将任务分配给可用的进程并安排它们运行。
如何创建一个ProcessPoolExecutor?
通过concurrent.futures模块及其具体子类Executor的帮助,可以轻松创建一个过程池。 为此,需要构建一个ProcessPoolExecutor,其中包含需要的池中的进程数。 默认情况下,数字为5。然后将任务提交到进程池。
示例
现在将考虑创建线程池时使用的相同示例,唯一的区别是现在将使用ProcessPoolExecutor而不是ThreadPoolExecutor。
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
sleep(2)
return message
def main():
executor = ProcessPoolExecutor(5)
future = executor.submit(task, ("Completed"))
print(future.done())
sleep(2)
print(future.done())
print(future.result())
if __name__ == '__main__':
main()
执行上面示例代码,得到以下结果 -
False
False
Completed
在上面的例子中,一个ProcessPoolExecutor已经被构造成5个线程。 然后在提交消息之前等待2秒的任务被提交给进程池执行器。 从输出中可以看出,任务直到2秒才完成,所以第一次调用done()将返回False。2秒后,任务完成,通过调用result()方法得到未来的结果。
实例化ProcessPoolExecutor - 上下文管理器
实例化ProcessPoolExecutor的另一种方法是借助上下文管理器。 它的工作方式与上例中使用的方法类似。 使用上下文管理器的主要优点是它在语法上看起来不错。 实例化可以在下面的代码的帮助下完成 -
with ProcessPoolExecutor(max_workers = 5) as executor
示例
为了更好地理解,这里演示创建线程池示例。 在这个例子中,我们需要从导入concurrent.futures模块开始。 然后创建一个名为load_url()的函数,它将加载请求的url。 然后使用池中的5个线程创建ProcessPoolExecutor。 ProcessPoolExecutor已被用作上下文管理器。 我们可以通过调用result()方法来获得future的结果。
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout = timeout) as conn:
return conn.read()
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
if __name__ == '__main__':
main()
上面的Python脚本将生成以下输出 -
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes
使用Executor.map()函数
Python map()函数广泛用于执行许多任务。 一个这样的任务是对可迭代内的每个元素应用某个函数。 同样,可以将迭代器的所有元素映射到函数,并将这些作为独立作业提交给ProcessPoolExecutor。 考虑下面的Python脚本示例来理解这一点。
示例
我们将考虑使用Executor.map()函数创建线程池时使用的相同示例。 在下面的示例中,map函数用于将square()函数应用于values数组中的每个值。
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
return n * n
def main():
with ProcessPoolExecutor(max_workers = 3) as executor:
results = executor.map(square, values)
for result in results:
print(result)
if __name__ == '__main__':
main()
上面的Python脚本将生成以下输出 -
4
9
16
25
何时使用ProcessPoolExecutor和ThreadPoolExecutor?
现在我们已经学习了两个Executor类 - ThreadPoolExecutor和ProcessPoolExecutor,我们需要知道何时使用哪个执行器。需要在受CPU限制的工作负载情况下选择ProcessPoolExecutor,而在受I/O限制的工作负载情况下则需要选择ThreadPoolExecutor。
如果使用ProcessPoolExecutor,那么不需要担心GIL,因为它使用多处理。 而且,与ThreadPoolExecution相比,执行时间会更少。 考虑下面的Python脚本示例来理解这一点。
示例
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
执行上面示例代码,得到以下结果 -
Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207
示例 - 使用ThreadPoolExecutor的Python脚本:
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
执行上面示例代码,得到以下结果 -
Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645
从上述两个程序的输出中,可以看到使用ProcessPoolExecutor和ThreadPoolExecutor时执行时间的差异。
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
多进程 - Python并发编程教程™
在本章中,我们将更多地关注多处理和多线程之间的比较。
多进程
在一台计算机系统中使用两个或多个CPU单元。 通过利用计算机系统中可用的全部CPU核心,这是最好的方法来充分利用我们的硬件。
多线程
这是CPU通过同时执行多个线程来管理操作系统使用的能力。 多线程的主要思想是通过将进程分成多个线程来实现并行性。
下表显示了它们之间的一些重要区别 -
编号 | 多进程 | 多程序 |
1 | 多处理是指多个CPU同时处理多个进程。 | 多程序同时在主存储器中保存多个程序,并使用单个CPU同时执行它们。 |
2 | 它利用多个CPU。 | 它利用单个CPU |
3 | 它允许并行处理。 | 上下文切换。 |
4 | 处理工作的时间更少。 | 处理工作需要花费更多的时间。 |
5 | 它有助于计算机系统设备的高效利用。 | 效率低于多重处理。 |
6 | 系统通常更昂贵。 | 这样的系统更便宜。 |
消除全局解释器锁定(GIL)的影响
在使用并发应用程序时,Python中存在一个名为GIL(全局解释器锁)的限制。 GIL从来不允许我们利用CPU的多个内核,因此可以说Python中没有真正的线程。 GIL是互斥锁 - 互斥锁,它使线程安全。 换句话说,可以说GIL阻止了多个线程并行执行Python代码。锁一次只能由一个线程保存,如果想执行一个线程,那么它必须先获取锁。
通过使用多处理,可以通过GIL有效地绕过 -
- 通过使用多处理,利用多个进程的能力,因此使用GIL的多个实例。
- 由于这个原因,在程序中一次执行一个线程的字节码没有限制。
在Python中启动进程
可以使用以下三种方法在多处理模块内用Python启动进程 -
- Fork
- Spawn
- Forkserver
使用Fork创建一个流程
Fork命令是在UNIX中找到的标准命令。 它用于创建称为子进程的新进程。 此子进程与称为父进程的进程同时运行。 这些子进程也与其父进程相同,并继承父进程可用的所有资源。 使用Fork创建流程时使用以下系统调用 -
- fork() - 这是一个通常在内核中实现的系统调用,它用于创建进程的副本。
- getpid() - 该系统调用返回调用进程的进程ID(PID)。
示例
以下Python脚本示例将演示如何创建新的子进程并获取子进程和父进程的PID -
import os
def child():
n = os.fork()
if n > 0:
print("PID of Parent process is : ", os.getpid())
else:
print("PID of Child process is : ", os.getpid())
child()
执行上面示例代码,得到以下结果 -
PID of Parent process is : 25989
PID of Child process is : 25990
用Spawn创建一个进程
Spawn意味着开始新的事物。 因此,产生一个过程意味着父过程创建一个新进程。 父进程异步继续执行或等待子进程结束其执行。 按照这些步骤产生一个进程 -
- 导入多处理模块。
- 创建对象进程。
- 通过调用start()方法来启动进程活动。
- 等待进程完成其工作并通过调用join()方法退出。
示例
以下Python脚本示例产生三个进程 -
import multiprocessing
def spawn_process(i):
print ('This is process: %s' %i)
return
if __name__ == '__main__':
Process_jobs = []
for i in range(3):
p = multiprocessing.Process(target = spawn_process, args = (i,))
Process_jobs.append(p)
p.start()
p.join()
执行上面示例代码,得到以下结果 -
This is process: 0
This is process: 1
This is process: 2
使用Forkserver创建一个进程
Forkserver机制仅适用于那些支持通过Unix Pipes传递文件描述符的所选UNIX平台。 考虑以下几点来理解Forkserver机制的工作 -
- 服务器通过使用Forkserver机制来启动新进程。
- 然后服务器接收命令并处理创建新进程的所有请求。
- 要创建一个新的进程,python程序会向Forkserver发送一个请求,之后它会创建一个进程。
- 最后,我们可以在程序中使用这个新创建的进程。
守护进程如何在Python中进行处理
Python多处理模块允许通过它的守护进程选项来守护进程。 守护进程或在后台运行的进程遵循与守护进程线程类似的概念。 要在后台执行该进程,需要将守护进程标志设置为true。 只要主进程正在执行,守护进程将继续运行,并在完成执行或主程序被终止后终止进程。
示例
在这里,我们使用与守护进程线程中使用的相同的示例。 唯一的区别是模块从多线程更改为多处理,并将守护标志设置为true。 但是,如下所示,输出结果会发生变化 -
import multiprocessing
import time
def nondaemonProcess():
print("starting my Process")
time.sleep(8)
print("ending my Process")
def daemonProcess():
while True:
print("Hello")
time.sleep(2)
if __name__ == '__main__':
nondaemonProcess = multiprocessing.Process(target = nondaemonProcess)
daemonProcess = multiprocessing.Process(target = daemonProcess)
daemonProcess.daemon = True
nondaemonProcess.daemon = False
daemonProcess.start()
nondaemonProcess.start()
执行上面示例代码,得到以下结果 -
starting my Process
ending my Process
输出与守护进程线程生成的输出相比是不同的,因为没有守护进程模式的进程有输出。 因此,主程序结束后,守护进程会自动结束以避免运行进程的持久性。
在Python中终止进程
可以使用terminate()方法立即终止或终止一个进程。 在完成执行之前,我们将使用此方法来终止在函数的帮助下创建的子进程。
例子
import multiprocessing
import time
def Child_process():
print ('Starting function')
time.sleep(5)
print ('Finished function')
P = multiprocessing.Process(target = Child_process)
P.start()
print("My Process has terminated, terminating main thread")
print("Terminating Child Process")
P.terminate()
print("Child Process successfully terminated")
输出结果 -
My Process has terminated, terminating main thread
Terminating Child Process
Child Process successfully terminated
该输出显示程序在执行使用Child_process()函数创建的子进程之前终止。 这意味着子进程已成功终止。
在Python中识别当前进程
操作系统中的每个进程都具有称为PID的进程标识。 在Python中,可以借助以下命令找出当前进程的PID -
import multiprocessing
print(multiprocessing.current_process().pid)
例子
以下Python脚本示例用于找出主进程的PID以及子进程的PID -
import multiprocessing
import time
def Child_process():
print("PID of Child Process is: {}".format(multiprocessing.current_process().pid))
print("PID of Main process is: {}".format(multiprocessing.current_process().pid))
P = multiprocessing.Process(target=Child_process)
P.start()
P.join()
执行上面示例代码,得到以下结果 -
PID of Main process is: 9401
PID of Child Process is: 9402
在子类中使用进程
可以通过对threading.Thread类进行子分类来创建线程。 另外,还可以通过对multiprocessing.Process类进行子分类来创建流程。 要在子类中使用流程,需要考虑以下几点 -
- 需要定义一个Process类的新子类。
- 需要覆盖_init_(self [,args])类。
- 需要重写run(self [,args])方法来实现Process类
- 需要通过调用start()方法来启动进程。
参考以下代码 -
import multiprocessing
class MyProcess(multiprocessing.Process):
def run(self):
print ('called run method in process: %s' %self.name)
return
if __name__ == '__main__':
jobs = []
for i in range(5):
P = MyProcess()
jobs.append(P)
P.start()
P.join()
执行上面示例代码,得到以下代码-
called run method in process: MyProcess-1
called run method in process: MyProcess-2
called run method in process: MyProcess-3
called run method in process: MyProcess-4
called run method in process: MyProcess-5
Python多处理模块 - Pool类
如果在Python应用程序中讨论简单的并行处理任务,那么多处理模块提供了Pool类。 下面的Pool类方法可以用来在主程序中创建多个子进程。
apply()方法
该方法与ThreadPoolExecutor的submit()方法类似,直到结果准备就绪。
apply_async()方法
当需要并行执行任务时,需要使用apply_async()方法将任务提交给池。 这是一个异步操作,直到执行完所有的子进程之后才会锁定主线程。
map()方法
就像apply()方法一样,它也会阻塞直到结果准备就绪。 它相当于内置的map()函数,它将多个块中的可迭代数据分开并作为单独的任务提交给进程池。
map_async()方法
它是map()方法的一个变体,apply_async()是apply()方法的变体。 它返回一个结果对象。 当结果准备就绪时,就会应用一个可调用对象。 可调用函数必须立即完成; 否则,处理结果的线程将被阻止。
例子
以下示例实现执行并行执行的进程池。 通过multiprocessing.Pool方法应用square()函数,可以简单计算数字的平方。 然后使用pool.map()提交5,因为输入是从0到4的整数列表。结果将被存储在p_outputs中并被打印输出结果 -
def square(n):
result = n*n
return result
if __name__ == '__main__':
inputs = list(range(5))
p = multiprocessing.Pool(processes = 4)
p_outputs = pool.map(function_square, inputs)
p.close()
p.join()
print ('Pool :', p_outputs)
执行上面示例代码,得到以下结果 -
Pool : [0, 1, 4, 9, 16]
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
进程间通信 - Python并发编程教程™
进程间通信表示进程之间的数据交换。 为了开发并行应用程序,需要在进程间交换数据。 下图显示了多个子过程之间同步的各种通信机制 -
各种通信机制
在本节中,我们将了解各种通信机制。 机制如下所述 -
队列
队列可以用于多进程程序。 多处理模块的Queue类与Queue.Queue类相似。 因此,可以使用相同的API。 Multiprocessing.Queue提供了进程间通信的线程和进程安全FIFO(先进先出)机制。
例子
下面是一个简单的例子,从python官方文档多处理了解Queue类的多处理概念。
from multiprocessing import Process, Queue
import queue
import random
def f(q):
q.put([42, None, 'hello'])
def main():
q = Queue()
p = Process(target = f, args = (q,))
p.start()
print (q.get())
if __name__ == '__main__':
main()
执行上面示例代码,得到以下结果 -
[42, None, 'hello']
管道
它是一种数据结构,用于在多进程程序中的进程之间进行通信。Pipe()函数返回一对由管道连接的连接对象,默认情况下是双工(双向)。 它的工作原理如下 -
它返回一对代表管道两端的连接对象。
每个对象都有两个方法 - send()和recv(),以在进程之间进行通信。
例子
下面是一个简单的例子,摘自python官方文档多处理,以理解Pipe()函数的多进程概念。
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target = f, args = (child_conn,))
p.start()
print (parent_conn.recv())
p.join()
执行上面代码,得到以下结果 -
[42, None, 'hello']
管理器
Manager是一类多处理模块,它提供了一种协调所有用户之间共享信息的方式。管理器对象控制服务器进程,该进程管理共享对象并允许其他进程操纵它们。 换句话说,管理器提供了一种方法来创建可以在不同进程之间共享的数据。 以下是Manager对象的不同属性 -
- 管理器的主要属性是控制管理共享对象的服务器进程。
- 另一个重要属性是在任何进程修改它时更新所有共享对象。
例子
以下是使用管理器对象在服务器进程中创建列表记录,然后在该列表中添加新记录的示例。
import multiprocessing
def print_records(records):
for record in records:
print("Name: {0}\nScore: {1}\n".format(record[0], record[1]))
def insert_record(record, records):
records.append(record)
print("A New record is added\n")
if __name__ == '__main__':
with multiprocessing.Manager() as manager:
records = manager.list([('Computers', 1), ('Histoty', 5), ('Hindi',9)])
new_record = ('English', 3)
p1 = multiprocessing.Process(target = insert_record, args = (new_record, records))
p2 = multiprocessing.Process(target = print_records, args = (records,))
p1.start()
p1.join()
p2.start()
p2.join()
执行上面代码,得到以下结果 -
A New record is added
Name: Computers
Score: 1
Name: Histoty
Score: 5
Name: Hindi
Score: 9
Name: English
Score: 3
管理器命名空间的概念
Manager类带有名称空间的概念,这是一种在多个进程间共享多个属性的快速方法。 命名空间不具有任何可以调用的公共方法,但它们具有可写的属性。
例子
以下Python脚本示例如何使用命名空间在主进程和子进程之间共享数据 -
import multiprocessing
def Mng_NaSp(using_ns):
using_ns.x +=5
using_ns.y *= 10
if __name__ == '__main__':
manager = multiprocessing.Manager()
using_ns = manager.Namespace()
using_ns.x = 1
using_ns.y = 1
print ('before', using_ns)
p = multiprocessing.Process(target = Mng_NaSp, args = (using_ns,))
p.start()
p.join()
print ('after', using_ns)
执行上面示例代码,得到以下结果 -
before Namespace(x = 1, y = 1)
after Namespace(x = 6, y = 10)
Ctypes数组和值
Multiprocessing模块提供了Array和Value对象,用于将数据存储在共享内存映射中。 Array是从共享内存分配的Array和Value是从共享内存分配的ctypes对象。
Multiprocessing模块导入Process,Value,Array。
例子
下面的Python脚本是一个从python文档中获取的例子,它利用Ctypes Array和Value在进程间共享一些数据。
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target = f, args = (num, arr))
p.start()
p.join()
print (num.value)
print (arr[:])
执行上面示例代码,得到以下结果 -
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
通信顺序进程(CSP)
CSP用于说明系统与具有并行模型的其他系统的交互。 CSP是通过消息传递编写并发或编程的框架,因此它对于描述并发是有效的。
Python PyCSP库
要实现在CSP中找到的核心原语,Python有一个名为PyCSP的库。 它使实现非常简短和易读,因此可以非常容易地理解它。 以下是PyCSP的基本流程网络 -
在上面的PyCSP过程网络中,有两个过程 - 进程1和进程2。这些过程通过传递消息通过两个通道 - 通道1和通道2进行通信。
安装PyCSP
通过以下命令来安装Python的PyCSP库 -
pip install PyCSP
例子
下面的Python脚本是一个简单的例子,它可以并行运行两个进程。 它是在PyCSP库的帮助下完成的 -
from pycsp.parallel import *
import time
@process
def P1():
time.sleep(1)
print('P1 exiting')
@process
def P2():
time.sleep(1)
print('P2 exiting')
def main():
Parallel(P1(), P2())
print('Terminating')
if __name__ == '__main__':
main()
在上面的脚本中,已经创建了两个函数,即P1和P2,然后用@process进行装饰,将它们转换为进程。执行上面代码后,得到以下输出结果 -
P2 exiting
P1 exiting
Terminating
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
事件驱动程序 - Python并发编程教程™
事件驱动程序侧重于事件。 最终,程序的流程取决于事件。 到目前为止,我们正在处理顺序或并行执行模型,但具有事件驱动编程概念的模型称为异步模型。 事件驱动的编程依赖于一直监听新来的事件的事件循环。 事件驱动编程的工作取决于事件。 一旦事件循环,事件就决定执行什么以及按什么顺序执行。 以下流程图将帮助您了解其工作原理 -
Python模块 - Asyncio
Asyncio模块是在Python 3.4中添加的,它提供了使用协同例程编写单线程并发代码的基础结构。 以下是Asyncio模块使用的不同概念 -
事件循环
事件循环是处理计算代码中所有事件的功能。 它在执行整个程序的过程中一路行动,并跟踪事件的传入和执行。 Asyncio模块允许每个进程使用一个事件循环。 以下是Asyncio模块提供的用于管理事件循环的一些方法 -
- loop = get_event_loop() - 此方法将为当前上下文提供事件循环。
- loop.call_later(time_delay,callback,argument) - 此方法安排在给定的time_delay秒后要调用的回调。
- loop.call_soon(callback,argument) - 该方法安排一个尽可能快地被调用的回调函数。 回调在call_soon()返回并且控件返回到事件循环后调用。
- loop.time() - 此方法用于根据事件循环的内部时钟返回当前时间。
- asyncio.set_event_loop() - 此方法将设置当前上下文的事件循环为循环。
- asyncio.new_event_loop() - 此方法将创建并返回一个新的事件循环对象。
- loop.run_forever() - 此方法将运行,直到调用stop()方法。
例子
下面的事件循环示例通过使用get_event_loop()方法帮助打印hello world。 这个例子取自Python官方文档。
import asyncio
def hello_world(loop):
print('Hello World')
loop.stop()
loop = asyncio.get_event_loop()
loop.call_soon(hello_world, loop)
loop.run_forever()
loop.close()
输出结果如下 -
Hello World
特征 - Future
这与表示未完成的计算的concurrent.futures.Future类兼容。 asyncio.futures.Future和concurrent.futures.Future之间存在以下差异 -
- result()和exception()方法不会接受超时参数,并在未来尚未完成时引发异常。
- 通过add_done_callback()注册的回调函数总是通过事件循环的call_soon()来调用。
- asyncio.futures.Future类与concurrent.futures包中的wait()和as_completed()函数不兼容。
例子
以下演示如何使用asyncio.futures.future类的示例。
import asyncio
async def Myoperation(future):
await asyncio.sleep(2)
future.set_result('Future Completed')
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(Myoperation(future))
try:
loop.run_until_complete(future)
print(future.result())
finally:
loop.close()
输出结果如下 -
Future Completed
协同程序
Asyncio中的协程的概念与线程模块下的标准线程对象的概念类似。 这是子程序概念的一般化。 协程在执行过程中可以暂停,以等待外部处理,并在完成外部处理时从其停止点返回。 以下两种方式可以帮助我们实施协同程序 -
async def function()
这是在Asyncio模块下实现协程的一种方法。 以下是一个相同的Python脚本 -
import asyncio
async def Myoperation():
print("First Coroutine")
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(Myoperation())
finally:
loop.close()
执行上面示例代码,得到以下结果 -
First Coroutine
@asyncio.coroutine装饰器
另一种实现协程的方法是使用带有@asyncio.coroutine修饰器的生成器。 以下是一个相同的Python脚本 -
import asyncio
@asyncio.coroutine
def Myoperation():
print("First Coroutine")
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(Myoperation())
finally:
loop.close()
执行上面示例代码,得到以下结果 -
First Coroutine
任务
Asyncio模块的这个子类负责以并行方式在事件循环中执行协程。 以下Python脚本是并行处理某些任务的示例。
import asyncio
import time
async def Task_ex(n):
time.sleep(1)
print("Processing {}".format(n))
async def Generator_task():
for i in range(10):
asyncio.ensure_future(Task_ex(i))
int("Tasks Completed")
asyncio.sleep(2)
loop = asyncio.get_event_loop()
loop.run_until_complete(Generator_task())
loop.close()
执行上面示例代码,得到以下结果 -
Tasks Completed
Processing 0
Processing 1
Processing 2
Processing 3
Processing 4
Processing 5
Processing 6
Processing 7
Processing 8
Processing 9
传输
Asyncio模块提供了用于实现各种类型通信的传输类。 这些类不是线程安全的,并且在建立通信通道后总是与协议实例配对。
以下是从BaseTransport继承的不同类型的传输 -
- ReadTransport - 这是只读传输的接口。
- WriteTransport - 这是用于只写传输的接口。
- DatagramTransport - 这是发送数据的接口。
- BaseSubprocessTransport - 与BaseTransport类相似。
以下是BaseTransport类的五种不同方法,它们随后在四种BaseTransport类有不同的变型 -
- close() - 关闭运输。
- is_closing() - 如果传输正在关闭或者已经是closed.transports,则此方法将返回true。
- get_extra_info(name,default = none) - 这会给一些关于传输的额外信息。
- get_protocol() - 此方法将返回当前协议。
协议
Asyncio模块提供了可以继承的基类,以实现您的网络协议。 这些类与运输一起使用; 该协议解析传入数据并要求写入传出数据,而传输负责实际的I/O和缓冲。 以下是三种Protocol类 -
- Protocol - 这是实现用于TCP和SSL传输的流协议的基类。
- DatagramProtocol - 这是实现用于UDP传输的数据报协议的基类。
- SubprocessProtocol - 这是实现通过一组单向管道与子进程通信的协议的基类。
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。
反应式编程 - Python并发编程教程™
反应式编程是处理数据流和变化传播的编程范例。 这意味着,当数据流由一个组件发出时,更改将通过反应式编程库传播到其他组件。变化的传播将持续到最终的接收者。 事件驱动和反应式编程的区别在于,事件驱动式编程围绕事件展开,反应式编程围绕着数据展开。
ReactiveX或RX用于反应式编程
ReactiveX或者Raective Extension是反应式编程最着名的实现。 ReactiveX的工作取决于以下两个类 -
可观察的类
这个类是数据流或事件的来源,它打包传入的数据,以便数据可以从一个线程传递到另一个线程。 在某些观察者订阅它之前,它不会提供数据。
观察员类
该类使用observable发出的数据流。 可以有多个可观察的观察者,每个观察者将接收每个发射的数据项。 观察者可以通过订阅可观察到的三种类型的事件 -
- on_next()事件 - 它意味着数据流中有一个元素。
- on_completed()事件 - 它意味着排放已经结束,没有更多数据项到来。
- on_error()事件 - 它也意味着排放的结束,但在可观察到抛出错误的情况下。
RxPY - 用于反应式编程的Python模块
RxPY是一个Python模块,可用于反应式编程。 我们需要确保模块已安装。 以下命令可用于安装RxPY模块 -
pip install RxPY
例子
以下是一个Python脚本,它使用RxPY模块及Observable类和Observe类来进行反应式编程。 基本上有两类 -
- get_strings() - 用于从观察者获取字符串。
- PrintObserver() - 用于从观察者打印字符串。 它使用观察员班的所有三个事件。 它也使用subscribe()类。
参考以下实现代码 -
from rx import Observable, Observer
def get_strings(observer):
observer.on_next("Ram")
observer.on_next("Mohan")
observer.on_next("Shyam")
observer.on_completed()
class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))
def on_completed(self):
print("Finished")
def on_error(self, error):
print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())
执行上面示例代码,得到以下结果 -
Received Ram
Received Mohan
Received Shyam
Finished
用于反应式编程的PyFunctional库
PyFunctionalis是另一个可用于响应式编程的Python库。 它使我们能够使用Python编程语言创建功能程序。 这很有用,因为它允许我们通过使用链式函数操作符来创建数据管道。
RxPY和PyFunctional之间的区别
这两个库都用于响应式编程,并以类似的方式处理流,但两者的主要区别取决于数据的处理。 RxPY处理系统中的数据和事件,而PyFunctional专注于使用函数式编程范例转换数据。
安装PyFunctional模块
需要在使用之前安装这个模块。可以通过以下pip命令来安装 -
pip install pyfunctional
例子
以下示例使用PyFunctional模块及其seq类,它们充当可以迭代和操作的流对象。 在这个程序中,它使用将每个值加倍的lamda函数映射序列,然后过滤x大于4的值,最后将序列减少为所有剩余值的和。
from functional import seq
result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)
print ("Result: {}".format(result))
执行上面示例代码,得到以下结果 -
Result: 6
易百教程移动端:请扫描本页面底部(右侧)二维码并关注微信公众号,回复:"教程" 选择相关教程阅读或直接访问:http://m.yiibai.com 。




