Blog·Tanky WooABOUTTAGSRSS

最近给Simiki从单进程改为多进程时, 遇到这个报错.

给一个正常的简化的版本:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing

def func(x):
    print(x * x)

class Klass(object):
    def __init__(self):
        print "Constructor ... %s" % multiprocessing.current_process().name

    def __del__(self):
        print "... Destructor %s" % multiprocessing.current_process().name

    def func(self, x):
        print(x * x)

    def run(self):
        pool = multiprocessing.Pool(processes=3)
        for num in range(8):
            pool.apply_async(func, args=(num,))
        pool.close()
        pool.join()

if __name__ == '__main__':
    _kls = Klass()
    _kls.run()

这里加上构造函数和析构函数, 是因为后面在这里会有问题.

另外, 这里有个比较奇怪的现象, 原先实例类时, 变量名是kls, 但是析构出错, 改为als没有问题, 试了很多变量名, 有的有问题, 有的没问题, 还不清楚原因

Constructor ... MainProcess
Exception AttributeError: "'NoneType' object has no attribute 'current_process'" in <bound method Klass.__del__ of <__main__.Klass object at 0x103a1ce10>> ignored

原因已经找到, 参考__del__文档:

Starting with version 1.5, Python guarantees that globals whose name begins with a single underscore are deleted from their module before other globals are deleted; if no other references to such globals exist, this may help in assuring that imported modules are still available at the time when the __del__() method is called.

对于普通变量, Python无法保证__del__和对象销毁的顺序, 如果变量名以下划线开头, 可以保证此变量在其它导入模块之前调用.

具体可以看看我在StackOverflow上的提问

正常执行结果:

Constructor ... MainProcess
0
1
4
9
16
25
36
49
... Destructor MainProcess

如果把func移到类中, 改为method, 则问题来了:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing

class Klass(object):
    def __init__(self):
        print "Constructor ... %s" % multiprocessing.current_process().name

    def __del__(self):
        print "... Destructor %s" % multiprocessing.current_process().name

    def func(self, x):
        print(x * x)

    def run(self):
        pool = multiprocessing.Pool(processes=3)
        for num in range(8):
            pool.apply_async(self.func, args=(num,))
        pool.close()
        pool.join()

if __name__ == '__main__':
    _kls = Klass()
    _kls.run()

执行报错:

Constructor ... MainProcess
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 808, in __bootstrap_inner
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 761, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

搜到这个问题Can’t pickle <type ‘instancemethod’> when using python’s multiprocessing Pool.map()

原因是multiprocessing会对调用的函数做序列化, 然后method不可被序列化. 具体参考python2 - pickle的文档, 里面列出了可被序列化的对象:

  • None, True, and False
  • integers, long integers, floating point numbers, complex numbers
  • normal and Unicode strings
  • tuples, lists, sets, and dictionaries containing only picklable objects
  • functions defined at the top level of a module
  • built-in functions defined at the top level of a module
  • classes that are defined at the top level of a module
  • instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see section The pickle protocol for details).

关于解决思路, 众说纷坛, 有利有弊.

最简单的思路? 感谢这个兄弟的思路, 加一个全局的代理函数, 他也注意到了后面的几个方法会出现析构多次, 但是这个其实也会这样:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing

def proxy(cls_instance, i):
    return cls_instance.func(i)

class Klass(object):
    def __init__(self):
        print "Constructor ... %s" % multiprocessing.current_process().name

    def __del__(self):
        print "... Destructor %s" % multiprocessing.current_process().name

    def func(self, x):
        print(x * x)

    def run(self):
        pool = multiprocessing.Pool(processes=3)
        for num in range(8):
            #pool.apply_async(self.func, args=(num,))
            pool.apply_async(proxy, args=(self, num,))
        pool.close()
        pool.join()

if __name__ == '__main__':
    _kls = Klass()
    _kls.run()

执行代码:

Constructor ... MainProcess
0
1
4
... Destructor PoolWorker-1
9
... Destructor PoolWorker-2
16
... Destructor PoolWorker-1
36
... Destructor PoolWorker-3
25
... Destructor PoolWorker-2
49
... Destructor PoolWorker-1
... Destructor PoolWorker-3
... Destructor PoolWorker-2
... Destructor MainProcess

注意到这里线程池中的worker析构了8次, 每个worker进程析构了2-3次, 主进程析构了1次.

还有一个介绍的比较多的就是用copy_reg将MethodType注册为可序列化来解决:

参考这个回答的模板:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import copy_reg
import types

def _pickle_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)

copy_reg.pickle(types.MethodType, _pickle_method)

或者这个回答的模板:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

pickle(MethodType, _pickle_method, _unpickle_method)

这里用到了mro()

  • im_self is the class instance object;
  • im_func is the function object;
  • im_class is the class of im_self for bound methods or the class that asked for the method for unbound methods;

具体见文档

#!/usr/bin/env python
# -*- coding: utf-8 -*-

class Klass(object):
    def func(self):
        pass

    def main(self):
        print 'self.func.im_self: %s' % self.func.im_self
        print 'self.func.im_class: %s' % self.func.im_class
        print 'self.func.im_func: %s' % self.func.im_func

_kls = Klass()
_kls.main()

print '---'

print '_kls.func.im_self: %s' % _kls.func.im_self
print '_kls.func.im_class: %s' % _kls.func.im_class
print '_kls.func.im_func: %s' % _kls.func.im_func

print '---'

print 'Klass.func.im_self: %s' % Klass.func.im_self
print 'Klass.func.im_class: %s' % Klass.func.im_class
print 'Klass.func.im_func: %s' % Klass.func.im_func

执行结果:

self.func.im_self: <__main__.Klass object at 0x101a0a390>
self.func.im_class: <class '__main__.Klass'>
self.func.im_func: <function func at 0x1019f1f50>
---
_kls.func.im_self: <__main__.Klass object at 0x101a0a390>
_kls.func.im_class: <class '__main__.Klass'>
_kls.func.im_func: <function func at 0x1019f1f50>
---
Klass.func.im_self: None
Klass.func.im_class: <class '__main__.Klass'>
Klass.func.im_func: <function func at 0x1019f1f50>

接着回来, 上面的方法, 即:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import copy_reg
import types

def _pickle_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)

copy_reg.pickle(types.MethodType, _pickle_method)

class Klass(object):
    def __init__(self):
        print "Constructor ... %s" % multiprocessing.current_process().name

    def __del__(self):
        print "... Destructor %s" % multiprocessing.current_process().name

    def func(self, x):
        print(x * x)

    def run(self):
        pool = multiprocessing.Pool(processes=3)
        for num in range(8):
            pool.apply_async(self.func, args=(num,))
        pool.close()
        pool.join()

if __name__ == '__main__':
    _kls = Klass()
    _kls.run()

执行代码:

Constructor ... MainProcess
0
1
4
... Destructor PoolWorker-1
9
... Destructor PoolWorker-2
16
... Destructor PoolWorker-1
36
... Destructor PoolWorker-3
25
... Destructor PoolWorker-1
49
... Destructor PoolWorker-2
... Destructor PoolWorker-1
... Destructor PoolWorker-3
... Destructor MainProcess

同上.

另一个方法是, 参考这个回答, 定义__call__. 这样就可以将类本身, 如这里的self传给apply_async, 因为顶层定义的类是可被序列化的:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing

class Klass(object):
    def __init__(self):
        print "Constructor ... %s" % multiprocessing.current_process().name

    def __del__(self):
        print "... Destructor %s" % multiprocessing.current_process().name

    def __call__(self, x):
        return self.func(x)

    def func(self, x):
        print(x * x)

    def run(self):
        pool = multiprocessing.Pool(processes=3)
        for num in range(8):
            #pool.apply_async(self.func, args=(num,))
            pool.apply_async(self, args=(num,))
        pool.close()
        pool.join()

if __name__ == '__main__':
    _kls = Klass()
    _kls.run()

执行代码:

Constructor ... MainProcess
0
1
4
... Destructor PoolWorker-1
9
... Destructor PoolWorker-2
16
... Destructor PoolWorker-3
25
... Destructor PoolWorker-1
36
... Destructor PoolWorker-2
49
... Destructor PoolWorker-3
... Destructor PoolWorker-1
... Destructor PoolWorker-2
... Destructor MainProcess

这个回答的方法其实和之前设置的代理函数一样, 都是把method提升到global:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing

def call_it(instance, name, args=(), kwargs=None):
    """indirect caller for instance methods and multiprocessing"""
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self):
        print "Constructor ... %s" % multiprocessing.current_process().name

    def __del__(self):
        print "... Destructor %s" % multiprocessing.current_process().name

    def func(self, x):
        print(x * x)

    def run(self):
        pool = multiprocessing.Pool(processes=3)
        for num in range(8):
            #pool.apply_async(self.func, args=(num,))
            pool.apply_async(call_it, args=(self, 'func', (num,)))
        pool.close()
        pool.join()

if __name__ == '__main__':
    _kls = Klass()
    _kls.run()

执行代码:

Constructor ... MainProcess
0
1
4
... Destructor PoolWorker-1
9
... Destructor PoolWorker-1
36
... Destructor PoolWorker-3
25
... Destructor PoolWorker-2
16
... Destructor PoolWorker-1
49
... Destructor PoolWorker-3
... Destructor PoolWorker-1
... Destructor PoolWorker-2
... Destructor MainProcess

关于析构8次的问题,看了copy_reg, pickle, multiprocessing的代码, 还是没什么头绪. 坑爹!

这块考虑, 最还还是把多进程调用的函数抽离出来放在全局是最安全的; 毕竟其它的方案又丑陋, 并且最主要的是各种黑魔法, 完全不清楚内部细节.


另外, Python3.3+ 解决了这个问题, 所以在Python3下是可运行的, 这里有个讨论