from multiprocessing import Process, Value, get_context
from multiprocessing.queues import Queue as _Queue
from multiprocessing.context import assert_spawning
class SharedCounter(object):
""" 一个同步的共享计数器 """
def __init__(self, n = 0):
self.count = Value('i', n)
def increment(self, n = 1):
""" 将计数器 +1 """
with self.count.get_lock():
self.count.value += n
@property
def value(self):
""" 返回计数器的值 """
return self.count.value
class Queue(_Queue):
""" multiprocessing.Queue 的可移植实现。
通过使用同步共享计数器(初始化为零)并在每次调用 put() 和 get() 方法时分别增加/减少其值, 从而实现 qsize() 和 empty() 。
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, ctx=get_context())
self.size = SharedCounter(0)
def __getstate__(self):
# 序列化时调用
assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid,
self.size) # <--- 加上你自定义的属性
def __setstate__(self, state):
# 反序列化时调用, state 是 __getstate__ 的返回对象
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid,
self.size) = state # <--- 加上你自定义的属性
self._after_fork()
def put(self, *args, **kwargs):
self.size.increment(1)
super().put(*args, **kwargs)
def get(self, *args, **kwargs):
self.size.increment(-1)
return super().get(*args, **kwargs)
def qsize(self):
return self.size.value
def empty(self):
return not self.qsize()