Python-Multiprocessing – Pipe vs Queue

Was sind die grundlegenden Unterschiede zwischen Warteschlangen und Pipes in Pythons Multiprocessing-Paket ?

In welchen Szenarien sollte man sich übereinander wählen? Wann ist es vorteilhaft, Pipe() ? Wann ist es vorteilhaft, Queue() ?

One Solution collect form web for “Python-Multiprocessing – Pipe vs Queue”

  • Ein Pipe() kann nur zwei Endpunkte haben.

  • Eine Queue() kann mehrere Hersteller und Verbraucher haben.

Wann sie zu benutzen sind

Wenn Sie mehr als zwei Punkte benötigen, um zu kommunizieren, verwenden Sie eine Queue() .

Wenn Sie absolute Leistung benötigen, ist ein Pipe() viel schneller, da Queue() auf Pipe() .

Leistungs-Benchmarking

Nehmen wir an, Sie wollen zwei Prozesse hervorbringen und Nachrichten so schnell wie möglich zwischen ihnen senden. Dies sind die Timing-Ergebnisse eines Drag-Rennens zwischen ähnlichen Tests mit Pipe() und Queue() … Dies ist auf einem ThinkpadT61 läuft Ubuntu 11.10 und Python 2.7.2.

FYI, ich warf Ergebnisse für JoinableQueue() als Bonus zurück; JoinableQueue() Konten für Tasks, wenn queue.task_done() aufgerufen wird (es weiß nicht einmal über die spezifische Aufgabe, es zählt nur unfertige Aufgaben in der Warteschlange), so dass queue.join() weiß, dass die Arbeit beendet ist.

Der Code für jeden unten dieser Antwort …

 mpenning@mpenning-T61:~$ python multi_pipe.py Sending 10000 numbers to Pipe() took 0.0369849205017 seconds Sending 100000 numbers to Pipe() took 0.328398942947 seconds Sending 1000000 numbers to Pipe() took 3.17266988754 seconds mpenning@mpenning-T61:~$ python multi_queue.py Sending 10000 numbers to Queue() took 0.105256080627 seconds Sending 100000 numbers to Queue() took 0.980564117432 seconds Sending 1000000 numbers to Queue() took 10.1611330509 seconds mpnening@mpenning-T61:~$ python multi_joinablequeue.py Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds mpenning@mpenning-T61:~$ 

Zusammenfassend ist Pipe() etwa dreimal schneller als eine Queue() . Denken Sie nicht einmal an die JoinableQueue() sei denn, Sie müssen wirklich die Vorteile haben.

BONUSMATERIAL 2

Multiprocessing führt subtile Änderungen im Informationsfluss ein, die das Debugging schwer machen, es sei denn, Sie kennen einige Verknüpfungen. Zum Beispiel könnten Sie ein Skript haben, das bei der Indizierung durch ein Wörterbuch unter vielen Bedingungen gut funktioniert, aber selten mit bestimmten Eingängen fehlschlägt.

Normalerweise bekommen wir Hinweise auf das Scheitern, wenn der gesamte Python-Prozess abstürzt; Sie erhalten jedoch keine unerwünschten Crash-Tracebacks, die auf die Konsole gedruckt werden, wenn die Multiprocessing-Funktion abstürzt. Das Verfolgen von unbekannten Multiprocessing-Crashs ist hart ohne einen Hinweis darauf, was den Prozess abgestürzt hat.

Der einfachste Weg, den ich gefunden habe, um den Multiprozessions-Crash-Information zu verfolgen, ist, die gesamte Multiprocessing-Funktion in einem try / except zu verpacken und mit traceback.print_exc() :

 import traceback def reader(args): try: # Insert stuff to be multiprocessed here return args[0]['that'] except: print "FATAL: reader({0}) exited while multiprocessing".format(args) traceback.print_exc() 

Nun, wenn du einen Crash findest, sieht man so etwas wie:

 FATAL: reader([{'crash', 'this'}]) exited while multiprocessing Traceback (most recent call last): File "foo.py", line 19, in __init__ self.run(task_q, result_q) File "foo.py", line 46, in run raise ValueError ValueError 

Quellcode:


 """ multi_pipe.py """ from multiprocessing import Process, Pipe import time def reader(pipe): output_p, input_p = pipe input_p.close() # We are only reading while True: try: msg = output_p.recv() # Read from the output pipe and do nothing except EOFError: break def writer(count, input_p): for ii in xrange(0, count): input_p.send(ii) # Write 'count' numbers into the input pipe if __name__=='__main__': for count in [10**4, 10**5, 10**6]: output_p, input_p = Pipe() reader_p = Process(target=reader, args=((output_p, input_p),)) reader_p.start() # Launch the reader process output_p.close() # We no longer need this part of the Pipe() _start = time.time() writer(count, input_p) # Send a lot of stuff to reader() input_p.close() # Ask the reader to stop when it reads EOF reader_p.join() print "Sending %s numbers to Pipe() took %s seconds" % (count, (time.time() - _start)) 

 """ multi_queue.py """ from multiprocessing import Process, Queue import time def reader(queue): while True: msg = queue.get() # Read from the queue and do nothing if (msg == 'DONE'): break def writer(count, queue): for ii in xrange(0, count): queue.put(ii) # Write 'count' numbers into the queue queue.put('DONE') if __name__=='__main__': for count in [10**4, 10**5, 10**6]: queue = Queue() # reader() reads from queue # writer() writes to queue reader_p = Process(target=reader, args=((queue),)) reader_p.daemon = True reader_p.start() # Launch the reader process _start = time.time() writer(count, queue) # Send a lot of stuff to reader() reader_p.join() # Wait for the reader to finish print "Sending %s numbers to Queue() took %s seconds" % (count, (time.time() - _start)) 

 """ multi_joinablequeue.py """ from multiprocessing import Process, JoinableQueue import time def reader(queue): while True: msg = queue.get() # Read from the queue and do nothing queue.task_done() def writer(count, queue): for ii in xrange(0, count): queue.put(ii) # Write 'count' numbers into the queue if __name__=='__main__': for count in [10**4, 10**5, 10**6]: queue = JoinableQueue() # reader() reads from queue # writer() writes to queue reader_p = Process(target=reader, args=((queue),)) reader_p.daemon = True reader_p.start() # Launch the reader process _start = time.time() writer(count, queue) # Send a lot of stuff to reader() queue.join() # Wait for the reader to finish print "Sending %s numbers to JoinableQueue() took %s seconds" % (count, (time.time() - _start)) 
  • Übergeben einer Warteschlange an ThreadedHTTPServer
  • Ist meine HelloWorld-Warteschlange?
  • Schreiben in eine Datei mit Multiprocessing
  • Äquivalent von asyncio.Queues mit Arbeiter "Threads"
  • Wie man einen Thread mehr als einmal in Python läuft
  • Queue.Queue vs. collections.deque
  • Python mit mehreren Prozessoren
  • Holen Sie sich alle Artikel aus Thread Queue
  • Python, wie man Threads, die in der Warteschlange mit Signalen blockiert sind, tötet?
  • Python-Warteschlangen-Speicher leckt beim Anrufen des Threads
  • Python Queue get () / task_done () Ausgabe
  • Python ist die beste Programmiersprache der Welt.