pig 单组按每个 reducer 的值
pig single group by value per reducer
我有一个 pig 脚本,它按如下所示对不同的客户进行分组和计数
by_customer = GROUP customer BY (start_date, spc);
cust_cnt = FOREACH by_customer {
cust = DISTINCT customer.cid;
GENERATE FLATTEN(group), COUNT(cust);
};
问题是最后一个减速器由于内存问题挂起或失败。
我可以看到分布在 reducer 之间的数据是高度倾斜的。
有没有办法分配 group by 的输出,使得每个 reducer 只得到一个分组包。
您可以对group by 使用并行语句来增加reducer 的数量。如果您的数据真的非常倾斜,那么这将无济于事,因为一个 reducer 仍然可能获得太多数据。缓解这种情况的常用方法是引入一个随机数字段并将其添加到分组依据中。
customer_random = foreach customer generate RANDOM() as seed, start_date, spc, cid, ...
by_customer = group customer_random by (seed, start_date, spc);
显然这会生成与最初不同的组,因此您需要在进一步处理时考虑到这一点(不同的计数、总和等)。使事情变得有点复杂,但应该在 reducer 之间更好地分配数据。
我解决了这个问题,方法是在不使用 distinct 关键字的情况下获得非重复计数。
by_customer = GROUP customer BY (cid,start_date,spc);
dist_customer = FOREACH by_customer GENERATE group.start_date as start_date,group.spc as spc,1 as cst_cnt;
cust = GROUP dist_customer by (start_date,spc);
cust_cnt = FOREACH cust GENERATE FLATTEN(group), SUM(dist_customer.cst_cnt);
这非常有效。
不确定为什么 distinct 不起作用。
感谢您的帮助。
我有一个 pig 脚本,它按如下所示对不同的客户进行分组和计数
by_customer = GROUP customer BY (start_date, spc);
cust_cnt = FOREACH by_customer {
cust = DISTINCT customer.cid;
GENERATE FLATTEN(group), COUNT(cust);
};
问题是最后一个减速器由于内存问题挂起或失败。 我可以看到分布在 reducer 之间的数据是高度倾斜的。 有没有办法分配 group by 的输出,使得每个 reducer 只得到一个分组包。
您可以对group by 使用并行语句来增加reducer 的数量。如果您的数据真的非常倾斜,那么这将无济于事,因为一个 reducer 仍然可能获得太多数据。缓解这种情况的常用方法是引入一个随机数字段并将其添加到分组依据中。
customer_random = foreach customer generate RANDOM() as seed, start_date, spc, cid, ...
by_customer = group customer_random by (seed, start_date, spc);
显然这会生成与最初不同的组,因此您需要在进一步处理时考虑到这一点(不同的计数、总和等)。使事情变得有点复杂,但应该在 reducer 之间更好地分配数据。
我解决了这个问题,方法是在不使用 distinct 关键字的情况下获得非重复计数。
by_customer = GROUP customer BY (cid,start_date,spc);
dist_customer = FOREACH by_customer GENERATE group.start_date as start_date,group.spc as spc,1 as cst_cnt;
cust = GROUP dist_customer by (start_date,spc);
cust_cnt = FOREACH cust GENERATE FLATTEN(group), SUM(dist_customer.cst_cnt);
这非常有效。 不确定为什么 distinct 不起作用。
感谢您的帮助。