报错信息

系统环境: Python3.8.9 macOS-12.2

1
2
3
4
5
6
7
8
Traceback (most recent call last):
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "test.py", line 86, in run
    print("The size of queue is empty? %s" % self.queue.qsize())
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 120, in qsize
    return self._maxsize - self._sem._semlock._get_value()
NotImplementedError

解决办法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from multiprocessing import Process, Value, get_context
from multiprocessing.queues import Queue as _Queue
from multiprocessing.context import assert_spawning

class SharedCounter(object):
    """ 一个同步的共享计数器 """

    def __init__(self, n = 0):
        self.count = Value('i', n)

    def increment(self, n = 1):
        """ 将计数器 +1 """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ 返回计数器的值 """
        return self.count.value


class Queue(_Queue):
    """ multiprocessing.Queue 的可移植实现。

    通过使用同步共享计数器(初始化为零)并在每次调用 put() 和 get() 方法时分别增加/减少其值, 从而实现 qsize() 和 empty() 。
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs, ctx=get_context())
        self.size = SharedCounter(0)

    def __getstate__(self):
        # 序列化时调用
        assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid,
                self.size)  # <--- 加上你自定义的属性

    def __setstate__(self, state):
        # 反序列化时调用, state 是 __getstate__ 的返回对象
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
        self._rlock, self._wlock, self._sem, self._opid,
        self.size) = state  # <--- 加上你自定义的属性
        self._after_fork()

    def put(self, *args, **kwargs):
        self.size.increment(1)
        super().put(*args, **kwargs)

    def get(self, *args, **kwargs):
        self.size.increment(-1)
        return super().get(*args, **kwargs)

    def qsize(self):
        return self.size.value

    def empty(self):
        return not self.qsize()

测试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import time
import random

class Producer(Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        for i in range(5):
            item = random.randint(0, 256)
            self.queue.put(item)
            print("Producer : item %d appended to queue %s" % (item, self.name))
            time.sleep(1)
            print("The size of queue is empty? %s" % self.queue.qsize())

class Consumer(Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            if self.queue.empty():
                print("the queue is empty")
                break
            else:
                time.sleep(1)
                item = self.queue.get()
                print('Consumer : item %d popped from by %s \n' % (item, self.name))
                time.sleep(1)

if __name__ == '__main__':
    q = Queue()
    producer = Producer(q)
    consumer = Consumer(q)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

🤓嘻嘻

评论 4

😆😌🤪