由 IndexedRowMatrix().columnSimilarities() 检索到的 PySpark 相似性不可访问:INFO ExternalSorter:Thread * spilling in-memory map

PySpark similarities retrieved by IndexedRowMatrix().columnSimilarities() are not acessible: INFO ExternalSorter: Thread * spilling in-memory map

当我运行代码:

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from random import random
import os
from scipy.sparse import csc_matrix
import pandas as pd
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

from pyspark.sql import SQLContext

sc =SparkContext()
sqlContext = SQLContext(sc)
df = pd.read_csv("/Users/Andre/Code/blitsy-analytics/R_D/Data/cust_item_counts.csv", header=None)
customer_map = {x[1]:x[0] for x in enumerate(df[0].unique())}
item_map = {x[1]:x[0] for x in enumerate(df[1].unique())}
df[0] = df[0].map(lambda x: customer_map[x])
df[1] = df[1].map(lambda x: item_map[x])
#matrix = csc_matrix((df[2], (df[0], df[1])),shape=(max(df[0])+1, max(df[1])+1))

entries = sc.parallelize(df.apply(lambda x: tuple(x), axis=1).values)
mat = CoordinateMatrix(entries).toIndexedRowMatrix()
sim = mat.columnSimilarities()
sim.entries.map(lambda x: x).first()

我陷入线程溢出到磁盘的循环中:

> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 294
> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 293
> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 292
> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 291
> 16/04/01 12:09:42 INFO ExternalSorter: Thread 108 spilling in-memory
> map of 137.6 MB to disk (1 time so far) 16/04/01 12:09:42 INFO
> ExternalSorter: Thread 112 spilling in-memory map of 158.1 MB to disk
> (1 time so far) 16/04/01 12:09:42 INFO ExternalSorter: Thread 114
> spilling in-memory map of 154.2 MB to disk (1 time so far) 16/04/01
> 12:09:42 INFO ExternalSorter: Thread 113 spilling in-memory map of
> 143.4 MB to disk (1 time so far)

矩阵 'mat' 的第一行条目 returns 并非如此。

这与内存管理或函数 columnSimilarity() 本身有关吗?

我在 sim 变量中有大约 86000 行和列。

我的数据集是一个元组列表(user_id、item_id、值)。我将 user_id 和 item_id 范围转换为 0 和 len(user_id| tem_id) 之间的值。这是因为 800000 的 id 不会强制矩阵那么大。

有 800,000 个此类条目。变量 'mat' 中的矩阵保存坐标 (user_id, item_id) 处元组的值。经本人验证确实如此

'mat' 处的矩阵有约 41,000 个用户和约 86,000 个项目。 Similarity 列在每个项目之间创建比较,这就是为什么它的尺寸为 86k x 86k

这一切都是在 pyspark 终端中完成的。/bin/pyspark。

正如评论中所讨论的,该问题与以下事实有关:考虑到您的集群配置,您有很多数据没有很好地分区。这就是它溢出到磁盘上的原因。

您需要为您的应用程序提供更多内存资源 and/or 增加数据分区。