在 pyspark 中使用 UDF 和简单的数据帧
using UDF's and simpe dataframes in pyspark
我是 pyspark 的新手,来尝试做如下的事情
为每个 cookie 调用函数 PrintDetails,然后将结果写入文件。 spark.sql 查询 returns 正确的数据,我也可以将其序列化到文件中。
有人可以帮助每个 cookie 上的 for 语句吗?调用 UDF 的语法应该是什么?如何将输出写入文本文件?
感谢任何帮助。
谢谢
@udf(returnType=StringType())
def PrintDetails(cookie, timestamps,current_day, current_hourly_threshold,current_daily_threshold):
#DO SOME WORK
return "%s\t%d\t%d\t%d\t%d\t%s" %(some_data)
def main(argv):
spark = SparkSession \
.builder \
.appName("parquet_test") \
.config("spark.debug.maxToStringFields", "100") \
.getOrCreate()
inputPath = r'D:\Hadoop\Spark\parquet_input_files'
inputFiles = os.path.join(inputPath, '*.parquet')
impressionDate = datetime.strptime("2019_12_31", '%Y_%m_%d')
current_hourly_threshold = 40
current_daily_threshold = 200
parquetFile = spark.read.parquet(inputFiles)
parquetFile.createOrReplaceTempView("parquetFile")
cookie_and_time = spark.sql("SELECT cookie, collect_list(date_format(from_unixtime(ts), 'YYYY-mm-dd-H:M:S')) as imp_times FROM parquetFile group by 1 ")
for cookie in cookie_and_time :
PrintDetails(cookie('cookie'), cookie('imp_times'), impressionDate, current_hourly_threshold, current_daily_threshold))
你可以像下面那样做。
cookie_df= cookie_and_time.withColumn("cookies",PrintDetails(cookie('cookie'), cookie('imp_times'), lit(impressionDate), lit(current_hourly_threshold), lit(current_daily_threshold)))
或者您可以在 udf
函数本身中定义所有变量,避免作为参数传递。
我是 pyspark 的新手,来尝试做如下的事情 为每个 cookie 调用函数 PrintDetails,然后将结果写入文件。 spark.sql 查询 returns 正确的数据,我也可以将其序列化到文件中。 有人可以帮助每个 cookie 上的 for 语句吗?调用 UDF 的语法应该是什么?如何将输出写入文本文件?
感谢任何帮助。 谢谢
@udf(returnType=StringType())
def PrintDetails(cookie, timestamps,current_day, current_hourly_threshold,current_daily_threshold):
#DO SOME WORK
return "%s\t%d\t%d\t%d\t%d\t%s" %(some_data)
def main(argv):
spark = SparkSession \
.builder \
.appName("parquet_test") \
.config("spark.debug.maxToStringFields", "100") \
.getOrCreate()
inputPath = r'D:\Hadoop\Spark\parquet_input_files'
inputFiles = os.path.join(inputPath, '*.parquet')
impressionDate = datetime.strptime("2019_12_31", '%Y_%m_%d')
current_hourly_threshold = 40
current_daily_threshold = 200
parquetFile = spark.read.parquet(inputFiles)
parquetFile.createOrReplaceTempView("parquetFile")
cookie_and_time = spark.sql("SELECT cookie, collect_list(date_format(from_unixtime(ts), 'YYYY-mm-dd-H:M:S')) as imp_times FROM parquetFile group by 1 ")
for cookie in cookie_and_time :
PrintDetails(cookie('cookie'), cookie('imp_times'), impressionDate, current_hourly_threshold, current_daily_threshold))
你可以像下面那样做。
cookie_df= cookie_and_time.withColumn("cookies",PrintDetails(cookie('cookie'), cookie('imp_times'), lit(impressionDate), lit(current_hourly_threshold), lit(current_daily_threshold)))
或者您可以在 udf
函数本身中定义所有变量,避免作为参数传递。