Define input parameters for a UDF in Spark Scala
I am working with Scala and Spark.
I am defining a UDF; here it is:
def udfcrpentcd = udf((CORP_ENT_CD: String) => {
  state_name match {
    case "IL1" if state_name.contains("IL1") => "IL1"
    case "OK1" if state_name.contains("OK1") => "OK1"
    case "TX1" if state_name.contains("TX1") => "TX1"
    case "NM1" if state_name.contains("NM1") => "NM1"
    case "MT1" if state_name.contains("MT1") => "MT1"
    case _ => "Null"
  }
})
val local_masterdb = old_dataframe_temp_masterdbDataFrame.withColumn(
  "new_columna_name_CORP_ENT_CD",
  udfcrpentcd(old_dataframe_temp_masterdbDataFrame("last_column_of_old_dataframe_DB_STATUS") + 1))
local_masterdb.show()
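Now I want to reuse the UDF above.
I want to make it generic: instead of matching on state_name, I want to pass in a string and have it return the CRP_ENT_CD. This is what I am trying to do.
Is this correct?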
def udfcrpentcd(input_parameter: String) = udf((CORP_ENT_CD: String) => {
  input_parameter match {
    case "IL1" if input_parameter.contains("IL1") => "IL1"
    case "OK1" if input_parameter.contains("OK1") => "OK1"
    case "TX1" if input_parameter.contains("TX1") => "TX1"
    case "NM1" if input_parameter.contains("NM1") => "NM1"
    case "MT1" if input_parameter.contains("MT1") => "MT1"
    case _ => "Null"
  }
})
If this is the right approach, how do I call it?
Any help with passing the parameter would be appreciated.
Here is an example of how to pass a parameter to a UDF.
import org.apache.spark.sql.functions.udf

// Plain Scala function that holds the matching logic.
def udfcrpentcd: String => String = (input_parameter: String) => {
  input_parameter match {
    case "IL1" if input_parameter.contains("IL1") => "IL1"
    case "OK1" if input_parameter.contains("OK1") => "OK1"
    case "TX1" if input_parameter.contains("TX1") => "TX1"
    case "NM1" if input_parameter.contains("NM1") => "NM1"
    case "MT1" if input_parameter.contains("MT1") => "MT1"
    case _ => "Null"
  }
}

// Wrap the function as a Spark UDF after it has been defined.
val udfcrpentcd_res = udf(udfcrpentcd)

val local_masterdb = old_dataframe_temp_masterdbDataFrame.withColumn(
  "new_columna_name_CORP_ENT_CD",
  udfcrpentcd_res(old_dataframe_temp_masterdbDataFrame("last_column_of_old_dataframe_DB_STATUS") + 1))
local_masterdb.show()
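To try the idea end to end, here is a minimal, self-contained sketch. It is not the exact code from the post: the object name `CorpEntCdExample`, the helpers `knownCodes`, `toCorpEntCd`, `corpEntCdUdf` and `corpEntCdUdfWithDefault`, the `DB_STATUS` column, the sample rows and the local `SparkSession` are all assumptions made for illustration. The guarded `match` is swapped for a `Set` lookup, which should behave the same for these five codes, and the `+ 1` from the question is left out because its intent is not clear from the post. The second helper shows how a `def` that takes a parameter and returns a UDF, like the one in the question, can be called: first with the plain Scala argument, then with the column.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

object CorpEntCdExample {
  // Codes copied from the match in the question.
  val knownCodes: Set[String] = Set("IL1", "OK1", "TX1", "NM1", "MT1")

  // Plain Scala function: returns the code if it is known, otherwise the string "Null".
  val toCorpEntCd: String => String =
    (value: String) => if (value != null && knownCodes.contains(value)) value else "Null"

  // Wrapping the function once gives a UDF that can be applied to any string column.
  val corpEntCdUdf: UserDefinedFunction = udf(toCorpEntCd)

  // Hypothetical factory: the extra parameter is captured in the closure,
  // so each call returns a UDF configured with its own fallback value.
  def corpEntCdUdfWithDefault(default: String): UserDefinedFunction =
    udf((value: String) => if (value != null && knownCodes.contains(value)) value else default)

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only (assumption).
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("corp-ent-cd-sketch")
      .getOrCreate()
    import spark.implicits._

    // Sample data standing in for the real DataFrame (assumption).
    val df = Seq("IL1", "TX1", "ZZ9").toDF("DB_STATUS")

    df.withColumn("CORP_ENT_CD", corpEntCdUdf(col("DB_STATUS")))
      // Factory call: Scala argument first, then the column.
      .withColumn("CORP_ENT_CD_OR_UNKNOWN", corpEntCdUdfWithDefault("UNKNOWN")(col("DB_STATUS")))
      .show()

    spark.stop()
  }
}
```

Keeping the logic in a plain `String => String` function also makes it easy to check without starting Spark, for example with `CorpEntCdExample.toCorpEntCd("IL1")`.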