Pig data cleaning based on date
I have 2 datasets, as follows:
1. ID and location
{ID, beginning year, ending year, location}.
Sample:
(1001, 2010, 2012, CA)
(1001, 2013, 2015, WA)
(1002, 2009, 2015, AZ)
(1003, 2014, 2015, FL)
2. ID and connection
{ID1, ID2, connection creating date}
Sample:
(1001, 1002, 2013)
(1001, 1003, 2014)
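I want to count the number of connections by location and year. I assume that once a connection is created, it never expires. The result I'm looking for is:
{Location1, Location2, year, number of connections}
For the example above, it should be: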
(WA, AZ, 2013, 1)
(WA, AZ, 2014, 1)
(WA, AZ, 2015, 1)
(WA, FL, 2014, 1)
(WA, FL, 2015, 1)
Does anyone know how to do this in Apache Pig?
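As you mentioned in the comments, at some point we will need to expand the data into per-year records. To minimize the impact of the resulting blow-up in data size, we should push that step as far down (as late) in the Pig script as possible.
The first thing we need to do is the following data transformation: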
{ID1, ID2, connection creating date} -> {Location1, Location2, start_year, end_year}
This can be done with the following Pig script statements:
locationData = LOAD 'path1' USING PigStorage('\t') AS (ID:chararray, beginning_year:long, ending_year:long, location:chararray);
connectionData = LOAD 'path2' USING PigStorage('\t') AS (ID1:chararray, ID2:chararray, connection_year:long);
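-- First join: attach every location period of ID1 to the connection.
partialJoin = JOIN connectionData BY ID1, locationData BY ID;
-- A connection only counts from the later of its creation year and the start of ID1's period.
partialExtracted = FOREACH partialJoin GENERATE
    ID2,
    connection_year,
    location AS location1,
    (beginning_year > connection_year ? beginning_year : connection_year) AS start_year,
    ending_year AS end_year;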
-- Second join: attach every location period of ID2 and keep only the overlap of both periods.
fullJoin = JOIN partialExtracted BY ID2, locationData BY ID;
fullExtracted = FOREACH fullJoin GENERATE
    location1,
    location AS location2,
    (beginning_year > start_year ? beginning_year : start_year) AS start_year,
    (ending_year < end_year ? ending_year : end_year) AS end_year;
-- Keep only rows with a non-empty year range (the periods actually overlap).
fullFiltered = FILTER fullExtracted BY (start_year <= end_year);
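To sanity-check the join, clamp, and filter logic before running it on a cluster, the same steps can be mirrored in plain Python on the sample data from the question. This is only a sketch for small inputs (nested loops stand in for the two inner joins), and the function name connection_intervals is hypothetical:

```python
# Plain-Python mirror of the two joins plus the clamp/filter steps, for small samples only.
# Rows are copied from the question; IDs are strings, like chararray in the Pig schema.
location_data = [
    ("1001", 2010, 2012, "CA"),
    ("1001", 2013, 2015, "WA"),
    ("1002", 2009, 2015, "AZ"),
    ("1003", 2014, 2015, "FL"),
]
connection_data = [
    ("1001", "1002", 2013),
    ("1001", "1003", 2014),
]


def connection_intervals(locations, connections):
    """Return (location1, location2, start_year, end_year) rows with a non-empty year range."""
    result = []
    for id1, id2, conn_year in connections:
        # First join: every location period of ID1 (inner join on ID1 == ID).
        for loc_id1, begin1, end1, loc1 in locations:
            if loc_id1 != id1:
                continue
            start = max(begin1, conn_year)  # counts only from the connection's creation year
            end = end1
            # Second join: every location period of ID2 (inner join on ID2 == ID).
            for loc_id2, begin2, end2, loc2 in locations:
                if loc_id2 != id2:
                    continue
                s = max(begin2, start)
                e = min(end2, end)
                # Same condition as the FILTER step: keep overlapping periods only.
                if s <= e:
                    result.append((loc1, loc2, s, e))
    return result


if __name__ == "__main__":
    for row in connection_intervals(location_data, connection_data):
        print(row)
```

On the sample this yields ('WA', 'AZ', 2013, 2015) and ('WA', 'FL', 2014, 2015). The CA period of 1001 ends in 2012, before either connection was created, so both of its candidate rows fail the start_year <= end_year check.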
We are now ready to explode the data into per-year records. Essentially, the following data transformation is needed:
{Location1, Location2, start_year, end_year} -> {Location1, Location2, year}
e.g.
WA, AZ, 2013, 2015
->
WA, AZ, 2013
WA, AZ, 2014
WA, AZ, 2015
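We can't really avoid a UDF here. We need a UDF that takes a start year and an end year and returns a bag containing every year in that range. You should be able to write it by following an online tutorial. Suppose this UDF is called getYearRange(). Your script will then look like this: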
fullExploded = FOREACH fullFiltered GENERATE
    location1, location2,
    FLATTEN(getYearRange(start_year, end_year)) AS year;
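Below is a minimal sketch of getYearRange() written as a Jython UDF, assuming Pig's Jython engine predefines the outputSchema decorator as the Pig UDF documentation describes. The file name year_udfs.py and the schema names are assumptions; the try/except fallback only exists so the same file can be imported by plain Python for testing:

```python
# year_udfs.py (hypothetical file name): a Jython UDF for Pig.
# Pig's Jython engine is assumed to supply outputSchema; this fallback is a
# no-op decorator so the module also works outside Pig.
try:
    outputSchema
except NameError:
    def outputSchema(schema):
        def decorator(func):
            return func
        return decorator


@outputSchema("years:bag{t:tuple(year:int)}")
def getYearRange(start_year, end_year):
    """Return a bag (a list of 1-tuples) with every year from start_year to end_year inclusive."""
    if start_year is None or end_year is None or start_year > end_year:
        return []  # empty bag: FLATTEN then emits no rows for this record
    return [(year,) for year in range(int(start_year), int(end_year) + 1)]
```

Assuming the file is saved as year_udfs.py, it could be registered with REGISTER 'year_udfs.py' USING jython AS myudfs; and called as myudfs.getYearRange(start_year, end_year) in the FOREACH above. A Java UDF extending EvalFunc<DataBag> is the other common route. The start_year > end_year guard is redundant after the filter step but cheap.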
All that's left is a GROUP BY to get the final counts:
fullGrouped = GROUP fullExploded BY (location1, location2, year);
finalOutput = FOREACH fullGrouped GENERATE
    FLATTEN(group) AS (location1, location2, year),
    COUNT(fullExploded) AS num_connections;
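The GROUP/COUNT step is an ordinary group-by count. As a small illustration, here is a Python equivalent using collections.Counter, fed with the per-year rows that the steps above produce for the question's sample; the function name count_connections is hypothetical:

```python
from collections import Counter

# Per-year rows as emitted by the FLATTEN step for the question's sample data.
exploded = [
    ("WA", "AZ", 2013),
    ("WA", "AZ", 2014),
    ("WA", "AZ", 2015),
    ("WA", "FL", 2014),
    ("WA", "FL", 2015),
]


def count_connections(rows):
    """Mimic GROUP ... BY (location1, location2, year) followed by COUNT."""
    counts = Counter(rows)  # key: (location1, location2, year) -> number of rows
    return sorted((l1, l2, year, n) for (l1, l2, year), n in counts.items())
```

Every key appears once in the sample, so each count is 1, which matches the expected output in the question; repeated keys would simply accumulate.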
The above describes the data flow. You may need to add extra steps to handle edge cases and ensure data integrity.