Wie benutzt man Java / Scala-Funktion aus einer Aktion oder einer Transformation?

Hintergrund

Meine ursprüngliche Frage hier war Warum mit DecisionTreeModel.predict innerhalb Karte Funktion wirft eine Ausnahme? Und ist verwandt mit Wie man Tupel von (original lable, vorhergesagten Label) auf Spark mit MLlib zu generieren?

Wenn wir Scala API verwenden, ist eine empfohlene Vorhersage für RDD[LabeledPoint] mit DecisionTreeModel ist einfach über RDD zuzuordnen:

 val labelAndPreds = testData.map { point => val prediction = model.predict(point.features) (point.label, prediction) } 

Leider ist ein ähnlicher Ansatz in PySpark nicht so gut:

 labelsAndPredictions = testData.map( lambda lp: (lp.label, model.predict(lp.features)) labelsAndPredictions.first() 

Ausnahme: Es scheint, dass Sie versuchen, auf SparkContext von einer Broadcast-Variable, Aktion oder Transforamtion zu verweisen. SparkContext kann nur auf dem Treiber verwendet werden, nicht in Code, dass es auf Arbeiter läuft. Weitere Informationen finden Sie unter SPARK-5063 .

Statt dieser offiziellen Dokumentation empfiehlt man so etwas:

 predictions = model.predict(testData.map(lambda x: x.features)) labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions) 

Also, was ist hier los? Es gibt keine Broadcast-Variable hier und Scala API definiert predict wie folgt:

 /** * Predict values for a single data point using the model trained. * * @param features array representing a single data point * @return Double prediction from the trained model */ def predict(features: Vector): Double = { topNode.predict(features) } /** * Predict values for the given data set using the model trained. * * @param features RDD representing data points to be predicted * @return RDD of predictions for each of the given data points */ def predict(features: RDD[Vector]): RDD[Double] = { features.map(x => predict(x)) } 

So zumindest auf den ersten Blick ruft von Handlung oder Transformation ist kein Problem, da die Vorhersage eine lokale Operation zu sein scheint.

Erläuterung

Nach einigen graben habe ich herausgefunden, dass die Quelle des Problems eine JavaModelWrapper.call Methode ist, die von DecisionTreeModel.predict aufgerufen wird. Es SparkContext auf SparkContext der die Java-Funktion aufrufen muss:

 callJavaFunc(self._sc, getattr(self._java_model, name), *a) 

Frage

Im Falle von DecisionTreeModel.predict gibt es eine empfohlene Problemumgehung und alle erforderlichen Code ist bereits ein Teil der Scala API aber gibt es eine elegante Art und Weise zu behandeln Problem wie dies im Allgemeinen?

Nur Lösungen, die ich jetzt denken kann, sind ziemlich schwergewichtig:

  • Drückte alles auf JVM entweder durch die Erweiterung von Spark-Klassen durch Implizite Umwandlungen oder Hinzufügen einer Art von Wrapper
  • Mit dem Py4j Gateway direkt

One Solution collect form web for “Wie benutzt man Java / Scala-Funktion aus einer Aktion oder einer Transformation?”

Die Kommunikation mit dem Standard-Py4J-Gateway ist einfach nicht möglich. Um zu verstehen, warum wir das folgende Diagramm aus dem PySpark Internals Dokument [1] anschauen müssen:

Bildbeschreibung hier eingeben

Da das Py4J-Gateway auf dem Treiber läuft, ist es für Python-Interpreter nicht zugänglich, die mit JVM-Mitarbeitern über Sockets kommunizieren (siehe zB PythonRDD / rdd.py ).

Theoretisch könnte es möglich sein, ein eigenes Py4J-Gateway für jeden Arbeiter zu erstellen, aber in der Praxis ist es unwahrscheinlich, dass es nützlich ist. Ignorieren von Problemen wie Zuverlässigkeit Py4J ist einfach nicht für datenintensive Aufgaben ausgelegt.

Gibt es irgendwelche Workarounds?

  1. Verwenden von Spark SQL Data Sources API zum Einfügen von JVM-Code.

    Pros : Unterstützt, High Level, benötigt keinen Zugriff auf die interne PySpark API

    Nachteile : Relativ ausführlich und nicht sehr gut dokumentiert, begrenzt vor allem auf die Eingabedaten

  2. Bedienung von DataFrames mit Scala UDFs.

    Pros : Einfach zu implementieren (siehe Spark: Wie kann man Python mit Scala oder Java User Defined Functions abgeben ? ), Keine Datenkonvertierung zwischen Python und Scala, wenn Daten bereits in einem DataFrame gespeichert sind, minimaler Zugriff auf Py4J

    Cons : Benötigt Zugriff auf Py4J Gateway und interne Methoden, begrenzt auf Spark SQL, schwer zu debuggen, nicht unterstützt

  3. Erstellen von High-Level-Scala-Schnittstelle in ähnlicher Weise, wie es in MLLib gemacht wird.

    Pros : Flexibel, Fähigkeit, beliebigen komplexen Code auszuführen. Es kann entweder direkt auf RDD (siehe zB MLlib Modell Wrapper ) oder mit DataFrames (siehe Wie man eine Scala Klasse in Pyspark verwenden ). Die letztere Lösung scheint viel freundlicher zu sein, da alle ser-de Details bereits von der vorhandenen API behandelt werden.

    Nachteile : Niedrige Ebene, erforderliche Datenkonvertierung, genauso wie UDFs erfordert Zugriff auf Py4J und interne API, nicht unterstützt

    Einige grundlegende Beispiele finden Sie in Strings, die beim Konvertieren von Scala-Code aus einer PySpark-App nicht konvertiert werden

  4. Mit dem externen Workflow-Management-Tool können Sie zwischen Python- und Scala / Java-Jobs wechseln und Daten an eine DFS weitergeben.

    Pros : Einfach zu implementieren, minimale Änderungen am Code selbst

    Nachteile : Kosten für das Lesen / Schreiben von Daten ( Tachyon ?)

  5. Mit Shared SQLContext (siehe zB Apache Zeppelin oder Livy ), um Daten zwischen SQLContext mit registrierten temporären Tabellen zu übergeben.

    Pros : Gut geeignet für interaktive Analyse

    Nachteile : Nicht so viel für Batch-Jobs (Zeppelin) oder kann zusätzliche Orchestrierung (Livy)


  1. Joshua Rosen (2014, August 04) PySpark Internals . Abgerufen von https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals
  • Filter auf der Grundlage einer anderen RDD in Spark
  • Scala: Merkmal für ein Funktionsobjekt mit variablen Längenargumenten?
  • Scala-Äquivalent von Python-Echo-Server / Client-Beispiel?
  • Listing alle Dateien in Spark Cluster auf Hadoop HDFS mit Scala oder Python gespeichert?
  • Funkenleistung für Scala vs Python
  • Wie benutzt man die JDBC-Quelle zum Schreiben und Lesen von Daten in (Py) Spark?
  • Eine Liste oder Karte als Funktionsargumente in Scala einpacken
  • Statisch typisierte Metaprogrammierung?
  • Interpretation einer Benchmark in C, Clojure, Python, Ruby, Scala und andere
  • Spark: Wie komme ich Python mit Scala oder Java User Defined Functions?
  • Wie benutzt man Scala und Python in einem selben Spark-Projekt?
  • Python ist die beste Programmiersprache der Welt.