PySpark,如何处理并非总是创建以避免 AnalysisException 的列?
PySpark, what to do with columns that not always get created to avoid AnalysisException?
我有批量传入的数据和几列来自旋转另一列的值,因此列数各不相同,其中一列很少收到任何数据('surprise'),
因为这个列并不总是被创建,它在 .select()
语句中,有时它会中断进程抛出和 AnalysisException。
我必须这样解决它,到目前为止它似乎工作正常,但我正在寻找更好的方法来解决这个问题,因为这看起来不像是好的代码:
try:
df = agg_sentiment \
.select('created_at', 'topic', 'counts',
'fear', 'joy', 'sadness', 'surprise', 'anger')
except Exception:
df = agg_sentiment \
.select('created_at', 'topic', 'counts',
'fear', 'joy', 'sadness', 'anger')
如您所见,我从 except 部分的 select 语句中删除了 'surprise'。 PySpark 中有没有办法处理这种情况?
我认为您可以在选择列之前检查数据框中是否存在所需的列。
这是一个例子
import pyspark.sql.functions as fx
# All wanted columns including possible missing ones
colWanted = ['created_at', 'topic', 'counts',
'fear', 'joy', 'sadness', 'surprise', 'anger']
colSelectPossible = []
for col in colWanted:
if col in aggSentiment.columns:
# Column exists, so save it to select later on
colSelectPossible.append(col)
df = aggSentiment.select(colSelectPossible)
我有批量传入的数据和几列来自旋转另一列的值,因此列数各不相同,其中一列很少收到任何数据('surprise'),
因为这个列并不总是被创建,它在 .select()
语句中,有时它会中断进程抛出和 AnalysisException。
我必须这样解决它,到目前为止它似乎工作正常,但我正在寻找更好的方法来解决这个问题,因为这看起来不像是好的代码:
try:
df = agg_sentiment \
.select('created_at', 'topic', 'counts',
'fear', 'joy', 'sadness', 'surprise', 'anger')
except Exception:
df = agg_sentiment \
.select('created_at', 'topic', 'counts',
'fear', 'joy', 'sadness', 'anger')
如您所见,我从 except 部分的 select 语句中删除了 'surprise'。 PySpark 中有没有办法处理这种情况?
我认为您可以在选择列之前检查数据框中是否存在所需的列。
这是一个例子
import pyspark.sql.functions as fx
# All wanted columns including possible missing ones
colWanted = ['created_at', 'topic', 'counts',
'fear', 'joy', 'sadness', 'surprise', 'anger']
colSelectPossible = []
for col in colWanted:
if col in aggSentiment.columns:
# Column exists, so save it to select later on
colSelectPossible.append(col)
df = aggSentiment.select(colSelectPossible)