Synchrones / asynchrones Verhalten von Python-Pipes

In meiner Anwendung verwende ich Pipes aus dem Multiprocessing-Modul, um zwischen Python-Prozessen zu kommunizieren. In letzter Zeit habe ich ein komisches Verhalten beobachtet, abhängig von der Größe der Daten, die ich durch sie schicke. Nach der Pythonendokumentation basieren diese Pipes auf den Verbindungen und sollten sich asynchron verhalten, doch manchmal stecken sie beim Senden. Wenn ich die Vollduplexe in jeder Verbindung freigehe, funktioniert alles gut, obwohl ich die Anschlüsse nicht für das Senden und Hören verwende. Kann jemand dieses Verhalten erklären?

  1. 100 schwimmt, Vollduplex deaktivieren
    Der Code funktioniert unter Verwendung der Asynchronität.
  2. 100 schwimmt, Vollduplex-Aktivierung
    Das Beispiel funktioniert wie erwartet.
  3. 10000 schwimmt, Vollduplex deaktivieren
    Die Ausführung ist für immer gesperrt, obwohl es gut war mit den kleineren Daten.
  4. 10000 schwimmt, Vollduplex aktivieren
    Wieder gut.

Code (es ist nicht meine Produktion Code, es nur illustriert, was ich meine):

from collections import deque from multiprocessing import Process, Pipe from numpy.random import randn from os import getpid PROC_NR = 4 DATA_POINTS = 100 # DATA_POINTS = 10000 def arg_passer(pipe_in, pipe_out, list_): my_pid = getpid() print "{}: Before send".format(my_pid) pipe_out.send(list_) print "{}: After send, before recv".format(my_pid) buf = pipe_in.recv() print "{}: After recv".format(my_pid) if __name__ == "__main__": pipes = [Pipe(False) for _ in range(PROC_NR)] # pipes = [Pipe(True) for _ in range(PROC_NR)] pipes_in = deque(p[0] for p in pipes) pipes_out = deque(p[1] for p in pipes) pipes_in.rotate(1) pipes_out.rotate(-1) data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)] processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo])) for foo in xrange(PROC_NR)] for proc in processes: proc.start() for proc in processes: proc.join() 

One Solution collect form web for “Synchrones / asynchrones Verhalten von Python-Pipes”

Zuerst ist es erwähnenswert, die Umsetzung der multiprocessing.Pipe Klasse …

 def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' if duplex: s1, s2 = socket.socketpair() s1.setblocking(True) s2.setblocking(True) c1 = _multiprocessing.Connection(os.dup(s1.fileno())) c2 = _multiprocessing.Connection(os.dup(s2.fileno())) s1.close() s2.close() else: fd1, fd2 = os.pipe() c1 = _multiprocessing.Connection(fd1, writable=False) c2 = _multiprocessing.Connection(fd2, readable=False) return c1, c2 

Der Unterschied besteht darin, dass Halbduplex 'Pipes' eine anonyme Pipe verwenden , aber Vollduplex 'Pipes' tatsächlich eine Unix Domain Sockel verwenden , da anonyme Pipes von der Natur unidirektional sind.

Ich bin mir nicht sicher, was du mit dem Begriff "asynchron" in diesem Zusammenhang meinst. Wenn du "non-blocking I / O" meinst, dann ist es erwähnenswert, dass beide Implementierungen standardmäßig das Blockieren von I / O verwenden.


Zweitens lohnt es sich, die eingelegte Größe der Daten zu bemerken, die du versuchst zu senden …

 >>> from numpy.random import randn >>> from cPickle import dumps >>> len(dumps(randn(100))) 2479 >>> len(dumps(randn(10000))) 237154 

Drittens, von der pipe(7) Manpage …

Rohrkapazität

Ein Rohr hat eine begrenzte Kapazität. Wenn die Pipe voll ist, wird ein Schreib (2) blockieren oder fehlschlagen, je nachdem, ob das Flag O_NONBLOCK gesetzt ist (siehe unten). Unterschiedliche Implementierungen haben unterschiedliche Grenzen für die Rohrkapazität. Anwendungen sollten sich nicht auf eine bestimmte Kapazität verlassen: Eine Anwendung sollte so gestaltet sein, dass ein Lesevorgang Daten verbraucht, sobald sie verfügbar sind, so dass ein Schreibvorgang nicht blockiert bleibt.

In Linux-Versionen vor 2.6.11 war die Kapazität einer Pipe die gleiche wie die System-Seitengröße (zB 4096 Bytes auf i386). Seit Linux 2.6.11 beträgt die Rohrkapazität 65536 Bytes.


So haben Sie in Wirklichkeit einen Deadlock angelegt, in dem alle pipe_out.send() auf dem pipe_out.send() Anruf gesperrt sind und keiner von ihnen kann Daten von den anderen Prozessen erhalten, da Sie alle 237.154 Bytes Daten senden Ein Treffer, der den 65.536 Byte Puffer gefüllt hat.

Sie könnten versucht werden, nur die Unix-Domain-Socket-Version zu verwenden, aber der einzige Grund, warum es derzeit funktioniert, ist, dass es eine größere Puffergröße als eine Pipe hat, und Sie werden feststellen, dass die Lösung auch fehlschlägt, wenn Sie die Anzahl der DATA_POINTS Auf 100.000.

Die "schnelle n" schmutzige Hack "-Lösung ist, die Daten in kleinere Stücke für das Senden zu brechen, aber es ist nicht gut, sich auf die Puffer zu verlassen, die eine bestimmte Größe sind.

Eine bessere Lösung wäre, non-blocking I / O auf dem pipe_out.send() Anruf zu verwenden, obwohl ich nicht vertraut genug mit dem multiprocessing Modul bin, um den besten Weg zu ermitteln, um es mit diesem Modul zu erreichen.

Der Pseudocode wäre …

 while 1: if we have sent all data and received all data: break send as much data as we can without blocking receive as much data as we can without blocking if we didn't send or receive anything in this iteration: sleep for a bit so we don't waste CPU time continue 

… oder Sie können das Python- select , um zu vermeiden, länger zu schlafen, als es notwendig ist, aber wieder, es mit multiprocessing.Pipe zu integrieren.Pipe könnte schwierig sein.

Es ist möglich, dass die multiprocessing.Queue Klasse das alles für dich macht, aber ich habe es noch nie benutzt, also musstest du einige Experimente machen.

Python ist die beste Programmiersprache der Welt.