使用 .net-spark 加载包含多个部分的固定位置文件
Load fixed position file with multiple sections using .net-spark
我正在尝试使用 .net-spark 在 spark 中加载包含多个部分的 fixed-position 文件。
这是文件的示例:
01Nikola Tesla tesla@gmail.com +5521981181569
02Creations
03Alternating current
03Tesla coil
01Thomas Edison edison@gmail.com +5521981181569
02Creations
03Lamp
03Phonograph
03General Eletric
03Cinema
所以基本上我们有一个包含交易所有者信息的 header,一个说明下面是交易记录的 sub-header,最后是交易部分。交易行不包含对所有者的任何引用,所以是的,这很棘手。
正如@EdElliott 所建议的,这是我们应该如何在 RDD 中查看数据(仅显示第一行,但目的是阅读所有内容):
inventor
email
phone
creations
Nikola Tesla
tesla@gmail.com
+5511999999999
Alternating current
我想这种文件格式现在不是很常见,但在巴西大银行中仍然很常见。
我找到了一个 Java 的示例,但它不处理分节的部分。我相信可以用 UDF 实现这一点,但同样,我不知道从哪里开始。感谢这里的任何见解。
谢谢
我最终解决了这个问题。对这个解决方案并不感到自豪,主要是因为我使用了 ToLocalIterator(),以及我向 normalizedTransaction UDF 传递参数的方式。谢谢@EdElliott,你的博客帮了我很多。
无论如何,就这样:
// Predicate UDF: true when the raw line is a header record (prefix "01").
// StartsWith replaces Substring(0,2): identical for well-formed lines, but
// it does not throw ArgumentOutOfRangeException on blank or short lines.
static Func<Column, Column> OnlyHeaders = Udf<string, bool>(
line => line != null && line.StartsWith("01", StringComparison.Ordinal)
);
// Predicate UDF: true when the raw line is a transaction record (prefix "03").
// StartsWith replaces Substring(0,2): identical for well-formed lines, but
// it does not throw ArgumentOutOfRangeException on blank or short lines.
static Func<Column, Column> OnlyTransactions = Udf<string, bool>(
line => line != null && line.StartsWith("03", StringComparison.Ordinal)
);
// Column UDF: splits a raw header line into its fixed-width fields
// (delegates straight to GetHeader via a method group).
static Func<Column, Column> breakHeader =
Udf<string, string[]>(GetHeader);
// Column UDF: splits a raw transaction line into its fixed-width fields
// (delegates straight to GetTransaction via a method group).
static Func<Column, Column> breakTransaction =
Udf<string, string[]>(GetTransaction);
static void Main(string[] args)
{
// Entry point: reads a fixed-width multi-section file, splits it into
// header ("01") and transaction ("03") rows, then re-associates each
// transaction with the nearest preceding header using row numbers.
// Create a Spark session
var spark = SparkSession
.Builder()
.AppName("FixedLenghtWithSectionsApp")
.GetOrCreate();
// Create initial DataFrame
var rawDf = spark.Read().Schema("rawLine STRING").Text("resources/input.txt");
// Tag every line with an id so file order survives the split below.
// NOTE(review): MonotonicallyIncreasingId is increasing but not
// consecutive, and strictly follows file order only while the input
// stays in a single partition — confirm for large/multi-file inputs.
rawDf = rawDf.WithColumn("rowNumber", Functions.MonotonicallyIncreasingId());
rawDf.CreateOrReplaceTempView ("rawdata");
var headersDf = GetHeadersDf(rawDf);
headersDf.CreateOrReplaceTempView("headers");
headersDf.Show();
var transactionsDf = GetTransactionsDf(rawDf);
transactionsDf.CreateOrReplaceTempView("transactions");
transactionsDf.Show();
// Collect all header row numbers on the driver.
// NOTE(review): this materializes every header locally — fine for small
// files, a bottleneck for very large ones.
var headerLines = headersDf.ToLocalIterator().ToList().Select( r => r.Get("rowNumber"));
var columns = new StringBuilder();
foreach(var h in headerLines)
columns.Append($"{h.ToString()},");
// Smuggle the comma-separated header ids into the UDF as a literal column.
var column = Functions.Lit(columns.ToString()).Alias("ids");
// Given a transaction's row number, return the largest header row number
// smaller than it (i.e. the owning header), or -1 when there is none.
Func<Column, Column, Column> normalizedTransaction = Udf<string, string,int>( (line, hLines) => {
var ids = hLines.Split(",", StringSplitOptions.RemoveEmptyEntries).Select(s => int.Parse(s));
var id = ids.Where(h => h < int.Parse(line) );
if (id.Any())
return id.Max();
return -1;
} );
// Attach the owning header id to every transaction row.
var inventionsDf = transactionsDf.Alias("one")
.Select(
Functions.Col("one.rowNumber"),
Functions.Col("invention"),
normalizedTransaction(Functions.Col("one.rowNumber").Cast("string") , column ).Alias("id")
);
// Join each transaction back to its owning header (inventor/email/phone).
inventionsDf = inventionsDf.Alias("one")
.Join(
headersDf.Alias("two")
,Functions.Col("one.id") == Functions.Col("two.rowNumber")
,"inner"
);
inventionsDf.Show();
spark.Stop();
}
// Builds the transactions DataFrame: one row per "03" line carrying its
// original row number and the invention name.
private static DataFrame GetTransactionsDf(DataFrame rawDf)
{
var parsed = rawDf
.Select(rawDf["rowNumber"], breakTransaction(rawDf["rawLine"]).Alias("value"))
.Where(OnlyTransactions(rawDf["rawLine"]));
return parsed
.Select(parsed["rowNumber"], parsed.Col("value").GetItem(0))
.ToDF("rowNumber", "invention");
}
// Builds the headers DataFrame: one row per "01" line carrying its original
// row number plus the inventor's name, e-mail and phone.
private static DataFrame GetHeadersDf(DataFrame rawDf)
{
var parsed = rawDf
.Select(rawDf["rowNumber"], breakHeader(rawDf["rawLine"]).Alias("value"))
.Where(OnlyHeaders(rawDf["rawLine"]));
return parsed
.Select(
parsed["rowNumber"],
parsed.Col("value").GetItem(0),
parsed.Col("value").GetItem(1),
parsed.Col("value").GetItem(2))
.ToDF("rowNumber", "inventor", "email", "phone");
}
// Slices a fixed-width header line into its three fields.
// Layout (0-based offsets): [2,30)=inventor, [32,70)=email, [102,14)=phone.
private static string[] GetHeader(string line)
{
return new[]
{
line.Substring(2, 30),
line.Substring(32, 70),
line.Substring(102, 14),
};
}
// Slices a fixed-width transaction line into its single field.
// Layout (0-based offsets): [2,48) = invention name.
private static string[] GetTransaction(string line)
{
return new[] { line.Substring(2, 48) };
}
我正在尝试使用 .net-spark 在 spark 中加载包含多个部分的 fixed-position 文件。 这是文件的示例:
01Nikola Tesla tesla@gmail.com +5521981181569
02Creations
03Alternating current
03Tesla coil
01Thomas Edison edison@gmail.com +5521981181569
02Creations
03Lamp
03Phonograph
03General Eletric
03Cinema
所以基本上我们有一个包含交易所有者信息的 header,一个说明下面是交易记录的 sub-header,最后是交易部分。交易行不包含对所有者的任何引用,所以是的,这很棘手。
正如@EdElliott 所建议的,这是我们应该如何在 RDD 中查看数据(仅显示第一行,但目的是阅读所有内容):
inventor | email | phone | creations |
---|---|---|---|
Nikola Tesla | tesla@gmail.com | +5511999999999 | Alternating current |
我想这种文件格式现在不是很常见,但在巴西大银行中仍然很常见。
我找到了一个 Java 的示例,但它不处理分节的部分。我相信可以用 UDF 实现这一点,但同样,我不知道从哪里开始。感谢这里的任何见解。
谢谢
我最终解决了这个问题。对这个解决方案并不感到自豪,主要是因为我使用了 ToLocalIterator(),以及我向 normalizedTransaction UDF 传递参数的方式。谢谢@EdElliott,你的博客帮了我很多。
无论如何,就这样:
// Predicate UDF: true when the raw line is a header record (prefix "01").
// StartsWith replaces Substring(0,2): identical for well-formed lines, but
// it does not throw ArgumentOutOfRangeException on blank or short lines.
static Func<Column, Column> OnlyHeaders = Udf<string, bool>(
line => line != null && line.StartsWith("01", StringComparison.Ordinal)
);
// Predicate UDF: true when the raw line is a transaction record (prefix "03").
// StartsWith replaces Substring(0,2): identical for well-formed lines, but
// it does not throw ArgumentOutOfRangeException on blank or short lines.
static Func<Column, Column> OnlyTransactions = Udf<string, bool>(
line => line != null && line.StartsWith("03", StringComparison.Ordinal)
);
// Column UDF: splits a raw header line into its fixed-width fields
// (delegates straight to GetHeader via a method group).
static Func<Column, Column> breakHeader =
Udf<string, string[]>(GetHeader);
// Column UDF: splits a raw transaction line into its fixed-width fields
// (delegates straight to GetTransaction via a method group).
static Func<Column, Column> breakTransaction =
Udf<string, string[]>(GetTransaction);
static void Main(string[] args)
{
// Entry point: reads a fixed-width multi-section file, splits it into
// header ("01") and transaction ("03") rows, then re-associates each
// transaction with the nearest preceding header using row numbers.
// Create a Spark session
var spark = SparkSession
.Builder()
.AppName("FixedLenghtWithSectionsApp")
.GetOrCreate();
// Create initial DataFrame
var rawDf = spark.Read().Schema("rawLine STRING").Text("resources/input.txt");
// Tag every line with an id so file order survives the split below.
// NOTE(review): MonotonicallyIncreasingId is increasing but not
// consecutive, and strictly follows file order only while the input
// stays in a single partition — confirm for large/multi-file inputs.
rawDf = rawDf.WithColumn("rowNumber", Functions.MonotonicallyIncreasingId());
rawDf.CreateOrReplaceTempView ("rawdata");
var headersDf = GetHeadersDf(rawDf);
headersDf.CreateOrReplaceTempView("headers");
headersDf.Show();
var transactionsDf = GetTransactionsDf(rawDf);
transactionsDf.CreateOrReplaceTempView("transactions");
transactionsDf.Show();
// Collect all header row numbers on the driver.
// NOTE(review): this materializes every header locally — fine for small
// files, a bottleneck for very large ones.
var headerLines = headersDf.ToLocalIterator().ToList().Select( r => r.Get("rowNumber"));
var columns = new StringBuilder();
foreach(var h in headerLines)
columns.Append($"{h.ToString()},");
// Smuggle the comma-separated header ids into the UDF as a literal column.
var column = Functions.Lit(columns.ToString()).Alias("ids");
// Given a transaction's row number, return the largest header row number
// smaller than it (i.e. the owning header), or -1 when there is none.
Func<Column, Column, Column> normalizedTransaction = Udf<string, string,int>( (line, hLines) => {
var ids = hLines.Split(",", StringSplitOptions.RemoveEmptyEntries).Select(s => int.Parse(s));
var id = ids.Where(h => h < int.Parse(line) );
if (id.Any())
return id.Max();
return -1;
} );
// Attach the owning header id to every transaction row.
var inventionsDf = transactionsDf.Alias("one")
.Select(
Functions.Col("one.rowNumber"),
Functions.Col("invention"),
normalizedTransaction(Functions.Col("one.rowNumber").Cast("string") , column ).Alias("id")
);
// Join each transaction back to its owning header (inventor/email/phone).
inventionsDf = inventionsDf.Alias("one")
.Join(
headersDf.Alias("two")
,Functions.Col("one.id") == Functions.Col("two.rowNumber")
,"inner"
);
inventionsDf.Show();
spark.Stop();
}
// Builds the transactions DataFrame: one row per "03" line carrying its
// original row number and the invention name.
private static DataFrame GetTransactionsDf(DataFrame rawDf)
{
var parsed = rawDf
.Select(rawDf["rowNumber"], breakTransaction(rawDf["rawLine"]).Alias("value"))
.Where(OnlyTransactions(rawDf["rawLine"]));
return parsed
.Select(parsed["rowNumber"], parsed.Col("value").GetItem(0))
.ToDF("rowNumber", "invention");
}
// Builds the headers DataFrame: one row per "01" line carrying its original
// row number plus the inventor's name, e-mail and phone.
private static DataFrame GetHeadersDf(DataFrame rawDf)
{
var parsed = rawDf
.Select(rawDf["rowNumber"], breakHeader(rawDf["rawLine"]).Alias("value"))
.Where(OnlyHeaders(rawDf["rawLine"]));
return parsed
.Select(
parsed["rowNumber"],
parsed.Col("value").GetItem(0),
parsed.Col("value").GetItem(1),
parsed.Col("value").GetItem(2))
.ToDF("rowNumber", "inventor", "email", "phone");
}
// Slices a fixed-width header line into its three fields.
// Layout (0-based offsets): [2,30)=inventor, [32,70)=email, [102,14)=phone.
private static string[] GetHeader(string line)
{
return new[]
{
line.Substring(2, 30),
line.Substring(32, 70),
line.Substring(102, 14),
};
}
// Slices a fixed-width transaction line into its single field.
// Layout (0-based offsets): [2,48) = invention name.
private static string[] GetTransaction(string line)
{
return new[] { line.Substring(2, 48) };
}