I must perform an aggregation within a sliding window in Pyspark. In particular, I need to perform the following operations:

- Consider 100 days' worth of data at a time
- GroupBy a given ID column
- Take the last value of each group
- Sum these values and return the result

These tasks must be computed in a sliding window with `.rangeBetween(-100 days, 0)`
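For a single window, the intended aggregation looks like this in plain pandas (toy data; the column names are just illustrative):

```
import pandas as pd

# Toy data for one 100-day window: several observations per ID
window = pd.DataFrame({
    'id':    ['a', 'a', 'b', 'b', 'c'],
    'value': [1.0, 2.0, 3.0, 4.0, 5.0],
})

# Take the last value per ID, then sum the results
result = window.groupby('id')['value'].last().sum()  # 2.0 + 4.0 + 5.0 = 11.0
```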

I can easily achieve this result by constructing a Pandas UDF that takes as input some columns of the Pyspark DF, transforms them into a Pandas DataFrame, computes the aggregation, and returns the scalar result. The UDF is then applied over the desired sliding Window.

Even though this solution works fine, it takes a lot of time (3-4 hours) to complete since the DFs contain millions of rows. Is there a way to improve the computational time of such an operation? I am working with Pyspark in Databricks.

My pandas UDF is:

```
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import FloatType
import numpy as np
import pandas as pd

@pandas_udf(FloatType(), PandasUDFType.GROUPED_AGG)
def freshness(date: pd.Series, csecid: pd.Series, analystid: pd.Series, revisions: pd.Series) -> float:
    df = pd.DataFrame({
        'date': date,
        'csecid': csecid,
        'analystid': analystid,
        'revisions': revisions,
    }).sort_values('date')
    # Number of analysts in the most recent third
    analysts = int(np.floor(df['analystid'].nunique() / 3))
    # Cutoff date: each analyst's latest revision date, sorted ascending,
    # taken `analysts` positions from the end
    cutoff = df.groupby('analystid').last().sort_values('date').iloc[-analysts]['date']
    # `weight` is a constant defined elsewhere in my code
    df['freshness'] = np.where(df['date'] < cutoff, 0, weight)
    df['fresh_revisions'] = (df['revisions'].abs() + df['freshness']) * np.sign(df['revisions'])
    return df['fresh_revisions'].sum() / df['fresh_revisions'].abs().sum()
```
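To make the logic concrete, here is the body of the UDF exercised outside Spark on a toy frame. `weight` is a constant defined elsewhere in my code; I set it to 1 here purely for illustration, and the dates and revisions are made up:

```
import numpy as np
import pandas as pd

weight = 1.0  # placeholder; in my code this is a constant defined elsewhere

df = pd.DataFrame({
    'date': pd.to_datetime(['2021-01-01', '2021-01-05', '2021-01-10', '2021-01-12']),
    'analystid': [1, 2, 3, 1],
    'revisions': [0.5, -0.2, 0.3, 0.4],
}).sort_values('date')

# 3 unique analysts -> analysts = 1, so the cutoff is the latest
# "last revision" date across analysts (2021-01-12 here)
analysts = int(np.floor(df['analystid'].nunique() / 3))
cutoff = df.groupby('analystid').last().sort_values('date').iloc[-analysts]['date']

# Only rows on/after the cutoff get the freshness weight added
df['freshness'] = np.where(df['date'] < cutoff, 0, weight)
df['fresh_revisions'] = (df['revisions'].abs() + df['freshness']) * np.sign(df['revisions'])
value = df['fresh_revisions'].sum() / df['fresh_revisions'].abs().sum()  # 2.0 / 2.4
```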

And it is applied as follows:

```
from pyspark.sql import functions as F
from pyspark.sql.window import Window

days = lambda x: x * 60 * 60 * 24  # days to seconds, to match the long-cast timestamps

w = (Window.partitionBy('csecid')
     .orderBy(F.col('date').cast('timestamp').cast('long'))
     .rangeBetween(-days(100), 0))
df = df.withColumn('new_col', freshness(F.col('date'), F.col('csecid'), F.col('analystid'), F.col('revisions_improved')).over(w))
```
