PySpark,如何处理并非总是创建以避免 AnalysisException 的列?

PySpark, what to do with columns that not always get created to avoid AnalysisException?

我有批量传入的数据和几列来自旋转另一列的值,因此列数各不相同,其中一列很少收到任何数据('surprise'),

因为这个列并不总是被创建,它在 .select() 语句中,有时它会中断进程抛出和 AnalysisException。

我必须这样解决它,到目前为止它似乎工作正常,但我正在寻找更好的方法来解决这个问题,因为这看起来不像是好的代码:

try:
    df = agg_sentiment \
        .select('created_at', 'topic', 'counts', 
                'fear', 'joy', 'sadness', 'surprise', 'anger')

except Exception:
    df = agg_sentiment \
        .select('created_at', 'topic', 'counts', 
                'fear', 'joy', 'sadness', 'anger')

如您所见,我从 except 部分的 select 语句中删除了 'surprise'。 PySpark 中有没有办法处理这种情况?

我认为您可以在选择列之前检查数据框中是否存在所需的列。

这是一个例子

import pyspark.sql.functions as fx

# All wanted columns including possible missing ones
colWanted = ['created_at', 'topic', 'counts', 
                'fear', 'joy', 'sadness', 'surprise', 'anger']
colSelectPossible = []

for col in colWanted:
    if col in aggSentiment.columns:
        # Column exists, so save it to select later on
        colSelectPossible.append(col)

df = aggSentiment.select(colSelectPossible)