How to create data frame from a particular portion of text log file using pyspark
I am new to PySpark...
I have a large log file that contains data like the following:
sfdfd
fsdfsdffdhfgjgfjkyklhljk,erygrt,tegtyryu,.
sgsgggggfsdf
==========================================
Roll Name class
==========================================
1 avb wer21g2
------------------------------------------
===========================================
empcode Emnname Dept Address
===========================================
12d sf sdf22 dghsjf
asf2 asdfw2 df21df fsfsfg
dsf21 sdf2 df2 sdgfsgf
-------------------------------------------
Now I want to split this file into multiple RDDs/DataFrames using Spark and Python (PySpark). I was able to do this in Scala using APIHadoopFile, and now I want to do the same in PySpark. Can anyone help me with this?
The expected output is:
Roll Name class
1 avb wer21g2
empcode Emnname Dept Address
12d sf sdf22 dghsjf
asf2 asdfw2 df21df fsfsfg
dsf21 sdf2 df2 sdgfsgf
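One way to approach this in PySpark (a minimal sketch, not tested against the real file) is the same record-delimiter trick as the Scala APIHadoopFile approach: read the file with sc.newAPIHadoopFile and set textinputformat.record.delimiter to the dashed line, so each delimited block becomes one record. The path and the exact delimiter string below are assumptions based on the sample above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SplitLogBlocks").getOrCreate()
sc = spark.sparkContext

# Hadoop TextInputFormat with a custom record delimiter; the dashed line must
# match the file exactly (the sample shows lines of slightly different lengths).
conf = {"textinputformat.record.delimiter": "------------------------------------------"}
blocks = (sc.newAPIHadoopFile(
              "C:/Users/Rudrashis/Desktop/test2.txt",
              "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
              "org.apache.hadoop.io.LongWritable",
              "org.apache.hadoop.io.Text",
              conf=conf)
          .map(lambda kv: kv[1]))   # keep only the text of each block

for block in blocks.collect():
    print(block)
    print("<<< end of block >>>")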
Here is the code I tried:
from pyspark.sql.types import StringType

with open(path) as f:
    out = []
    for line in f:
        if line.rstrip() == findStr:
            tmp = []
            tmp.append(line)
            # keep reading until the end-of-block marker
            for line in f:
                # print(line)
                if line.rstrip() == EndStr:
                    out.append(tmp)
                    break
                tmp.append(line)
# the with-block closes the file, so no explicit f.close() is needed
SMN_df = spark.createDataFrame(tmp, StringType())
SMN_df.show(truncate=False)
I am able to create the data frame, but not with the expected output. Can anyone help me?
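For reference, a minimal sketch of how the captured block could be turned into a DataFrame with the header row as column names instead of a single string column (the two sample rows below are placeholders standing in for the lines captured above):

import re
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BlockToDF").getOrCreate()

# 'tmp' stands in for the lines captured by the loop above.
tmp = ["Roll   Name   class", "1   avb   wer21g2"]
rows = [re.split(r"\s+", line.strip()) for line in tmp]
header, data = rows[0], rows[1:]

# use the header row as column names; all columns come out as strings
SMN_df = spark.createDataFrame(data, schema=header)
SMN_df.show(truncate=False)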
Please see the attached screenshot of the dataset for more details.
from pyspark.sql import SparkSession
import re

spark = (SparkSession.builder
         .config("spark.sql.warehouse.dir", "file://C:/temp")
         .appName("SparkSQL")
         .getOrCreate())

path = "C:/Users/Rudrashis/Desktop/test2.txt"
Txtpath = "L:/SparkScala/test.csv"
EndStr = "---------------------------------"
FilterStr = "================================="

def prepareDataset(Findstr):
    # Collect the lines between the header line that matches Findstr and the
    # dashed end marker, converting runs of whitespace into commas.
    with open(path) as f:
        out = []
        for line in f:
            if line.rstrip() == Findstr:
                tmp = []
                tmp.append(re.sub(r"\s+", ",", line.strip()))
                for line in f:
                    if line.rstrip() == EndStr:
                        out.append(tmp)
                        break
                    tmp.append(re.sub(r"\s+", ",", line.strip()))
                return tmp   # the with-block closes the file; no f.close() needed

def Makesv(Lstcommon):
    # Write the captured block to the intermediate CSV, one entry per line.
    with open(Txtpath, "w") as outfile:
        for entries in map(str.strip, Lstcommon):
            outfile.write(entries + "\n")

### For 1st block ################
LstStudent = prepareDataset("Roll Name Class")
LstStudent = list(filter(lambda a: a != FilterStr, LstStudent))
Makesv(LstStudent)
Student_DF = (spark.read.format('com.databricks.spark.csv')
              .options(header="true", inferschema="true")
              .load(Txtpath))
Student_DF.show(truncate=False)
######### end 1st block ####

##### 2nd block start ####
LstEmp = prepareDataset("empcode Emnname Dept Address")
LstEmp = list(filter(lambda a: a != FilterStr, LstEmp))
Makesv(LstEmp)
Emp_DF = (spark.read.format('com.databricks.spark.csv')
          .options(header="true", inferschema="true")
          .load(Txtpath))
Emp_DF.show(truncate=False)
##### end of 2nd block #####
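As a side note (this assumes Spark 2.0 or later, which is not stated above): the built-in CSV reader can replace the external com.databricks.spark.csv format, so the intermediate file can be loaded directly:

# Built-in CSV reader (Spark 2.0+); Txtpath is the file written by Makesv above.
Student_DF = (spark.read
              .option("header", "true")
              .option("inferSchema", "true")
              .csv(Txtpath))
Student_DF.show(truncate=False)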