无法在 SparkSQL 连接中解析列名
Column name cannot be resolved in SparkSQL join
我不确定为什么会这样。在 PySpark 中,我读取了两个数据帧并打印出它们的列名,它们符合预期,但是当执行 SQL 连接时,我收到一个错误,无法解析给定输入的列名。我简化了合并只是为了让它工作,但我需要添加更多的连接条件,这就是我使用 SQL 的原因(将添加:"and b.mnvr_bgn < a.idx_trip_id and b.mnvr_end > a.idx_trip_data")。似乎列 'device_id' 在 df mnvr_temp_idx_prev_temp
中被重命名为“_col7”
mnvr_temp_idx_prev = mnvr_3.select('device_id', 'mnvr_bgn', 'mnvr_end')
print mnvr_temp_idx_prev.columns
['device_id', 'mnvr_bgn', 'mnvr_end']
raw_data_filtered = raw_data.select('device_id', 'trip_id', 'idx').groupby('device_id', 'trip_id').agg(F.max('idx').alias('idx_trip_end'))
print raw_data_filtered.columns
['device_id', 'trip_id', 'idx_trip_end']
raw_data_filtered.registerTempTable('raw_data_filtered_temp')
mnvr_temp_idx_prev.registerTempTable('mnvr_temp_idx_prev_temp')
test = sqlContext.sql('SELECT a.device_id, a.idx_trip_end, b.mnvr_bgn, b.mnvr_end \
FROM raw_data_filtered_temp as a \
INNER JOIN mnvr_temp_idx_prev_temp as b \
ON a.device_id = b.device_id')
Traceback(最后一次调用):AnalysisException: u"cannot resolve 'b.device_id' given input columns: [_col7, trip_id, device_id, mnvr_end, mnvr_bgn, idx_trip_end]; line 1 pos 237"
感谢任何帮助!
我建议至少在一个数据框中重命名字段 'device_id' 的名称。我稍微修改了您的查询并对其进行了测试(在 Scala 中)。以下查询有效
test = sqlContext.sql("select * FROM raw_data_filtered_temp a INNER JOIN mnvr_temp_idx_prev_temp b ON a.device_id = b.device_id")
[device_id: string, mnvr_bgn: string, mnvr_end: string, device_id: string, trip_id: string, idx_trip_end: string]
现在,如果您在上面的语句中执行 'select * ',它将起作用。但是如果你尝试 select 'device_id',你会得到一个错误 "Reference 'device_id' is ambiguous"。正如您在上面的 'test' 数据框定义中看到的,它有两个同名字段 (device_id)。因此,为避免这种情况,我建议更改其中一个数据框中的字段名称。
mnvr_temp_idx_prev = mnvr_3.select('device_id', 'mnvr_bgn', 'mnvr_end')
.withColumnRenamned("device_id","device")
raw_data_filtered = raw_data.select('device_id', 'trip_id', 'idx').groupby('device_id', 'trip_id').agg(F.max('idx').alias('idx_trip_end'))
现在使用数据框或 sqlContext
//using dataframes with multiple conditions
val test = mnvr_temp_idx_prev.join(raw_data_filtered,$"device" === $"device_id"
&& $"mnvr_bgn" < $"idx_trip_id","inner")
//在SQL上下文中
test = sqlContext.sql("select * FROM raw_data_filtered_temp a INNER JOIN mnvr_temp_idx_prev_temp b ON a.device_id = b.device and a. idx_trip_id < b.mnvr_bgn")
以上查询可以解决您的问题。如果您的数据集太大,我建议不要在连接条件中使用“>”或“<”运算符,因为它会导致交叉连接,如果数据集很大,这是一项代价高昂的操作。而是在 WHERE 条件下使用它们。
我不确定为什么会这样。在 PySpark 中,我读取了两个数据帧并打印出它们的列名,它们符合预期,但是当执行 SQL 连接时,我收到一个错误,无法解析给定输入的列名。我简化了合并只是为了让它工作,但我需要添加更多的连接条件,这就是我使用 SQL 的原因(将添加:"and b.mnvr_bgn < a.idx_trip_id and b.mnvr_end > a.idx_trip_data")。似乎列 'device_id' 在 df mnvr_temp_idx_prev_temp
中被重命名为“_col7”mnvr_temp_idx_prev = mnvr_3.select('device_id', 'mnvr_bgn', 'mnvr_end')
print mnvr_temp_idx_prev.columns
['device_id', 'mnvr_bgn', 'mnvr_end']
raw_data_filtered = raw_data.select('device_id', 'trip_id', 'idx').groupby('device_id', 'trip_id').agg(F.max('idx').alias('idx_trip_end'))
print raw_data_filtered.columns
['device_id', 'trip_id', 'idx_trip_end']
raw_data_filtered.registerTempTable('raw_data_filtered_temp')
mnvr_temp_idx_prev.registerTempTable('mnvr_temp_idx_prev_temp')
test = sqlContext.sql('SELECT a.device_id, a.idx_trip_end, b.mnvr_bgn, b.mnvr_end \
FROM raw_data_filtered_temp as a \
INNER JOIN mnvr_temp_idx_prev_temp as b \
ON a.device_id = b.device_id')
Traceback(最后一次调用):AnalysisException: u"cannot resolve 'b.device_id' given input columns: [_col7, trip_id, device_id, mnvr_end, mnvr_bgn, idx_trip_end]; line 1 pos 237"
感谢任何帮助!
我建议至少在一个数据框中重命名字段 'device_id' 的名称。我稍微修改了您的查询并对其进行了测试(在 Scala 中)。以下查询有效
test = sqlContext.sql("select * FROM raw_data_filtered_temp a INNER JOIN mnvr_temp_idx_prev_temp b ON a.device_id = b.device_id")
[device_id: string, mnvr_bgn: string, mnvr_end: string, device_id: string, trip_id: string, idx_trip_end: string]
现在,如果您在上面的语句中执行 'select * ',它将起作用。但是如果你尝试 select 'device_id',你会得到一个错误 "Reference 'device_id' is ambiguous"。正如您在上面的 'test' 数据框定义中看到的,它有两个同名字段 (device_id)。因此,为避免这种情况,我建议更改其中一个数据框中的字段名称。
mnvr_temp_idx_prev = mnvr_3.select('device_id', 'mnvr_bgn', 'mnvr_end')
.withColumnRenamned("device_id","device")
raw_data_filtered = raw_data.select('device_id', 'trip_id', 'idx').groupby('device_id', 'trip_id').agg(F.max('idx').alias('idx_trip_end'))
现在使用数据框或 sqlContext
//using dataframes with multiple conditions
val test = mnvr_temp_idx_prev.join(raw_data_filtered,$"device" === $"device_id"
&& $"mnvr_bgn" < $"idx_trip_id","inner")
//在SQL上下文中
test = sqlContext.sql("select * FROM raw_data_filtered_temp a INNER JOIN mnvr_temp_idx_prev_temp b ON a.device_id = b.device and a. idx_trip_id < b.mnvr_bgn")
以上查询可以解决您的问题。如果您的数据集太大,我建议不要在连接条件中使用“>”或“<”运算符,因为它会导致交叉连接,如果数据集很大,这是一项代价高昂的操作。而是在 WHERE 条件下使用它们。