Improve Pandas UDF performance in PySpark

I need to perform an aggregation within a sliding window in PySpark. In particular, I need to do the following operations:

  1. Consider 100 days' worth of data at a time
  2. Group by a given ID column
  3. Take the last value of each group
  4. Sum the values and return the result

These steps must be computed over a sliding window defined with .rangeBetween(-100 days, 0); a minimal pandas sketch of steps 2-4 is shown right below.
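To make the steps concrete, here is a small pandas sketch of steps 2-4 on a toy frame. The column names id and value and the values themselves are made up purely for illustration; step 1, the 100-day window, is handled by Spark itself.

import pandas as pd

# Hypothetical rows standing in for the data that falls inside one 100-day window
window_rows = pd.DataFrame({
    'id':    [1, 1, 2, 2, 3],
    'value': [10.0, 12.0, 5.0, 7.0, 3.0],
})

# Steps 2-4: group by the ID column, take the last value of each group, sum them
result = window_rows.groupby('id')['value'].last().sum()  # 12.0 + 7.0 + 3.0 = 22.0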

I can easily achieve this by writing a pandas UDF that takes some columns of the PySpark DataFrame as input, transforms them into a pandas DataFrame, computes the aggregation, and returns the scalar result. The UDF is then applied over the desired sliding window.

Even though this solution works fine, it takes a long time (3-4 hours) to complete because the DataFrames contain millions of rows. Is there a way to reduce the computation time of this operation? I am working with PySpark on Databricks.

My pandas UDF is:

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import FloatType

@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def freshness(date: pd.Series, csecid: pd.Series, analystid: pd.Series, revisions: pd.Series) -> float:
  df = pd.DataFrame({
      'date': date,
      'csecid': csecid,
      'analystid': analystid,
      'revisions': revisions,
    }).sort_values('date')
  # One third of the distinct analysts in the current window
  analysts = int(np.floor(df['analystid'].nunique() / 3))
  # Cutoff date: the latest revision date of the analyst ranked `analysts` from the end
  date = df.groupby('analystid').last().sort_values('date').iloc[-analysts]['date']
  # `weight` is a constant defined outside this snippet
  df['freshness'] = np.where(df['date'] < date, 0, weight)
  df['fresh_revisions'] = (df['revisions'].abs() + df['freshness']) * np.sign(df['revisions'])
  value = df['fresh_revisions'].sum() / df['fresh_revisions'].abs().sum()
  return value

And it is applied as follows:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

days = lambda x: x * 60 * 60 * 24  # rangeBetween bounds are expressed in seconds here
w = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100), 0)
df = df.withColumn('new_col', freshness(F.col('date'), F.col('csecid'), F.col('analystid'), F.col('revisions_improved')).over(w))
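For reference, here is a minimal self-contained sketch of how the UDF and the window are wired together. The toy rows and the weight value below are placeholders introduced only to make the snippet runnable; the real DataFrame has millions of rows and weight is defined elsewhere in my code.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Placeholder rows with the same columns as the real DataFrame
toy = spark.createDataFrame(
    [('2021-01-01', 'A', 10, 0.5),
     ('2021-02-15', 'A', 11, -0.2),
     ('2021-03-01', 'A', 10, 0.7)],
    ['date', 'csecid', 'analystid', 'revisions_improved']
).withColumn('date', F.to_date('date'))

weight = 1.0  # placeholder for the constant used inside the UDF

days = lambda x: x * 60 * 60 * 24
w = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100), 0)
toy.withColumn('new_col', freshness(F.col('date'), F.col('csecid'), F.col('analystid'), F.col('revisions_improved')).over(w)).show()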
