Multiprocessing.pool.map und Funktion mit zwei Argumenten

Ich benutze multiprocessing.Pool()

Hier ist was ich will Pool:

 def insert_and_process(file_to_process,db): db = DAL("path_to_mysql" + db) #Table Definations db.table.insert(**parse_file(file_to_process)) return True if __name__=="__main__": file_list=os.listdir(".") P = Pool(processes=4) P.map(insert_and_process,file_list,db) # here having problem. 

Ich möchte 2 Argumente übergeben Was ich tun möchte, besteht darin, nur 4 DB-Verbindungen zu initialisieren (hier wird versucht, eine Verbindung zu jedem Funktionsaufruf zu schaffen, so dass möglicherweise Millionen von ihnen und IO-Freezed zum Tode führen). Wenn ich 4 db Verbindungen erstellen kann und 1 für jede Prozesse wird es ok sein.

Gibt es eine Lösung für Pool? Oder soll ich es aufgeben?

BEARBEITEN:

Von der Hilfe von beiden von Ihnen habe ich dies, indem Sie dies tun:

 args=zip(f,cycle(dbs)) Out[-]: [('f1', 'db1'), ('f2', 'db2'), ('f3', 'db3'), ('f4', 'db4'), ('f5', 'db1'), ('f6', 'db2'), ('f7', 'db3'), ('f8', 'db4'), ('f9', 'db1'), ('f10', 'db2'), ('f11', 'db3'), ('f12', 'db4')] 

Also hier, wie es funktionieren wird, werde ich den DB-Verbindungscode auf die Hauptebene verschieben und dies tun:

 def process_and_insert(args): #Table Definations args[1].table.insert(**parse_file(args[0])) return True if __name__=="__main__": file_list=os.listdir(".") P = Pool(processes=4) dbs = [DAL("path_to_mysql/database") for i in range(0,3)] args=zip(file_list,cycle(dbs)) P.map(insert_and_process,args) # here having problem. 

Ja, ich werde es ausprobieren und dich wissen lassen.

5 Solutions collect form web for “Multiprocessing.pool.map und Funktion mit zwei Argumenten”

Die Pool sagt nicht von einer Möglichkeit, mehr als einen Parameter an die Zielfunktion zu übergeben – ich habe versucht, nur eine Sequenz zu übergeben, wird aber nicht entfaltet (ein Element der Sequenz für jeden Parameter).

Allerdings können Sie Ihre Zielfunktion schreiben, um zu erwarten, dass der erste (und nur) Parameter ein Tupel ist, in dem jedes Element einer der Parameter ist, die Sie erwarten:

 from itertools import repeat def insert_and_process((file_to_process,db)): db = DAL("path_to_mysql" + db) #Table Definations db.table.insert(**parse_file(file_to_process)) return True if __name__=="__main__": file_list=os.listdir(".") P = Pool(processes=4) P.map(insert_and_process,zip(file_list,repeat(db))) 

(Beachten Sie die zusätzlichen Klammern in der Definition von insert_and_process – python behandeln, dass als ein einzelner Parameter, der eine 2-Item-Sequenz sein sollte. Das erste Element der Sequenz wird der ersten Variablen zugeschrieben und die andere zum zweiten)

Ihr Pool erzeugt vier Prozesse, die jeweils von der eigenen Instanz des Python-Interpreters laufen. Sie können eine globale Variable verwenden, um Ihr Datenbankverbindungsobjekt zu halten, so dass genau eine Verbindung pro Prozess erstellt wird:

 global_db = None def insert_and_process(file_to_process, db): global global_db if global_db is None: # If this is the first time this function is called within this # process, create a new connection. Otherwise, the global variable # already holds a connection established by a former call. global_db = DAL("path_to_mysql" + db) global_db.table.insert(**parse_file(file_to_process)) return True 

Da Pool.map() und Freunde nur Ein-Argument-Worker-Funktionen unterstützen, müssen Sie einen Wrapper erstellen, der die Arbeit weiterleitet:

 def insert_and_process_helper(args): return insert_and_process(*args) if __name__ == "__main__": file_list=os.listdir(".") db = "wherever you get your db" # Create argument tuples for each function call: jobs = [(file, db) for file in file_list] P = Pool(processes=4) P.map(insert_and_process_helper, jobs) 

Keine Notwendigkeit, Reißverschluss zu benutzen. Wenn zum Beispiel haben Sie 2 Parameter, x und y, und jeder von ihnen können mehrere Werte, wie:

 X=range(1,6) Y=range(10) 

Die Funktion sollte nur einen Parameter erhalten und ihn auspacken:

 def func(params): (x,y)=params ... 

Und du nennst es so:

 params = [(x,y) for x in X for y in Y] pool.map(func, params) 

Verwenden

 params=[(x,y) for x in X for y in Y] 

Sie erstellen eine vollständige Kopie von x und y , und das kann langsamer als mit

 from itertools import repeat P.map(insert_and_process,zip(file_list,repeat(db))) 

Sie können verwenden

 from functools import partial 

Bibliothek zu diesem Zweck

mögen

 func = partial(rdc, lat, lng) r = pool.map(func, range(8)) 

und

 def rdc(lat,lng,x): pass 
  • Lösen von peinlich parallelen Problemen mit Python-Multiprocessing
  • Ein Schloss zwischen Gunicorn Arbeiter teilen
  • Persistente Memoisierung in Python
  • Diek-Berechnung erfolgt nicht parallel
  • Wie kann ich mehrere Selenium Firefox Browser gleichzeitig betreiben?
  • Celery Beat: Begrenzt auf einzelne Task-Instanz zu einem Zeitpunkt
  • Python multiprocessing.Pool: Wann zu verwenden gelten, apply_async oder map?
  • Berechnen Sie auf pandas dataframe gleichzeitig
  • Ist Parallelität im Tornado möglich?
  • Python sock.listen (...)
  • Stackless python und multicores?
  • Python ist die beste Programmiersprache der Welt.