python 线程通信
在现实生活中,如果一个团队正在开展一项共同任务,那么他们之间应该进行沟通以正确完成任务。同样的类比也适用于线程。在编程中,为了减少处理器的理想时间,我们创建多个线程并为每个线程分配不同的子任务。因此,必须有一个通信设施,他们应该相互交互,以同步的方式完成工作。
考虑以下与线程互通有关的要点 -
- 没有性能提升 - 如果我们无法在线程和进程之间实现正确的通信,那么并发性和并行性带来的性能提升是没有用的。
- 正确完成任务 - 如果线程之间没有适当的相互通信机制,则无法正确完成分配的任务。
- 比进程 间通信更有效 - 线程 间通信比进程 间通信更高效,更易于使用,因为进程内的所有线程共享相同的地址空间,并且不需要使用共享内存。
用于线程安全通信的python数据结构
多线程代码会出现将信息从一个线程传递到另一个线程的问题。标准通信原语无法解决此问题。因此,我们需要实现自己的复合对象,以便在线程之间共享对象,以使通信成为线程安全的。以下是一些数据结构,它们在对其进行一些更改后提供了线程安全的通信
集
为了以线程安全的方式使用set数据结构,我们需要扩展set类来实现我们自己的锁定机制。
例
这是一个扩展类的python示例 -
class extend_class(set): def __init__(self, *args, **kwargs): self._lock = lock() super(extend_class, self).__init__(*args, **kwargs) def add(self, elem): self._lock.acquire() try: super(extend_class, self).add(elem) finally: self._lock.release() def delete(self, elem): self._lock.acquire() try: super(extend_class, self).delete(elem) finally: self._lock.release()
在上面的示例中,定义了一个名为 extend_class 的类对象,该对象继承自python 集合类。在此类的构造函数中创建一个锁对象。现在,有两个函数 - add() 和 delete() 。这些函数是定义的并且是线程安全的。它们都依赖于 超 类功能和一个关键异常。
装饰
这是线程安全通信的另一个关键方法,就是使用装饰器。
例
考虑一个python示例,演示如何使用装饰器和mminus;
def lock_decorator(method): def new_deco_method(self, *args, **kwargs): with self._lock: return method(self, *args, **kwargs) return new_deco_method class decorator_class(set): def __init__(self, *args, **kwargs): self._lock = lock() super(decorator_class, self).__init__(*args, **kwargs) @lock_decorator def add(self, *args, **kwargs): return super(decorator_class, self).add(elem) @lock_decorator def delete(self, *args, **kwargs): return super(decorator_class, self).delete(elem)
在上面的示例中,定义了一个名为lock_decorator的装饰器方法,该方法继承自python方法类。然后在此类的构造函数中创建一个锁对象。现在,有两个函数 - add()和delete()。这些函数是定义的并且是线程安全的。它们都依赖于超类功能和一个关键异常。
清单
列表数据结构是线程安全的,快速的以及用于临时内存存储的简单结构。在cpython中,gil可以防止对它们的并发访问。我们开始知道列表是线程安全的但是它们中的数据呢。实际上,列表的数据不受保护。例如,如果另一个线程试图做同样的事情,则 l.append(x) 不保证返回预期的结果。这是因为虽然 append() 是一个原子操作并且是线程安全的,但另一个线程试图以并发方式修改列表的数据,因此我们可以看到竞争条件对输出的副作用。
要解决此类问题并安全地修改数据,我们必须实现适当的锁定机制,这进一步确保多个线程不会潜在地遇到竞争条件。为了实现正确的锁定机制,我们可以像前面的例子中那样扩展类。
列表上的其他一些原子操作如下
l.append(x) l1.extend(l2) x = l[i] x = l.pop() l1[i:j] = l2 l.sort() x = y x.field = y d[x] = y d1.update(d2) d.keys()
在这里
- l,l1,l2都是列表
- d,d1,d2是字典
- x,y是对象
- i,j是整数
队列
如果列表的数据不受保护,我们可能不得不面对后果。我们可能会获取或删除错误的数据项,竞争条件。这就是为什么建议使用队列数据结构。一个真实的队列示例可以是单车道单向道路,车辆首先进入,首先退出。可以在售票窗口和公共汽车站看到更多真实世界的例子。
队列是默认的,线程安全的数据结构,我们不必担心实现复杂的锁定机制。python为我们提供了 模块在我们的应用程序中使用不同类型的队列。
队列类型
在本节中,我们将了解不同类型的队列。python提供了三个队列选项,可以从 < queue> 模块中使用
- 正常队列(fifo,先进先出)
- lifo,后进先出
- 优先
我们将在后续章节中了解不同的队列。
正常队列(fifo,先进先出)
它是python提供的最常用的队列实现。在这种排队机制中,无论谁先到,都会先获得服务。fifo也称为普通队列。
fifo队列的python实现
在python中,fifo队列可以用单线程和多线程实现。
单线程fifo队列
为了使用单线程实现fifo队列, queue 类将实现一个基本的先进先出容器。元素将使用 put() 添加到序列的一个“结尾” ,并使用 get() 从另一端删除。
例
以下是用于实现单线程fifo队列的python程序
import queue q = queue.queue() for i in range(8): q.put("item-" + str(i)) while not q.empty(): print (q.get(), end = " ")
输出
item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7
输出显示上面的程序使用单个线程来说明元素是按照它们插入的顺序从队列中删除的。
具有多个线程的fifo队列
为了实现具有多个线程的fifo,我们需要定义myqueue()函数,该函数从队列模块扩展。在使用单线程实现fifo队列时,get()和put()方法的工作与上面讨论的相同。然后为了使它成为多线程,我们需要声明并实例化线程。这些线程将以fifo方式使用队列。
例
以下是用于实现具有多个线程的fifo队列的python程序
import threading import queue import random import time def myqueue(queue): while not queue.empty(): item = queue.get() if item is none: break print("{} removed {} from the queue".format(threading.current_thread(), item)) queue.task_done() time.sleep(2) q = queue.queue() for i in range(5): q.put(i) threads = [] for i in range(4): thread = threading.thread(target=myqueue, args=(q,)) thread.start() threads.append(thread) for thread in threads: thread.join()
输出
removed 0 from the queue removed 1 from the queue removed 2 from the queue removed 3 from the queue removed 4 from the queue
lifo,后进先出队列
该队列使用与fifo(先进先出)队列完全相反的类比。在这个排队机制中,最后一个,将首先获得服务。这类似于实现堆栈数据结构。lifo队列在实现深度优先搜索(如人工智能算法)时非常有用。
python实现lifo队列
在python中,lifo队列可以用单线程和多线程实现。
单线程的lifo队列
为了使用单线程实现lifo队列, queue 类将使用结构 queue .lifoqueue 实现基本的后进先出容器。现在,在调用 put() 时,元素将添加到容器的头部,并在使用 get() 时从头部移除。
例
以下是用于使用单线程实现lifo队列的python程序
import queue q = queue.lifoqueue() for i in range(8): q.put("item-" + str(i)) while not q.empty(): print (q.get(), end=" ") output: item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0
输出显示上述程序使用单个线程来说明元素是按照它们插入的相反顺序从队列中删除的。
具有多个线程的lifo队列
实现类似于我们已经完成了具有多个线程的fifo队列的实现。唯一的区别是我们需要使用 queue 类,它将使用结构 queue.lifoqueue 实现基本的 后进 先出容器。
例
以下是用于实现具有多个线程的lifo队列的python程序 -
import threading import queue import random import time def myqueue(queue): while not queue.empty(): item = queue.get() if item is none: break print("{} removed {} from the queue".format(threading.current_thread(), item)) queue.task_done() time.sleep(2) q = queue.lifoqueue() for i in range(5): q.put(i) threads = [] for i in range(4): thread = threading.thread(target=myqueue, args=(q,)) thread.start() threads.append(thread) for thread in threads: thread.join()
输出
removed 4 from the queue removed 3 from the queue removed 2 from the queue removed 1 from the queue removed 0 from the queue
优先队列
在fifo和lifo队列中,项目的顺序与插入顺序有关。但是,在许多情况下,优先级比插入顺序更重要。让我们考虑一个现实世界的例子。假设机场的安检正在检查不同类别的人。可以优先检查vvip的人员,航空公司的工作人员,海关官员,类别,而不是像对于平民那样在到达的基础上进行检查。
优先级队列需要考虑的另一个重要方面是如何开发任务调度程序。一种常见的设计是在队列中优先处理大多数代理任务。此数据结构可用于根据队列的优先级值从队列中获取项目。
python优先级队列的实现
在python中,优先级队列可以用单线程和多线程实现。
单线程的优先级队列
为了实现具有单线程的优先级队列, queue 类将使用结构 queue .priorityqueue 在优先级容器上实现任务。现在,在调用 put() 时,元素将添加一个值,其中最低值将具有最高优先级,因此首先使用 get() 检索。
例
考虑使用以下python程序实现具有单线程的优先级队列
import queue as q p_queue = q.priorityqueue() p_queue.put((2, 'urgent')) p_queue.put((1, 'most urgent')) p_queue.put((10, 'nothing important')) prio_queue.put((5, 'important')) while not p_queue.empty(): item = p_queue.get() print('%s - %s' % item)
输出
1 – most urgent 2 - urgent 5 - important 10 – nothing important
在上面的输出中,我们可以看到队列已经根据优先级存储了项目 - 较少的值具有高优先级。
具有多线程的优先级队列
该实现类似于具有多个线程的fifo和lifo队列的实现。唯一的区别是我们需要使用 queue 类来使用结构 queue.priorityqueue 来初始化优先级。另一个区别在于生成队列的方式。在下面给出的示例中,将使用两个相同的数据集生成它。
例
以下python程序有助于实现具有多个线程的优先级队列
import threading import queue import random import time def myqueue(queue): while not queue.empty(): item = queue.get() if item is none: break print("{} removed {} from the queue".format(threading.current_thread(), item)) queue.task_done() time.sleep(1) q = queue.priorityqueue() for i in range(5): q.put(i,1) for i in range(5): q.put(i,1) threads = [] for i in range(2): thread = threading.thread(target=myqueue, args=(q,)) thread.start() threads.append(thread) for thread in threads: thread.join()
输出
removed 0 from the queue removed 0 from the queue removed 1 from the queue removed 1 from the queue removed 2 from the queue removed 2 from the queue removed 3 from the queue removed 3 from the queue removed 4 from the queue removed 4 from the queue