I need to compute the cosine distance between all rows of a DataFrame. To do this, I wrote code that uses a cross join and a UDF to compute the cosine for every pair of rows. This approach works as intended, but it is rather slow.
Searching on SO, I came across an implementation that uses IndexedRowMatrix() and columnSimilarities() on the underlying RDD to achieve the same result:
Spark cosine distance between rows using Dataframe
Here's a reproducible example:
# Assumes an active SparkSession named `spark`
from pyspark.sql import functions as F
from pyspark.sql.functions import col
df_test1 = spark.createDataFrame([
(1,100 ,3),
(1,101 ,0),
(1,102 ,2),
(1,103 ,4),
(1,104, 7),
(1,105, 0),
(1,106, 0),
(2,100, 3),
(2,101, 0),
(2,102,9),
(2,103,13),
(2,104,4),
(2,105,4),
(2,106,4),
], ["ei_HashEventKey","alarmdate", "alarmcount"])
df_test1 = df_test1.groupby("ei_HashEventKey") \
.agg(F.sort_array(F.collect_list(F.struct("alarmdate", "alarmcount"))) \
.alias("collectedAlarmcount_i")) \
.withColumn("collectedAlarmcount",col("collectedAlarmcount_i.alarmcount")) \
.drop("collectedAlarmcount_i")
print(df_test1.show(truncate=False))
df_test1=df_test1.repartition(20)
from pyspark.mllib.linalg.distributed import IndexedRowMatrix, IndexedRow
print("df partitions",df_test1.rdd.getNumPartitions())
pred_2=IndexedRowMatrix(df_test1.rdd.map(lambda x: IndexedRow(x[0],x[1])))
print("IndexedROwMAtrix partitions",pred_2.rows.getNumPartitions())
print("BlockMatrix partitions",pred_2.toBlockMatrix().blocks.getNumPartitions())
pred_2=pred_2.toBlockMatrix().transpose().toIndexedRowMatrix()
pred_sims = pred_2.columnSimilarities()
output:
+---------------+----------------------+
|ei_HashEventKey|collectedAlarmcount |
+---------------+----------------------+
|1 |[3, 0, 2, 4, 7, 0, 0] |
|2 |[3, 0, 9, 13, 4, 4, 4]|
+---------------+----------------------+
None
df partitions 20
IndexedROwMAtrix partitions 20
BlockMatrix partitions 1
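As a sanity check for whichever approach is used, the similarity of the two collected rows shown above can be computed locally. This is only a minimal plain-Python sketch; the helper name `cosine_similarity` and the choice to return 0 for an all-zero vector are assumptions made for illustration, not part of either Spark implementation.

```python
import math


def cosine_similarity(u, v):
    """Cosine similarity of two equal-length numeric sequences."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    if norm_u == 0 or norm_v == 0:
        # Assumption: treat an all-zero vector as having similarity 0
        return 0.0
    return dot / (norm_u * norm_v)


# The two arrays from the collectedAlarmcount column above
row_1 = [3, 0, 2, 4, 7, 0, 0]
row_2 = [3, 0, 9, 13, 4, 4, 4]

similarity = cosine_similarity(row_1, row_2)
distance = 1.0 - similarity  # cosine distance
print(similarity, distance)
```

The value printed here is what the entry for rows 1 and 2 should match in the Spark result, up to floating-point error.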
My problem is the performance of this solution: toBlockMatrix() does not seem to preserve the number of partitions of the underlying RDD. I don't think repartition() helps either, because it has to be applied to toBlockMatrix().blocks and returns a plain RDD, which can't be used for columnSimilarities() since that needs a row matrix.
For example, toBlockMatrix().transpose().blocks.repartition(N).toIndexedRowMatrix() doesn't work.
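One possible explanation for the single partition, sketched below under stated assumptions: toBlockMatrix() accepts rowsPerBlock and colsPerBlock arguments that both default to 1024, so a matrix as small as this example fits in one block, and as far as I understand the partition count cannot exceed the number of blocks. The helper names `num_blocks` and `similarities_with_small_blocks` are hypothetical, and whether smaller blocks actually improve performance on real data is untested here.

```python
import math


def num_blocks(n_rows, n_cols, rows_per_block=1024, cols_per_block=1024):
    """Number of blocks in a grid that covers an n_rows x n_cols matrix."""
    return math.ceil(n_rows / rows_per_block) * math.ceil(n_cols / cols_per_block)


def similarities_with_small_blocks(indexed_row_matrix, rows_per_block, cols_per_block):
    """Hypothetical helper: same pipeline as above, but with explicit block sizes.

    Expects a pyspark.mllib IndexedRowMatrix; it is not called in this sketch.
    """
    block_matrix = indexed_row_matrix.toBlockMatrix(rows_per_block, cols_per_block)
    return block_matrix.transpose().toIndexedRowMatrix().columnSimilarities()


# The example has row indices 1 and 2 (so 3 rows counting index 0) and 7 columns
print(num_blocks(3, 7))        # 1 block with the default 1024 x 1024 block size
print(num_blocks(3, 7, 1, 1))  # 21 blocks with 1 x 1 blocks
```

With the example above, this would be called as similarities_with_small_blocks(pred_2, N, M) before the transpose step, with N and M chosen so that the grid has at least as many blocks as the desired number of partitions.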
With so few partitions, I can't really take advantage of the parallelism in the cluster.
Is there anything that can be improved in this solution, or should I stick with the cross join + UDF to compute the cosine similarity? Thanks again.