
This is my DataFrame in PySpark:

utc_timestamp               data    feed
2015-10-13 11:00:00+00:00   1       A
2015-10-13 12:00:00+00:00   5       A
2015-10-13 13:00:00+00:00   6       A
2015-10-13 14:00:00+00:00   10      B
2015-10-13 15:00:00+00:00   11      B

The values of data are cumulative.

I want to get this result (differences between consecutive rows, grouped by feed):

utc_timestamp               data    feed
2015-10-13 11:00:00+00:00   1       A
2015-10-13 12:00:00+00:00   4       A
2015-10-13 13:00:00+00:00   1       A  
2015-10-13 14:00:00+00:00   10      B
2015-10-13 15:00:00+00:00   1       B

In pandas I would do it this way:

df["data"] -= (df.groupby("feed")["data"].shift(fill_value=0))

How can I do the same thing in PySpark?
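
For reference, the DataFrame can be rebuilt like this (a minimal sketch, assuming an active SparkSession; the timestamps are kept as plain strings here, which still order correctly for this example):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data from the tables above
df = spark.createDataFrame(
    [
        ("2015-10-13 11:00:00", 1, "A"),
        ("2015-10-13 12:00:00", 5, "A"),
        ("2015-10-13 13:00:00", 6, "A"),
        ("2015-10-13 14:00:00", 10, "B"),
        ("2015-10-13 15:00:00", 11, "B"),
    ],
    ["utc_timestamp", "data", "feed"],
)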

2 Answers


You can do this using the lag function with a window:

from pyspark.sql.window import Window
import pyspark.sql.functions as f

window = Window.partitionBy("feed").orderBy("utc_timestamp")

df = df.withColumn("data", f.col("data") - f.lag(f.col("data"), 1, 0).over(window))
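
To compare with the expected table from the question, you can display the result ordered by feed and timestamp (this assumes df is the sample DataFrame built above):

# The first row of each feed keeps its original value, because the third
# argument of lag (the default, 0) stands in for the missing previous row.
df.orderBy("feed", "utc_timestamp").show(truncate=False)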

You can use lag as a substitute for shift, and F.coalesce(..., F.lit(0)) as a substitute for fill_value=0:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

window = Window.partitionBy("feed").orderBy("utc_timestamp")

data = F.col("data") - F.coalesce(F.lag(F.col("data")).over(window), F.lit(0))
df = df.withColumn("data", data)
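
Both answers compute the same thing: the coalesce form mirrors pandas' fill_value=0 more literally, while lag's optional third argument folds the default in. A sketch of the two equivalent expressions, using the same df and window as above (the variable names are just illustrative):

shifted_coalesce = F.coalesce(F.lag("data").over(window), F.lit(0))  # lag gives null for the first row; coalesce turns it into 0
shifted_default = F.lag("data", 1, 0).over(window)                   # same result via lag's default argument

# Either expression yields 0 for the first row of each feed, so that row
# keeps its original value after the subtraction.
df = df.withColumn("data", F.col("data") - shifted_default)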