SPARK 1.2.0 SQLContext 需要条件值,如 oracle 中的 case 条件
SPARK 1.2.0 SQLContext need conditional value like case condition in oracle
我正在使用 spark 1.2.0 和 python。
我的问题是,在 sql 查询中,如果某个字段的值为零,我需要将其替换为其他值。
我试过适用于 1.4.0 但不适用于 1.2.0 的 case /coalese
case when COALESCE("+fld+",0)=0 then "+str(numavgnumlst[0][lock])+" else "+fld+" end.
但是对于 1.2.0,我尝试对地图做同样的事情
sc = SparkContext(appName="RunModelCCATTR")
sqlContext=SQLContext(sc)
sqlstr="select ..."
nonzerodf=sqlContext.sql(sqlstr)
.....
iifdatadf=nonzerodf.map(lambda candrow:replacezeroforrow(candrow,numavgnumlst))
....
def replacezeroforrow(rowfields,avgvalfields):
ind=0
lent=len(rowfields)
for rowfield in rowfields[4:lent]:
if rowfield==0:
rowfields[ind]=avgvalfields[ind]
ind=ind+1
return rowfields;
这会引发错误
TypeError: 'Row' object does not support item assignment
不确定我能做些什么来实现 spark 1.2.0 中的 objective。
感谢您的帮助,我认为它现在正在工作。除了列的顺序似乎已经改变之外。但这可能不是问题。
再次感谢
编辑:
这个想法对我帮助很大,需要一点修改来解决眼前的问题,-
def replacezeroforrow(rowfields,avgvalfields,dont_replace=[]):
rdict = rowfields.asDict()
return Row(dict([(k,avgvalfields[k] if v == 0 and k not in dont_replace else v ) for (k,v) in rdict.items()]))
我修改了原始解决方案以避免 'for' 处的语法错误。
调用方法如下,-
restrictdict=[FieldSet1,FieldSet2,FieldSet3,FieldSet4,modeldepvarcat[0]]
iifdatadf=nonzerodf.map(lambda candrow: replacezeroforrow(candrow,numavgnumlst[0].asDict(),restrictdict))
但是现在我正在尝试访问 iifdatadf,
frstln= iifdatadf.first()
print frstln
我有以下错误
return "<Row(%s)>" % ", ".join(self)
TypeError: sequence item 0: expected string, dict found
非常感谢帮助。
您可以使用字典代替列表,只需 return 一个新行:
def replacezeroforrow(row, avgvalfields):
rdict = row.asDict()
return Row(**{k: avgvalfields[k] if v == 0 and k in avgvalfields
else v for (k, v) in rdict.items()})
用法:
>>> r1 = Row(fld1="a", fld2=99, fld3=0, fld4=0)
>>> avgvalfields = {'fld3': 3, 'fld4': 1}
>>> replacezeroforrow(r1, avgvalfields)
Row(fld1='a', fld2=99, fld3=3, fld4=1)
我正在使用 spark 1.2.0 和 python。
我的问题是,在 sql 查询中,如果某个字段的值为零,我需要将其替换为其他值。
我试过适用于 1.4.0 但不适用于 1.2.0 的 case /coalese
case when COALESCE("+fld+",0)=0 then "+str(numavgnumlst[0][lock])+" else "+fld+" end.
但是对于 1.2.0,我尝试对地图做同样的事情
sc = SparkContext(appName="RunModelCCATTR")
sqlContext=SQLContext(sc)
sqlstr="select ..."
nonzerodf=sqlContext.sql(sqlstr)
.....
iifdatadf=nonzerodf.map(lambda candrow:replacezeroforrow(candrow,numavgnumlst))
....
def replacezeroforrow(rowfields,avgvalfields):
ind=0
lent=len(rowfields)
for rowfield in rowfields[4:lent]:
if rowfield==0:
rowfields[ind]=avgvalfields[ind]
ind=ind+1
return rowfields;
这会引发错误
TypeError: 'Row' object does not support item assignment
不确定我能做些什么来实现 spark 1.2.0 中的 objective。
感谢您的帮助,我认为它现在正在工作。除了列的顺序似乎已经改变之外。但这可能不是问题。 再次感谢
编辑:
这个想法对我帮助很大,需要一点修改来解决眼前的问题,-
def replacezeroforrow(rowfields,avgvalfields,dont_replace=[]):
rdict = rowfields.asDict()
return Row(dict([(k,avgvalfields[k] if v == 0 and k not in dont_replace else v ) for (k,v) in rdict.items()]))
我修改了原始解决方案以避免 'for' 处的语法错误。
调用方法如下,-
restrictdict=[FieldSet1,FieldSet2,FieldSet3,FieldSet4,modeldepvarcat[0]]
iifdatadf=nonzerodf.map(lambda candrow: replacezeroforrow(candrow,numavgnumlst[0].asDict(),restrictdict))
但是现在我正在尝试访问 iifdatadf,
frstln= iifdatadf.first()
print frstln
我有以下错误
return "<Row(%s)>" % ", ".join(self)
TypeError: sequence item 0: expected string, dict found
非常感谢帮助。
您可以使用字典代替列表,只需 return 一个新行:
def replacezeroforrow(row, avgvalfields):
rdict = row.asDict()
return Row(**{k: avgvalfields[k] if v == 0 and k in avgvalfields
else v for (k, v) in rdict.items()})
用法:
>>> r1 = Row(fld1="a", fld2=99, fld3=0, fld4=0)
>>> avgvalfields = {'fld3': 3, 'fld4': 1}
>>> replacezeroforrow(r1, avgvalfields)
Row(fld1='a', fld2=99, fld3=3, fld4=1)