Pandas-Stil Transformation von gruppierten Daten auf pyspark DataFrame

Wenn wir einen Pandas-Datenrahmen haben, der aus einer Spalte von Kategorien und einer Spalte von Werten besteht, können wir den Mittelwert in jeder Kategorie entfernen, indem wir folgendes tun:

df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g)) 

Soweit ich verstehe, Funken Dataframes nicht direkt bieten diese Gruppe-by / Transformation Betrieb (Ich bin mit pyspark auf Funken 1.5.0). Also, was ist der beste Weg, um diese Berechnung zu implementieren?

Ich habe versucht, eine Gruppe-by / join wie folgt:

 df2 = df.groupBy("Category").mean("Values") df3 = df2.join(df) 

Aber es ist sehr langsam, da, wie ich verstehe, jede Kategorie einen vollständigen Scan des DataFrame erfordert.

Ich denke, aber ich habe nicht überprüft), dass ich das sehr schnell beschleunigen kann, wenn ich das Ergebnis der Gruppe-by / mittleres in ein Wörterbuch sammle und dann dieses Wörterbuch in einer UDF wie folgt benutze:

 nameToMean = {...} f = lambda category, value: value - nameToMean[category] categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType()) df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Value)) 

Gibt es eine idiomatische Möglichkeit, diese Art von Operation auszudrücken, ohne die Leistung zu beeinträchtigen?

    2 Solutions collect form web for “Pandas-Stil Transformation von gruppierten Daten auf pyspark DataFrame”

    Ich verstehe, jede Kategorie erfordert einen vollständigen Scan des DataFrame.

    Nein, das geht nicht DataFrame-Aggregationen werden mit einer Logik ähnlich aggregateByKey . Siehe DataFrame groupBy Verhalten / Optimierung Ein langsameres Teil ist eine join die Sortierung / Shuffling erfordert. Aber es braucht noch keinen Scan pro Gruppe.

    Wenn dies ein exakter Code ist, den Sie verwenden, ist es langsam, weil Sie keinen Join-Ausdruck bereitstellen. Aus diesem Grund führt es einfach ein kartesisches Produkt aus. Also ist es nicht nur ineffizient, sondern auch falsch. Sie wollen so etwas wie dieses:

     from pyspark.sql.functions import col means = df.groupBy("Category").mean("Values").alias("means") df.alias("df").join(means, col("df.Category") == col("means.Category")) 

    Ich denke (aber nicht überprüft), dass ich dies schnell beschleunigen kann, wenn ich das Ergebnis der Gruppe-by / mittleres in ein Wörterbuch sammle und dann dieses Wörterbuch in einem UDF benutze

    Es ist möglich, obwohl die Leistung von Fall zu Fall variieren wird. Ein Problem bei der Verwendung von Python UDFs ist, dass es Daten zu und von Python verschieben muss. Dennoch ist es definitiv einen Versuch wert. Sie sollten eine Broadcast-Variable für nameToMean .

    Gibt es eine idiomatische Möglichkeit, diese Art von Operation auszudrücken, ohne die Leistung zu beeinträchtigen?

    Im PySpark 1.6 können Sie die broadcast Funktion nutzen:

     df.alias("df").join( broadcast(means), col("df.Category") == col("means.Category")) 

    Aber es ist nicht verfügbar in <= 1,5.

    Tatsächlich gibt es einen idiomatischen Weg, dies in Spark zu tun, mit dem Hive OVER Ausdruck.

    Dh

     df.registerTempTable('df') with_category_means = sqlContext.sql('select *, mean(Values) OVER (PARTITION BY Category) as category_mean from df') 

    Unter der Haube benutzt man eine Fensterfunktion. Ich bin mir nicht sicher, ob dies schneller ist als deine Lösung

    Python ist die beste Programmiersprache der Welt.