Wie kann man Worker-Threads beenden, nachdem die Arbeit in einem Multithread-Produzenten-Consumer-Muster abgeschlossen ist?

Ich versuche, ein Multithreaded Producer-Consumer-Muster mit Queue.Queue in Python 2.7 zu implementieren. Ich versuche herauszufinden, wie man die Verbraucher, dh die Arbeiter-Threads, stoppen kann, sobald alle benötigten Arbeiten durchgeführt werden.

Siehe den zweiten Kommentar von Martin James zu dieser Antwort: https://stackoverflow.com/a/19369877/1175080

Schick eine 'Ich bin fertig' Aufgabe, anweisen die Pool-Threads zu beenden. Jeder Thread, der eine solche Aufgabe bekommt, verlangt sie und begeht dann Selbstmord.

Aber das geht nicht für mich. Siehe den folgenden Code zum Beispiel.

import Queue import threading import time def worker(n, q): # n - Worker ID # q - Queue from which to receive data while True: data = q.get() print 'worker', n, 'got', data time.sleep(1) # Simulate noticeable data processing time q.task_done() if data == -1: # -1 is used to indicate that the worker should stop # Requeue the exit indicator. q.put(-1) # Commit suicide. print 'worker', n, 'is exiting' break def master(): # master() sends data to worker() via q. q = Queue.Queue() # Create 3 workers. for i in range(3): t = threading.Thread(target=worker, args=(i, q)) t.start() # Send 10 items to work on. for i in range(10): q.put(i) time.sleep(0.5) # Send an exit indicator for all threads to consume. q.put(-1) print 'waiting for workers to finish ...' q.join() print 'done' master() 

Dieses Programm hängt, nachdem alle drei Arbeiter den Ausstiegsindikator gelesen haben, dh -1 aus der Warteschlange, weil jeder Arbeiter -1 vor dem Verlassen benötigt, also wird die Warteschlange niemals leer und q.join() kehrt niemals zurück.

Ich kam mit der folgenden, aber hässlichen Lösung, wo ich eine -1 Ausfahrt Indikator für jeden Arbeiter über die Warteschlange, so dass jeder Arbeiter kann es sehen und Selbstmord begehen. Aber die Tatsache, dass ich einen Ausstiegsindikator für jeden Arbeiter schicken muss, fühlt sich ein wenig hässlich an.

 import Queue import threading import time def worker(n, q): # n - Worker ID # q - Queue from which to receive data while True: data = q.get() print 'worker', n, 'got', data time.sleep(1) # Simulate noticeable data processing time q.task_done() if data == -1: # -1 is used to indicate that the worker should stop print 'worker', n, 'is exiting' break def master(): # master() sends data to worker() via q. q = Queue.Queue() # Create 3 workers. for i in range(3): t = threading.Thread(target=worker, args=(i, q)) t.start() # Send 10 items to work on. for i in range(10): q.put(i) time.sleep(0.5) # Send one stop indicator for each worker. for i in range(3): q.put(-1) print 'waiting for workers to finish ...' q.join() print 'done' master() 

Ich habe zwei Fragen.

  1. Kann die Methode, ein einziges Exit-Indikator für alle Threads zu senden (wie im zweiten Kommentar von https://stackoverflow.com/a/19369877/1175080 von Martin James erklärt) sogar funktionieren?
  2. Wenn die Antwort auf die vorherige Frage "Nein" ist, gibt es einen Weg, um das Problem in einer Weise zu lösen, dass ich nicht einen separaten Exit-Indikator für jeden Worker-Thread senden muss?

3 Solutions collect form web for “Wie kann man Worker-Threads beenden, nachdem die Arbeit in einem Multithread-Produzenten-Consumer-Muster abgeschlossen ist?”

Kann die Methode, ein einziges Exit-Indikator für alle Threads zu senden (wie im zweiten Kommentar von https://stackoverflow.com/a/19369877/1175080 von Martin James erklärt) sogar funktionieren?

Wie Sie bemerkt haben, kann es nicht funktionieren, das Verbreiten der Nachricht wird den letzten Thread machen, um die Warteschlange mit einem weiteren Element zu aktualisieren, und da Sie auf eine Warteschlange warten, die niemals leer sein wird, nicht mit dem Code, den Sie haben.

Wenn die Antwort auf die vorherige Frage "Nein" ist, gibt es einen Weg, um das Problem in einer Weise zu lösen, dass ich nicht einen separaten Exit-Indikator für jeden Worker-Thread senden muss?

Sie können join den Threads anstelle der Warteschlange teilnehmen:

 def worker(n, q): # n - Worker ID # q - Queue from which to receive data while True: data = q.get() print 'worker', n, 'got', data time.sleep(1) # Simulate noticeable data processing time q.task_done() if data == -1: # -1 is used to indicate that the worker should stop # Requeue the exit indicator. q.put(-1) # Commit suicide. print 'worker', n, 'is exiting' break def master(): # master() sends data to worker() via q. q = Queue.Queue() # Create 3 workers. threads = [threading.Thread(target=worker, args=(i, q)) for i in range(3)] for t in threads: threads.start() # Send 10 items to work on. for i in range(10): q.put(i) time.sleep(0.5) # Send an exit indicator for all threads to consume. q.put(-1) print 'waiting for workers to finish ...' for t in threads: t.join() print 'done' master() 

Wie die Warteschlange Dokumentation erklären, get Methode wird eine Ausführung, sobald seine leere so, wenn Sie wissen, bereits die Daten zu verarbeiten können Sie die Warteschlange füllen und dann Spam die Fäden:

 import Queue import threading import time def worker(n, q): # n - Worker ID # q - Queue from which to receive data while True: try: data = q.get(block=False, timeout=1) print 'worker', n, 'got', data time.sleep(1) # Simulate noticeable data processing time q.task_done() except Queue.Empty: break def master(): # master() sends data to worker() via q. q = Queue.Queue() # Send 10 items to work on. for i in range(10): q.put(i) # Create 3 workers. for i in range(3): t = threading.Thread(target=worker, args=(i, q)) t.start() print 'waiting for workers to finish ...' q.join() print 'done' master() 

Hier haben Sie ein Live-Beispiel

Senden Sie keinen Sonderfall für eine Aufgabe. Verwenden Sie eine Veranstaltung statt mit nicht-blocking bekommt in Ihre Arbeiter.

 stopping = threading.Event() def worker(n, q, timeout=1): # run until the master thread indicates we're done while not stopping.is_set(): try: # don't block indefinitely so we can return to the top # of the loop and check the stopping event data = q.get(False, timeout) # raised by q.get if non-blocking and we reach the timeout # on an empty queue except queue.Empty: continue q.task_done() def master(): ... print 'waiting for workers to finish' q.join() stopping.set() print 'done' 

Neben @DanielSanchez ausgezeichnete Antwort, könnte ich vorschlagen, tatsächlich auf einen ähnlichen Mechanismus wie ein Java CountDownLatch verlassen . Das Wesentliche ist, dass du einen latch schaffst, der erst nach einem bestimmten Zähler auftaucht, wenn der Latch geöffnet ist, wird der Thread, der darauf wartet, seine Ausführung weiterleiten (ich habe ein einfaches Beispiel gemacht Hier für eine Klasse wie Beispiel für eine solche Latch):

 import Queue import threading import time WORKER_COUNT = 3 latch = threading.Condition() count = 3 def wait(): latch.acquire() while count > 0: latch.wait() latch.release() def count_down(): global count latch.acquire() count -= 1 if count <= 0: latch.notify_all() latch.release() def worker(n, q): # n - Worker ID # q - Queue from which to receive data while True: data = q.get() print 'worker', n, 'got', data time.sleep(1) # Simulate noticeable data processing time q.task_done() if data == -1: # -1 is used to indicate that the worker should stop # Requeue the exit indicator. q.put(-1) # Commit suicide. count_down() print 'worker', n, 'is exiting' break def master(): # master() sends data to worker() via q. q = Queue.Queue() # Create 3 workers. for i in range(WORKER_COUNT): t = threading.Thread(target=worker, args=(i, q)) t.start() # Send 10 items to work on. for i in range(10): q.put(i) time.sleep(0.5) # Send an exit indicator for all threads to consume. q.put(-1) wait() print 'done' master() 
  • Beschleunigungen in Schleifenstrukturen
  • Wie benutzt man eine Callback-Funktion in Python?
  • Wie finde ich die nächste Primzahl in einem Array, zu einer anderen Nummer in diesem Array?
  • Python: subprocess32 process.stdout.readline () Wartezeit
  • Python Liste der Tupel, müssen auspacken und aufräumen
  • Finde heraus, die Worte erschienen in einem Absatz
  • Pygame.mixer Modul fehlt?
  • Wie kann ich eine GUI mit der richtigen Größe mit wxPython öffnen?
  • Wiederverwendung von Django-Apps im eigenen Projekt, die eine niedrigere Django-Version erfordern
  • So erstellen Sie eine TRIE in Python
  • Bestimmen, wie oft ein Teilstring in einer Zeichenfolge in Python auftritt
  • Python ist die beste Programmiersprache der Welt.