在新列上过滤 Spark DataFrame
Filtering Spark DataFrame on new column
上下文:我的数据集太大,无法放入内存,我正在训练 Keras RNN。我在 AWS EMR 集群上使用 PySpark 分批训练模型,这些模型小到足以存储在内存中。我无法使用 elephas
实现分布式模型,我怀疑这与我的模型是有状态的有关。不过我不太确定。
数据框有一行对应每个用户和从安装之日起经过的天数从 0 到 29。查询数据库后,我对数据框执行了一些操作:
query = """WITH max_days_elapsed AS (
SELECT user_id,
max(days_elapsed) as max_de
FROM table
GROUP BY user_id
)
SELECT table.*
FROM table
LEFT OUTER JOIN max_days_elapsed USING (user_id)
WHERE max_de = 1
AND days_elapsed < 1"""
df = read_from_db(query) #this is just a custom function to query our database
#Create features vector column
assembler = VectorAssembler(inputCols=features_list, outputCol="features")
df_vectorized = assembler.transform(df)
#Split users into train and test and assign batch number
udf_randint = udf(lambda x: np.random.randint(0, x), IntegerType())
training_users, testing_users = df_vectorized.select("user_id").distinct().randomSplit([0.8,0.2],123)
training_users = training_users.withColumn("batch_number", udf_randint(lit(N_BATCHES)))
#Create and sort train and test dataframes
train = df_vectorized.join(training_users, ["user_id"], "inner").select(["user_id", "days_elapsed","batch_number","features", "kpi1", "kpi2", "kpi3"])
train = train.sort(["user_id", "days_elapsed"])
test = df_vectorized.join(testing_users, ["user_id"], "inner").select(["user_id","days_elapsed","features", "kpi1", "kpi2", "kpi3"])
test = test.sort(["user_id", "days_elapsed"])
我遇到的问题是,如果不缓存列车,我似乎无法过滤 batch_number。我可以过滤我们数据库中原始数据集中的任何列,但不能过滤我在查询数据库后在 pyspark 中生成的任何列:
这个:train.filter(train["days_elapsed"] == 0).select("days_elapsed").distinct.show()
return只有 0.
但是,所有这些 return 0 到 9 之间的所有批号都没有任何过滤:
train.filter(train["batch_number"] == 0).select("batch_number").distinct().show()
train.filter(train.batch_number == 0).select("batch_number").distinct().show()
train.filter("batch_number = 0").select("batch_number").distinct().show()
train.filter(col("batch_number") == 0).select("batch_number").distinct().show()
这也不行:
train.createOrReplaceTempView("train_table")
batch_df = spark.sql("SELECT * FROM train_table WHERE batch_number = 1")
batch_df.select("batch_number").distinct().show()
如果我先执行 train.cache(),所有这些都有效。这是绝对必要的还是有没有缓存的方法?
Spark >= 2.3(?-取决于 SPARK-22629 的进度)
应该可以使用asNondeterministic
方法禁用某些优化。
Spark < 2.3
不要使用 UDF 生成随机数。首先引用the docs:
The user-defined functions must be deterministic. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query.
即使不是UDF,也有Spark的微妙之处,这使得在处理单个记录时几乎不可能实现这一权利。
Spark 已经提供 rand
:
Generates a random column with independent and identically distributed (i.i.d.) samples from U[0.0, 1.0].
Generates a column with independent and identically distributed (i.i.d.) samples from the standard normal distribution.
可用于构建更复杂的生成器函数。
注意:
您的代码可能还有一些其他问题,但这从一开始就让人无法接受 (, )。
上下文:我的数据集太大,无法放入内存,我正在训练 Keras RNN。我在 AWS EMR 集群上使用 PySpark 分批训练模型,这些模型小到足以存储在内存中。我无法使用 elephas
实现分布式模型,我怀疑这与我的模型是有状态的有关。不过我不太确定。
数据框有一行对应每个用户和从安装之日起经过的天数从 0 到 29。查询数据库后,我对数据框执行了一些操作:
query = """WITH max_days_elapsed AS (
SELECT user_id,
max(days_elapsed) as max_de
FROM table
GROUP BY user_id
)
SELECT table.*
FROM table
LEFT OUTER JOIN max_days_elapsed USING (user_id)
WHERE max_de = 1
AND days_elapsed < 1"""
df = read_from_db(query) #this is just a custom function to query our database
#Create features vector column
assembler = VectorAssembler(inputCols=features_list, outputCol="features")
df_vectorized = assembler.transform(df)
#Split users into train and test and assign batch number
udf_randint = udf(lambda x: np.random.randint(0, x), IntegerType())
training_users, testing_users = df_vectorized.select("user_id").distinct().randomSplit([0.8,0.2],123)
training_users = training_users.withColumn("batch_number", udf_randint(lit(N_BATCHES)))
#Create and sort train and test dataframes
train = df_vectorized.join(training_users, ["user_id"], "inner").select(["user_id", "days_elapsed","batch_number","features", "kpi1", "kpi2", "kpi3"])
train = train.sort(["user_id", "days_elapsed"])
test = df_vectorized.join(testing_users, ["user_id"], "inner").select(["user_id","days_elapsed","features", "kpi1", "kpi2", "kpi3"])
test = test.sort(["user_id", "days_elapsed"])
我遇到的问题是,如果不缓存列车,我似乎无法过滤 batch_number。我可以过滤我们数据库中原始数据集中的任何列,但不能过滤我在查询数据库后在 pyspark 中生成的任何列:
这个:train.filter(train["days_elapsed"] == 0).select("days_elapsed").distinct.show()
return只有 0.
但是,所有这些 return 0 到 9 之间的所有批号都没有任何过滤:
train.filter(train["batch_number"] == 0).select("batch_number").distinct().show()
train.filter(train.batch_number == 0).select("batch_number").distinct().show()
train.filter("batch_number = 0").select("batch_number").distinct().show()
train.filter(col("batch_number") == 0).select("batch_number").distinct().show()
这也不行:
train.createOrReplaceTempView("train_table")
batch_df = spark.sql("SELECT * FROM train_table WHERE batch_number = 1")
batch_df.select("batch_number").distinct().show()
如果我先执行 train.cache(),所有这些都有效。这是绝对必要的还是有没有缓存的方法?
Spark >= 2.3(?-取决于 SPARK-22629 的进度)
应该可以使用asNondeterministic
方法禁用某些优化。
Spark < 2.3
不要使用 UDF 生成随机数。首先引用the docs:
The user-defined functions must be deterministic. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query.
即使不是UDF,也有Spark的微妙之处,这使得在处理单个记录时几乎不可能实现这一权利。
Spark 已经提供 rand
:
Generates a random column with independent and identically distributed (i.i.d.) samples from U[0.0, 1.0].
Generates a column with independent and identically distributed (i.i.d.) samples from the standard normal distribution.
可用于构建更复杂的生成器函数。
注意:
您的代码可能还有一些其他问题,但这从一开始就让人无法接受 (