Splitting an input log file in Pyspark dataframe
I have a log file that I need to split using a PySpark DataFrame. Below is a sample of my log file:
20/06/25 12:19:33 INFO datasources.FileScanRDD: Reading File path: hdfs://bpaiddev/dev/data/warehouse/clean/falcon/ukc/masked_data/parquet/FRAUD_CUSTOMER_INFORMATION/rcd_crt_dttm_yyyymmdd=20200523/part-0042-ed52abc2w.c000.snapp.parquet, range:0-27899, partition values :[20200523]
20/06/25 12:19:34 INFO executor.EXECUTOR: Finished task 18.0 in stage 0.0 (TID 18),18994 bytes result sent to driver
From the log sample you can see that the first line contains more details than the second one. For the first line I want Timestamp, Status, Message, Range and Value columns, while for the second line I can only have Timestamp, Status and Message columns.
How can I apply regex functions to this kind of data? Please help me out with this. Thanks a lot!
Expected output:
+-----------------+------+--------------------+--------------+--------------------+
| time_val|status| log_message| range| value|
+-----------------+------+--------------------+--------------+--------------------+
|20/06/25 12:19:33| INFO|datasources.FileS...| range:0-27899| partition values...|
|20/06/25 12:19:34| INFO|executor.EXECUTOR...| | |
+-----------------+------+--------------------+--------------+--------------------+
You can first load a DataFrame with the Timestamp and Status columns, keeping the rest of each line as a single String column.
from pyspark.sql import functions as F

input_df = spark.createDataFrame(sc.textFile("log_lines.log").map(lambda x: (x[0:17], x[18:22], x[23:])), ["time_val", "status", "message"])
+-----------------+------+--------------------+
| time_val|status| message|
+-----------------+------+--------------------+
|20/06/25 12:19:33| INFO|datasources.FileS...|
|20/06/25 12:19:34| INFO|executor.EXECUTOR...|
+-----------------+------+--------------------+
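If the fixed character offsets feel brittle, a hedged alternative is to parse each line with a regular expression instead of slicing. This is only a sketch: it assumes every line starts with a yy/MM/dd HH:mm:ss timestamp followed by a one-word level, and it uses spark.read.text, whose output column is named value.
raw_df = spark.read.text("log_lines.log")
log_pattern = r"^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) (\w+) (.*)$"
# Pull the three parts out of the raw "value" column with one pattern,
# so levels of any width (INFO, WARN, ERROR) are handled.
input_df = raw_df.select(
    F.regexp_extract("value", log_pattern, 1).alias("time_val"),
    F.regexp_extract("value", log_pattern, 2).alias("status"),
    F.regexp_extract("value", log_pattern, 3).alias("message"))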
Now, first process the lines that have Message, Range and Value, as shown below:
df1 = input_df.filter(F.col("message").startswith("datasources.FileScanRDD")).withColumn("log_message", F.split(F.col("message"), ",")[0]).withColumn("range", F.split(F.col("message"), ",")[1]).withColumn("value", F.split(F.col("message"), ",")[2]).drop("message")
df1.show()
+-----------------+------+--------------------+--------------+--------------------+
| time_val|status| log_message| range| value|
+-----------------+------+--------------------+--------------+--------------------+
|20/06/25 12:19:33| INFO|datasources.FileS...| range:0-27899| partition values...|
+-----------------+------+--------------------+--------------+--------------------+
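Since you asked about regex specifically, the same three columns can also be pulled out with F.regexp_extract instead of F.split. The patterns below are a sketch that assumes the range and partition-values tokens always look exactly like they do in your sample line:
df1 = input_df.filter(F.col("message").startswith("datasources.FileScanRDD")) \
    .withColumn("log_message", F.regexp_extract("message", r"^([^,]+),", 1)) \
    .withColumn("range", F.regexp_extract("message", r"(range:\d+-\d+)", 1)) \
    .withColumn("value", F.regexp_extract("message", r"(partition values\s*:\[[^\]]+\])", 1)) \
    .drop("message")
df1.show()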
Then you can process the other kind of line, which only has a message:
input_df.filter(~F.col("message").startswith("datasources.FileScanRDD")).show()
+-----------------+------+--------------------+
|         time_val|status|             message|
+-----------------+------+--------------------+
|20/06/25 12:19:34|  INFO|executor.EXECUTOR...|
+-----------------+------+--------------------+
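To get everything into the single table shown in your expected output, one hedged option is to pad the message-only rows with empty range and value columns and union them with the df1 built above; unionByName is a standard PySpark DataFrame method.
df2 = input_df.filter(~F.col("message").startswith("datasources.FileScanRDD")) \
    .withColumnRenamed("message", "log_message") \
    .withColumn("range", F.lit("")) \
    .withColumn("value", F.lit(""))
# Stack the two row shapes into the single expected-output table.
df1.unionByName(df2).show()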