Python-Multiprozess-Beizfehler

Es tut mir leid, dass ich den Fehler nicht mit einem einfacheren Beispiel reproduzieren kann, und mein Code ist zu kompliziert, um zu posten. Wenn ich das Programm in IPython Shell statt der regulären Python laufen, geht es gut aus.

Ich habe einige vorherige Hinweise zu diesem Problem gesehen. Sie wurden alle verursacht durch die Verwendung von Pool zu nennen Funktion definiert innerhalb einer Klassenfunktion. Aber das ist bei mir nicht der Fall.

Exception in thread Thread-3: Traceback (most recent call last): File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner self.run() File "/usr/lib64/python2.7/threading.py", line 505, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks put(task) PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

Ich würde jede Hilfe schätzen.

UPDATE: Die Funktion I pickle ist auf der obersten Ebene des Moduls definiert. Obwohl es eine Funktion anruft, die eine verschachtelte Funktion enthält. Dh f () ruft g () Anrufe h () auf, die eine verschachtelte Funktion i () haben, und ich rufe pool.apply_async (f) an. F (), g (), h () sind alle auf der obersten Ebene definiert. Ich habe ein einfacheres Beispiel mit diesem Muster versucht und es funktioniert aber.

7 Solutions collect form web for “Python-Multiprozess-Beizfehler”

Hier ist eine Liste von was gebeizt werden kann . Insbesondere sind Funktionen nur wählbar, wenn sie auf der obersten Ebene eines Moduls definiert sind.

Dieses Stück Code:

 import multiprocessing as mp class Foo(): @staticmethod def work(self): pass pool = mp.Pool() foo = Foo() pool.apply_async(foo.work) pool.close() pool.join() 

Ergibt einen Fehler, der fast identisch mit dem ist, den du gepostet hast:

 Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 505, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks put(task) PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

Das Problem ist, dass die pool Methoden alle eine queue.Queue , um Aufgaben an die Worker-Prozesse zu übergeben. Alles, was durch die queue.Queue muss abnehmbar sein, und foo.work ist nicht auswählbar, da es nicht auf der obersten Ebene des Moduls definiert ist.

Es kann behoben werden, indem man eine Funktion auf der obersten Ebene definiert, die foo.work() :

 def work(foo): foo.work() pool.apply_async(work,args=(foo,)) 

Beachten Sie, dass foo ist abnehmbar, da Foo ist auf der obersten Ebene definiert und foo.__dict__ ist picklable.

Ich würde pathos.multiprocesssing , anstelle von multiprocessing . pathos.multiprocessing ist eine Gabel von multiprocessing , die dill . dill kann fast alles in Python serialisieren, so dass Sie in der Lage sind, viel mehr um parallel zu senden. Die pathos Gabel hat auch die Möglichkeit, direkt mit mehreren Argumentfunktionen zu arbeiten, wie Sie für Klassenmethoden benötigen.

 >>> from pathos.multiprocessing import ProcessingPool as Pool >>> p = Pool(4) >>> class Test(object): ... def plus(self, x, y): ... return x+y ... >>> t = Test() >>> p.map(t.plus, x, y) [4, 6, 8, 10] >>> >>> class Foo(object): ... @staticmethod ... def work(self, x): ... return x+1 ... >>> f = Foo() >>> p.apipe(f.work, f, 100) <processing.pool.ApplyResult object at 0x10504f8d0> >>> res = _ >>> res.get() 101 

Holen Sie sich pathos (und wenn Sie möchten, dill ) hier: https://github.com/uqfoundation

Wie andere schon gesagt haben, kann multiprocessing nur Python-Objekte auf Worker-Prozesse übertragen, die gebeizt werden können. Wenn Sie Ihren Code nicht wie von unutbu beschrieben reorganisieren können, können Sie dill s erweiterte Beiz- / Entflechtungsfunktionen für die Übertragung von Daten (insbesondere Codedaten) verwenden, wie ich unten zeige.

Diese Lösung erfordert nur die Installation von dill und keine anderen Bibliotheken als pathos :

 import os from multiprocessing import Pool import dill def run_dill_encoded(payload): fun, args = dill.loads(payload) return fun(*args) def apply_async(pool, fun, args): payload = dill.dumps((fun, args)) return pool.apply_async(run_dill_encoded, (payload,)) if __name__ == "__main__": pool = Pool(processes=5) # asyn execution of lambda jobs = [] for i in range(10): job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1)) jobs.append(job) for job in jobs: print job.get() print # async execution of static method class O(object): @staticmethod def calc(): return os.getpid() jobs = [] for i in range(10): job = apply_async(pool, O.calc, ()) jobs.append(job) for job in jobs: print job.get() 

Ich habe festgestellt, dass ich auch genau diese Fehlerausgabe auf einem perfekt arbeitenden Code herunterladen kann, indem ich versuche, den Profiler darauf zu benutzen.

Beachten Sie, dass dies bei Windows war (wo das Gabel ein bisschen weniger elegant ist).

Ich lief:

 python -m profile -o output.pstats <script> 

Und festgestellt, dass das Entfernen der Profilierung entfernt den Fehler und Platzierung der Profilierung wiederhergestellt. Ich trieb mich auch schlecht, weil ich wusste, dass der Code benutzt wurde, um zu arbeiten. Ich war auf der Suche nach sehen, ob etwas hat aktualisiert Pool.py … dann hatte ein sinkendes Gefühl und beseitigt die Profilierung und das war es.

Posting hier für die Archive, falls irgendjemand sonst hinein läuft.

Diese Lösung erfordert nur die Installation von Dill und keine anderen Bibliotheken als Pathos

 def apply_packed_function_for_map((dumped_function, item, args, kwargs),): """ Unpack dumped function as target function and call it with arguments. :param (dumped_function, item, args, kwargs): a tuple of dumped function and its arguments :return: result of target function """ target_function = dill.loads(dumped_function) res = target_function(item, *args, **kwargs) return res def pack_function_for_map(target_function, items, *args, **kwargs): """ Pack function and arguments to object that can be sent from one multiprocessing.Process to another. The main problem is: «multiprocessing.Pool.map*» or «apply*» cannot use class methods or closures. It solves this problem with «dill». It works with target function as argument, dumps it («with dill») and returns dumped function with arguments of target function. For more performance we dump only target function itself and don't dump its arguments. How to use (pseudo-code): ~>>> import multiprocessing ~>>> images = [...] ~>>> pool = multiprocessing.Pool(100500) ~>>> features = pool.map( ~... *pack_function_for_map( ~... super(Extractor, self).extract_features, ~... images, ~... type='png' ~... **options, ~... ) ~... ) ~>>> :param target_function: function, that you want to execute like target_function(item, *args, **kwargs). :param items: list of items for map :param args: positional arguments for target_function(item, *args, **kwargs) :param kwargs: named arguments for target_function(item, *args, **kwargs) :return: tuple(function_wrapper, dumped_items) It returs a tuple with * function wrapper, that unpack and call target function; * list of packed target function and its' arguments. """ dumped_function = dill.dumps(target_function) dumped_items = [(dumped_function, item, args, kwargs) for item in items] return apply_packed_function_for_map, dumped_items 

Es funktioniert auch für numpy Arrays.

Sie übergeben eine numpy Reihe von Strings durch irgendeine Chance?

Ich habe diesen gleichen genauen Fehler gehabt, wenn ich ein Array passiere, das zufällig einen leeren String enthält. Ich denke, es kann aufgrund dieses Bugs sein: http://projects.scipy.org/numpy/ticket/1658

 Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

Dieser Fehler wird auch kommen, wenn Sie eine eingebaute Funktion innerhalb des Modellobjekts haben, das an den asynchronen Job übergeben wurde.

So stellen Sie sicher, dass die Modellobjekte, die übergeben wurden, nicht über eingebaute Funktionen überprüft haben. (In unserem Fall benutzten wir FieldTracker() -Funktion von django-model-utils im Modell, um ein bestimmtes Feld zu verfolgen). Hier ist der Link zur relevanten GitHub-Ausgabe.

  • Python "IOError: [Errno 22] Ungültiges Argument" bei der Verwendung von cPickle, um ein großes Array auf das Netzwerklaufwerk zu schreiben
  • Wie kann ich beschleunigen, um große Objekte zu entfernen, wenn ich viel RAM habe?
  • Was ist der schnellste Weg, um einen Pandas DataFrame zu pflücken?
  • So erstellen Sie eine persistente Klasse mit Pickle in Python
  • Mit pickle.dump - TypeError: muss str sein, nicht Bytes
  • ImportError: Kein Modul namens 'pandas.indexes'
  • Installieren von cPickle mit Python 3.5
  • Pickle EOFError: Ran aus der Eingabe, wenn recv aus einer Steckdose
  • Schnellste Weg zum Speichern und Laden eines großen Wörterbuchs in Python
  • Python: Pickling ein Dict mit einigen unpicklable Elemente
  • Wie kann ich ein großes Wörterbuch speichern?
  • Python ist die beste Programmiersprache der Welt.