
I have a PySpark dataframe with two columns, ID and Elements. The "Elements" column contains a list in each row. It looks like this:

ID | Elements
_______________________________________
X  |[Element5, Element1, Element5]
Y  |[Element Unknown, Element Unknown, Element_Z]

I want to add a column containing the most frequent element from the column 'Elements'. The output should look like this:

ID | Elements                                           | Output_column 
__________________________________________________________________________
X  |[Element5, Element1, Element5]                      | Element5
Y  |[Element Unknown, Element Unknown, Element_Z]       | Element Unknown 

How can I do that using PySpark?
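
For reference, the sample data can be built like this (a minimal sketch, assuming "Elements" is an array-of-strings column):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample rows as shown above; "Elements" is assumed to be array<string>
df = spark.createDataFrame(
    [("X", ["Element5", "Element1", "Element5"]),
     ("Y", ["Element Unknown", "Element Unknown", "Element_Z"])],
    ["ID", "Elements"],
)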

Thanks in advance.


1 Answer


We can use higher-order functions here (transform and aggregate are available from Spark 2.4+; the comparator form of array_sort used below was added in Spark 3.0):

  1. First use transform and aggregate to get a count for each distinct value in the array.
  2. Then sort the resulting array of structs by count in descending order and take the first element.

from pyspark.sql import functions as F

temp = (df
        # distinct values in each array
        .withColumn("Dist", F.array_distinct("Elements"))
        # count the occurrences of each distinct value in the original array
        .withColumn("Counts", F.expr("""transform(Dist, x ->
                        aggregate(Elements, 0, (acc, y) -> IF(y = x, acc + 1, acc)))"""))
        # zip value and count into an array of structs
        .withColumn("Map", F.arrays_zip("Dist", "Counts"))
        .drop("Dist", "Counts"))

# sort the structs by count descending and take the value of the first struct
out = temp.withColumn("Output_column",
                      F.expr("""element_at(array_sort(Map, (first, second) ->
                          CASE WHEN first['Counts'] > second['Counts'] THEN -1 ELSE 1 END), 1)['Dist']"""))

Output:

Note that I have added a row with a blank array for ID Z to test. You can also drop the Map column by adding .drop("Map") to the output.

out.show(truncate=False)

+---+---------------------------------------------+--------------------------------------+---------------+
|ID |Elements                                     |Map                                   |Output_column  |
+---+---------------------------------------------+--------------------------------------+---------------+
|X  |[Element5, Element1, Element5]               |[{Element5, 2}, {Element1, 1}]        |Element5       |
|Y  |[Element Unknown, Element Unknown, Element_Z]|[{Element Unknown, 2}, {Element_Z, 1}]|Element Unknown|
|Z  |[]                                           |[]                                    |null           |
+---+---------------------------------------------+--------------------------------------+---------------+
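
If the comparator form of array_sort is not available, one possible alternative is to put the count first in a struct and take array_max, which compares structs field by field. This is only a rough sketch (it assumes array_max can order structs on your Spark version; ties are broken arbitrarily):

# rough sketch: putting the count first in the struct makes array_max pick the
# struct with the highest count, then we read the value back out
alt = temp.withColumn(
    "Output_column",
    F.expr("array_max(transform(Map, m -> named_struct('cnt', m['Counts'], 'val', m['Dist'])))['val']")
)
alt.show(truncate=False)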

For lower Spark versions, you can use a UDF with statistics.mode:

from pyspark.sql import functions as F, types as T
from statistics import mode

# most frequent element, or None for an empty array
u = F.udf(lambda x: mode(x) if len(x) > 0 else None, T.StringType())

df.withColumn("Output", u("Elements")).show(truncate=False)
+---+---------------------------------------------+---------------+
|ID |Elements                                     |Output         |
+---+---------------------------------------------+---------------+
|X  |[Element5, Element1, Element5]               |Element5       |
|Y  |[Element Unknown, Element Unknown, Element_Z]|Element Unknown|
|Z  |[]                                           |null           |
+---+---------------------------------------------+---------------+
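
Note that on Python versions before 3.8, statistics.mode raises StatisticsError when there is a tie for the most common value. A Counter-based UDF is one way around that (a sketch, assuming any of the tied values is acceptable):

from pyspark.sql import functions as F, types as T
from collections import Counter

# most_common(1) returns [(value, count)]; ties resolve to the first value counted
most_frequent = F.udf(
    lambda x: Counter(x).most_common(1)[0][0] if x else None,
    T.StringType(),
)

df.withColumn("Output", most_frequent("Elements")).show(truncate=False)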