Unable to pass Pig tuple to Python UDF
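I have master.txt, which contains 10K records. Each of its lines is loaded as a tuple, and I need to pass all of those tuples to a Python UDF. Because the relation has more than one record, storing p2preportmap fails with the following error. Please help.
The error is: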
Unable to open iterator for alias p2preportmap. Backend error : org.apache.pig.backend.executionengine.ExecException: ERROR 0: Scalar has more than one row in the output. 1st : (010301,MTS,MM), 2nd :(010B06,MTS,TN) (common cause: "JOIN" then "FOREACH ... GENERATE foo.bar" should be "foo::bar" )
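To see why the script fails, here is a toy Python model of the rule the error message describes. It is a simplification, not Pig's implementation. When `mastergttup` is referenced inside the FOREACH over another relation, Pig treats it as a scalar. That is only allowed when the referenced relation has exactly one row. The names `ScalarError` and `project_scalar` are hypothetical. The two rows are the ones quoted in the error.

```python
# Toy model (NOT Pig's source code) of the rule behind
# "Scalar has more than one row in the output".


class ScalarError(Exception):
    """Hypothetical stand-in for Pig's ExecException."""


def project_scalar(relation, index):
    """Return field `index` of the only row in `relation`, the way a scalar
    reference to another relation is resolved; raise if there are more rows."""
    if len(relation) > 1:
        raise ScalarError(
            "Scalar has more than one row in the output. "
            f"1st : {relation[0]}, 2nd :{relation[1]}"
        )
    return relation[0][index]


# mastergttup holds one tuple per line of master.txt (two shown here),
# so using it as a scalar fails.
mastergttup = [("010301", "MTS", "MM"), ("010B06", "MTS", "TN")]

try:
    project_scalar(mastergttup, 0)
except ScalarError as err:
    print(err)
```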
The Pig script is:
REGISTER 'smsiuc_udf.py' using streaming_python as smsiuc_udfs;
cdrs = load '2016040111*' USING PigStorage('|','-tagFile') ;
mastergtrec = load 'master.txt' USING PigStorage(',','-tagFile');
mastergt = FOREACH mastergtrec GENERATE (chararray) UPPER() as opcdpc, (chararray) UPPER() as gtoptname,(chararray) UPPER() as gtoptcircle;
mastergttup = FOREACH mastergt generate TOTUPLE(opcdpc,gtoptname,gtoptcircle) as mstgttup;
cdrrecord = FOREACH cdrs GENERATE (chararray) UPPER() as aparty, (chararray) UPPER() as bparty, as smssentdate, as smssenttime,(=='6' ? 'S' : 'F') as status,(chararray) UPPER() as srcgt,(chararray) UPPER() as destgt,(=='405899136999995' ? 'MTSDEL-CDMA' : (=='919875089998' ? 'MTSRAJ-GSM' : (=='405899150999995' ? 'MTSCHN-CDMA' : ) ) ) as smscgt, (chararray)$0 as cdrfname,(chararray) as prepost;
filteredp2pcdrs = FILTER cdrrecord by smsiuc_udfs.pullp2pcdrs(aparty,bparty,srcgt,destgt) and status == 'S' and SUBSTRING(smssentdate,4,6) == '$MON';
groupp2pcdrs = GROUP filteredp2pcdrs by (srcgt,destgt,aparty,bparty,smscgt,status,prepost);
distinctp2pcdrs= FOREACH groupp2pcdrs {
uniq = DISTINCT filteredp2pcdrs.(srcgt,destgt,aparty,bparty,smscgt,status,prepost);
GENERATE FLATTEN(group),COUNT(uniq) as cnt;
};
p2preportmap = FOREACH distinctp2pcdrs GENERATE smsiuc_udfs.p2preport(srcgt,destgt,aparty,bparty,mastergttup ),smscgt,status,prepost,cnt;
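This can be done by adding a dummy column that holds the same constant on every row and then grouping on it. That collapses the relation into a single row, which can then be referenced as a scalar.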
dummy = foreach p2preportmap generate 1, $0, $1 ....
grouped = group dummy by $0;
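The sketch below shows, in plain Python, what the dummy-column trick does to the data. It assumes Pig tuples are modeled as Python tuples and bags as lists. `add_dummy` and `group_by_first` are hypothetical helpers. The two input rows are the ones quoted in the error message; the rest of master.txt is not shown.

```python
from collections import defaultdict

# Plain-Python stand-in for:
#   dummy   = foreach <relation> generate 1, $0, $1, ...;
#   grouped = group dummy by $0;


def add_dummy(relation):
    """Prefix every row with the constant 1 (the dummy column)."""
    return [(1,) + row for row in relation]


def group_by_first(relation):
    """Group rows on field $0; each output row is (key, bag_of_rows)."""
    groups = defaultdict(list)
    for row in relation:
        groups[row[0]].append(row)
    return list(groups.items())


master = [("010301", "MTS", "MM"), ("010B06", "MTS", "TN")]
grouped = group_by_first(add_dummy(master))
print(len(grouped))   # one row, because every dummy value is 1
print(grouped[0][1])  # the bag holding every original tuple (with the dummy)
```

In Pig itself, `GROUP alias ALL` also puts every tuple into a single group without needing a dummy column.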
For example, say I have two relations, A and B:
A
1,2,3
3,4,5
4,5,6
B
1
2
3
1
2
3
1
2
3
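Now I want a Python UDF that looks up each value of B in the first column of A and prints output like this (2 has no match in A, so its second field is empty):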
((1,{(1,2,3)}))
((2,))
((3,{(3,4,5)}))
((1,{(1,2,3)}))
((2,))
((3,{(3,4,5)}))
((1,{(1,2,3)}))
((2,))
((3,{(3,4,5)}))
So first I group A by its first column, then group that result by the constant 1, so that everything ends up in a single row:
c = group A by $0;
e = group c by 1;
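The two GROUP statements can be modeled in plain Python to show the shape of `e`. In this sketch, lists stand in for bags and `group_by` is a hypothetical helper. Keys keep first-seen order, which is a choice of the model; Pig itself makes no ordering promise.

```python
from collections import defaultdict

A = [(1, 2, 3), (3, 4, 5), (4, 5, 6)]


def group_by(relation, key_fn):
    """Model of Pig's GROUP: one (key, bag) row per distinct key."""
    groups = defaultdict(list)
    for row in relation:
        groups[key_fn(row)].append(row)
    return list(groups.items())


c = group_by(A, lambda row: row[0])  # c = group A by $0;
e = group_by(c, lambda row: 1)       # e = group c by 1;

# e has exactly one row, (1, c). Its second field is all of c:
# one (key, [tuples of A with that key]) pair per distinct first column.
print(e)
```

Because `e` has a single row, its second field can be passed to a UDF as a scalar without triggering the "more than one row" error.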
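The Python UDF looks like this:
def pythonudf(value, map):
    # map is the bag from "group A by $0": one (key, {tuples}) per key
    print(map)  # debug output; safe to drop once the UDF works
    temp = None
    for a in map:
        # a[0] is the group key (A's first column), a[1] the matching tuples
        if a[0] == value:
            temp = a[1]
            break  # keys are unique after GROUP, so stop at the first match
    return value, temp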
Now use this UDF:
D = foreach B generate myudf.pythonudf($0, e.$1);
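The lookup logic can be checked outside Pig before the UDF is registered. The sketch below restates the UDF above with the debug print dropped. Its second parameter is renamed to the hypothetical `lookup_bag` so it no longer shadows Python's built-in `map`. Plain lists stand in for the grouped bag of A. With the sample A and B, it yields the rows shown earlier, with `None` where Pig shows an empty field for 2.

```python
# Local harness (outside Pig) for the lookup UDF; lists stand in for bags.


def pythonudf(value, lookup_bag):
    """Return (value, matching_tuples), or (value, None) when value is not
    a key in lookup_bag."""
    for key, tuples in lookup_bag:
        if key == value:
            return value, tuples
    return value, None


A = [(1, 2, 3), (3, 4, 5), (4, 5, 6)]
# Same result as grouping A by its first column (keys are unique here).
c = [(row[0], [row]) for row in A]
B = [1, 2, 3, 1, 2, 3, 1, 2, 3]

D = [pythonudf(b, c) for b in B]
for row in D:
    print(row)
```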