Project_Bank.csv 不是 Parquet 文件。尾部预期的幻数 [80, 65, 82, 49] 但发现 [110, 111, 13, 10]
Project_Bank.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [110, 111, 13, 10]
所以我试图加载推断 自定义架构 的 csv 文件,但每次我都会遇到以下错误:
Project_Bank.csv 不是 Parquet 文件。尾部预期的幻数 [80, 65, 82, 49] 但发现 [110, 111, 13, 10]
这是我的程序和我的 csv 文件条目的样子,
年龄;工作;婚姻;教育;违约;余额;住房;贷款;联系人;日;月;持续时间;活动;pdays;以前;poutcome;y
58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no
44;技术员;单身;中学;没有;29;是;没有;未知;5;可能;151;1;-1;0;未知;没有
33;企业家;已婚;中学;否;2;是;是;未知;5;可能;76;1;-1;0;未知;否
我的代码:
$spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import sqlContext.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val bankSchema = StructType(Array(
StructField("age", IntegerType, true),
StructField("job", StringType, true),
StructField("marital", StringType, true),
StructField("education", StringType, true),
StructField("default", StringType, true),
StructField("balance", IntegerType, true),
StructField("housing", StringType, true),
StructField("loan", StringType, true),
StructField("contact", StringType, true),
StructField("day", IntegerType, true),
StructField("month", StringType, true),
StructField("duration", IntegerType, true),
StructField("campaign", IntegerType, true),
StructField("pdays", IntegerType, true),
StructField("previous", IntegerType, true),
StructField("poutcome", StringType, true),
StructField("y", StringType, true)))
val df = sqlContext.
read.
schema(bankSchema).
option("header", "true").
option("delimiter", ";").
load("/user/amit.kudnaver_gmail/hadoop/project_bank/Project_Bank.csv").toDF()
df.registerTempTable("people")
df.printSchema()
val distinctage = sqlContext.sql("select distinct age from people")
关于推送正确架构后为何无法在此处使用 csv 文件的任何建议。预先感谢您的建议。
谢谢
阿米特 K
这里的问题是 Data Frame 在处理它时需要 Parquet 文件。为了处理 CSV 中的数据。在这里你可以做什么。
首先,从数据中删除 header 行。
58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no
44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no
33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no
接下来我们编写如下代码读取数据。
创建案例class
case class BankSchema(age: Int, job: String, marital:String, education:String, default:String, balance:Int, housing:String, loan:String, contact:String, day:Int, month:String, duration:Int, campaign:Int, pdays:Int, previous:Int, poutcome:String, y:String)
从 HDFS 读取数据并解析
val bankData = sc.textFile("/user/myuser/Project_Bank.csv").map(_.split(";")).map(p => BankSchema(p(0).toInt, p(1), p(2),p(3),p(4), p(5).toInt, p(6), p(7), p(8), p(9).toInt, p(10), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toInt, p(15), p(16))).toDF()
然后注册table并执行查询。
bankData.registerTempTable("bankData")
val distinctage = sqlContext.sql("select distinct age from bankData")
输出结果如下
+---+
|age|
+---+
| 33|
| 44|
| 58|
+---+
此处预期的文件格式为 csv
,但根据错误,它正在寻找 parquet
文件格式。
这可以通过如下显式提及文件格式(在共享的问题中缺失)来解决,因为如果我们不指定文件格式,那么它默认需要 Parquet
格式。
根据 Java 代码版本(示例):
Dataset<Row> resultData = session.read().format("csv")
.option("sep", ",")
.option("header", true)
.option("mode", "DROPMALFORMED")
.schema(definedSchema)
.load(inputPath);
在这里,模式可以通过使用 java class (ie. POJO class)
或使用 StructType
来定义,如前所述。
而inputPath是输入csv
文件的路径。
所以我试图加载推断 自定义架构 的 csv 文件,但每次我都会遇到以下错误:
Project_Bank.csv 不是 Parquet 文件。尾部预期的幻数 [80, 65, 82, 49] 但发现 [110, 111, 13, 10]
这是我的程序和我的 csv 文件条目的样子,
年龄;工作;婚姻;教育;违约;余额;住房;贷款;联系人;日;月;持续时间;活动;pdays;以前;poutcome;y 58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no 44;技术员;单身;中学;没有;29;是;没有;未知;5;可能;151;1;-1;0;未知;没有 33;企业家;已婚;中学;否;2;是;是;未知;5;可能;76;1;-1;0;未知;否
我的代码:
$spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import sqlContext.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val bankSchema = StructType(Array(
StructField("age", IntegerType, true),
StructField("job", StringType, true),
StructField("marital", StringType, true),
StructField("education", StringType, true),
StructField("default", StringType, true),
StructField("balance", IntegerType, true),
StructField("housing", StringType, true),
StructField("loan", StringType, true),
StructField("contact", StringType, true),
StructField("day", IntegerType, true),
StructField("month", StringType, true),
StructField("duration", IntegerType, true),
StructField("campaign", IntegerType, true),
StructField("pdays", IntegerType, true),
StructField("previous", IntegerType, true),
StructField("poutcome", StringType, true),
StructField("y", StringType, true)))
val df = sqlContext.
read.
schema(bankSchema).
option("header", "true").
option("delimiter", ";").
load("/user/amit.kudnaver_gmail/hadoop/project_bank/Project_Bank.csv").toDF()
df.registerTempTable("people")
df.printSchema()
val distinctage = sqlContext.sql("select distinct age from people")
关于推送正确架构后为何无法在此处使用 csv 文件的任何建议。预先感谢您的建议。
谢谢 阿米特 K
这里的问题是 Data Frame 在处理它时需要 Parquet 文件。为了处理 CSV 中的数据。在这里你可以做什么。
首先,从数据中删除 header 行。
58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no
44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no
33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no
接下来我们编写如下代码读取数据。
创建案例class
case class BankSchema(age: Int, job: String, marital:String, education:String, default:String, balance:Int, housing:String, loan:String, contact:String, day:Int, month:String, duration:Int, campaign:Int, pdays:Int, previous:Int, poutcome:String, y:String)
从 HDFS 读取数据并解析
val bankData = sc.textFile("/user/myuser/Project_Bank.csv").map(_.split(";")).map(p => BankSchema(p(0).toInt, p(1), p(2),p(3),p(4), p(5).toInt, p(6), p(7), p(8), p(9).toInt, p(10), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toInt, p(15), p(16))).toDF()
然后注册table并执行查询。
bankData.registerTempTable("bankData")
val distinctage = sqlContext.sql("select distinct age from bankData")
输出结果如下
+---+
|age|
+---+
| 33|
| 44|
| 58|
+---+
此处预期的文件格式为 csv
,但根据错误,它正在寻找 parquet
文件格式。
这可以通过如下显式提及文件格式(在共享的问题中缺失)来解决,因为如果我们不指定文件格式,那么它默认需要 Parquet
格式。
根据 Java 代码版本(示例):
Dataset<Row> resultData = session.read().format("csv")
.option("sep", ",")
.option("header", true)
.option("mode", "DROPMALFORMED")
.schema(definedSchema)
.load(inputPath);
在这里,模式可以通过使用 java class (ie. POJO class)
或使用 StructType
来定义,如前所述。
而inputPath是输入csv
文件的路径。