I am performing a rolling median calculation on individual time series DataFrames, and then I want to concatenate/append the results.

import numpy as np
from pyspark.sql.functions import udf, col, collect_list
from pyspark.sql.types import FloatType

# UDF for rolling median
median_udf = udf(lambda x: float(np.median(x)), FloatType())

series_list = ['0620', '5914']
SeriesAppend = []

for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID, series.date).persist()
    # Calculate rolling median over the window spec w (defined elsewhere)
    series_sorted = series_sorted.withColumn("list", collect_list("metric").over(w)) \
        .withColumn("rolling_median", median_udf("list"))

    SeriesAppend.append(series_sorted)

SeriesAppend

[DataFrame[ntwrk_genre_cd: string, date: date, mkt_cd: string, syscode: string, ntwrk_cd: string, syscode_ntwrk: string, metric: double, list: array, rolling_median: float], DataFrame[ntwrk_genre_cd: string, date: date, mkt_cd: string, syscode: string, ntwrk_cd: string, syscode_ntwrk: string, metric: double, list: array, rolling_median: float]]

When I attempt to .show():

Traceback (most recent call last):
AttributeError: 'list' object has no attribute 'show'

I realize this is telling me the object is a list of DataFrames. How do I combine them into a single DataFrame?
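
(Each element of SeriesAppend is itself a DataFrame, so e.g. `SeriesAppend[0].show()` works; I just need all of them combined into one DataFrame.)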

I know that the following solution works for an explicit list of DataFrames, but I want my for loop to be agnostic to the number of DataFrames:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1, df2, df3]
df = reduce(DataFrame.unionAll, dfs)

Is there a way to generalize this without explicitly naming each DataFrame?

mwhee
  • I guess you need `union`. Have a look at this [answer](https://stackoverflow.com/a/33744540/9274732), where a method to union several dataframes from a list is shown – Ben.T May 29 '19 at 16:19
  • Union them all together. One way is to use `functools.reduce` and do the following: `reduce(lambda a, b: a.union(b), SeriesAppend[1:], SeriesAppend[0])` – pault May 29 '19 at 16:20
  • Possible duplicate of [Spark unionAll multiple dataframes](https://stackoverflow.com/questions/37612622/spark-unionall-multiple-dataframes). The second answer is for pyspark. – pault May 29 '19 at 16:29
  • If you add `"ID"` into your window `w` as another partitionBy argument, you do not need to do the for loop and union at all. Just subset the dataframe to the IDs you want, `test_df = test_df.where(col("ID").isin(series_list))`, and you are good to go (see the sketch below). – Richard Nemeth May 29 '19 at 18:13
  • Richard, that suggestion would work, but I will not know all my IDs. For instance, there will be somewhere around 30k series, but the exact N is not determined. – mwhee May 29 '19 at 18:58
  • @mwhee what do you mean by an explicit number of dataframes? The point of using `reduce` is to perform the function (here, union) as many times as you need it. If you do `df = reduce(DataFrame.unionAll, SeriesAppend)` outside of the `for` loop, you don't need to specify the number of dataframes anywhere. Or is there something else I missed/don't understand? – Ben.T May 29 '19 at 19:51
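
A minimal sketch of the window-based approach Richard Nemeth suggests above, assuming a hypothetical rolling window over the previous six rows plus the current row (the real bounds of `w` are not shown in the question):

from pyspark.sql import Window
from pyspark.sql.functions import col, collect_list

# Partitioning by ID computes every series in one pass, so no loop or union
# is needed; rowsBetween(-6, 0) is a placeholder for whatever bounds w uses.
w = Window.partitionBy("ID").orderBy("date").rowsBetween(-6, 0)

df_all = test_df.where(col("ID").isin(series_list)) \
    .withColumn("list", collect_list("metric").over(w)) \
    .withColumn("rolling_median", median_udf("list"))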

1 Answer

Thanks, everyone! To sum up, the solution uses `reduce` with `unionAll`:

from functools import reduce
from pyspark.sql import DataFrame

SeriesAppend = []

for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID, series.date).persist()
    # Calculate rolling median
    series_sorted = series_sorted.withColumn("list", collect_list("metric").over(w)) \
        .withColumn("rolling_median", median_udf("list"))

    SeriesAppend.append(series_sorted)

df_series = reduce(DataFrame.unionAll, SeriesAppend)
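
As a side note, in Spark 2.0+ `DataFrame.unionAll` is simply an alias for `DataFrame.union`, so the reduction can equivalently be written as:

df_series = reduce(DataFrame.union, SeriesAppend)

Both resolve columns by position rather than by name; if the column order could differ between the DataFrames, `unionByName` (Spark 2.3+) is the safer choice.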
mwhee