如何在 Python 中对 Spark Dataframe 应用任何类型的地图转换
How to apply any sort of Map Transformation on Spark Dataframe in Python
我使用的是Spark Structure Streaming,代码如下:
def convert_timestamp_to_datetime(timestamp):
return datetime.fromtimestamp(timestamp)
def extract():
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
json_schema = \
StructType() \
.add(StructField("TIMESTAMP", FloatType(), True)) \
.add(StructField("index", IntegerType(), True)) \
.add(StructField("CUSTOMER_ID", StringType(), True)) \
.add(StructField("CODE_ID", StringType(), True)) \
.add(StructField("PROCESS", StringType(), True))
my_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "simple_json_12_10trx") \
.option("startingOffsets", "earliest") \
.load()
my_df = my_df.select(from_json(col('value').cast('string'), json_schema).alias("json"))
convert_timestamp_datetime_udf = udf(lambda x: convert_timestamp_to_datetime(x), TimestampType())
return my_df.select('json.*', convert_timestamp_datetime_udf('json.TIMESTAMP').alias('DATETIME'))
def transform_load(my_df, epoch_id):
update_obj = my_df.groupBy('CUSTOMER_ID').agg(F.count('CUSTOMER_ID').alias('count_t'),F.collect_set('CODE_ID').alias('unique_CODE'))
update_obj.show()
update(update_obj)
if __name__ == '__main__':
start = time.time()
df = extract()
query = df.writeStream \
.outputMode('append')\
.foreachBatch(transform_load)\
.start() \
.awaitTermination()
我想访问分布的 Spark Dataframe 的每一行。所以,我必须使用 Map 转换。我只是添加了这个简单的代码来测试 Spark Map。但是,我没有在控制台中收到任何输出。事实上,func
不是 运行。
def func(df):
df.take(3)
def update(df):
df.rdd.map(func,preservesPartitioning=False)
你能指导我这里有什么问题吗?
非常感谢。
问题已解决。
我忘记在地图后使用 action
。另外,我不能使用 df.take(3)
,因为 func
中没有任何数据框,它是 rdd
,它没有 take
属性。我这样更改代码:
def func(x):
print(x.CUSTOMER_ID)
def update(df):
df.rdd.map(func,preservesPartitioning=False).count()
count()
是我用来查看地图结果的操作。
我使用的是Spark Structure Streaming,代码如下:
def convert_timestamp_to_datetime(timestamp):
return datetime.fromtimestamp(timestamp)
def extract():
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
json_schema = \
StructType() \
.add(StructField("TIMESTAMP", FloatType(), True)) \
.add(StructField("index", IntegerType(), True)) \
.add(StructField("CUSTOMER_ID", StringType(), True)) \
.add(StructField("CODE_ID", StringType(), True)) \
.add(StructField("PROCESS", StringType(), True))
my_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "simple_json_12_10trx") \
.option("startingOffsets", "earliest") \
.load()
my_df = my_df.select(from_json(col('value').cast('string'), json_schema).alias("json"))
convert_timestamp_datetime_udf = udf(lambda x: convert_timestamp_to_datetime(x), TimestampType())
return my_df.select('json.*', convert_timestamp_datetime_udf('json.TIMESTAMP').alias('DATETIME'))
def transform_load(my_df, epoch_id):
update_obj = my_df.groupBy('CUSTOMER_ID').agg(F.count('CUSTOMER_ID').alias('count_t'),F.collect_set('CODE_ID').alias('unique_CODE'))
update_obj.show()
update(update_obj)
if __name__ == '__main__':
start = time.time()
df = extract()
query = df.writeStream \
.outputMode('append')\
.foreachBatch(transform_load)\
.start() \
.awaitTermination()
我想访问分布的 Spark Dataframe 的每一行。所以,我必须使用 Map 转换。我只是添加了这个简单的代码来测试 Spark Map。但是,我没有在控制台中收到任何输出。事实上,func
不是 运行。
def func(df):
df.take(3)
def update(df):
df.rdd.map(func,preservesPartitioning=False)
你能指导我这里有什么问题吗?
非常感谢。
问题已解决。
我忘记在地图后使用 action
。另外,我不能使用 df.take(3)
,因为 func
中没有任何数据框,它是 rdd
,它没有 take
属性。我这样更改代码:
def func(x):
print(x.CUSTOMER_ID)
def update(df):
df.rdd.map(func,preservesPartitioning=False).count()
count()
是我用来查看地图结果的操作。