scala-spark: back to back clicks
I'm learning Scala and Spark and am quite new to both technologies.
Suppose I have a file like this:
"1421453179.157" P0105451998 "SCREEN"
"1421453179.157" P0106586529 "PRESENTATION"
"1421453179.157" P0108481590 NULL
"1421453179.157" P0108481590 "SCREEN"
"1421453179.157" P0112397365 "FULL_SCREEN"
"1421453179.157" P0113994553 "FULL_SCREEN"
"1421453179.158" P0112360870 "DATA_INFO" dataId:5913974361807341112
"1421453179.159" P0112360870 "DATA_INFO" dataId:7658923479992321112
"1421453179.160" P0108137271 "SCREEN"
"1421453179.161" P0103681986 "SCREEN"
"1421453179.162" P0104229251 "PRESENTATION"
The first column is the time, the second is the user_id, and the meaning of the third column depends on the data in the fourth column.
I want to accomplish the following:
I want to find consecutive DATA_INFO records and produce the following output:
P0112360870, 5913974361807341112|7658923479992321112
A verbal interpretation of that line is: user P0112360870 clicked on 5913974361807341112|7658923479992321112.
The first click should come first in the output; here 5913974361807341112 is the first click.
I started with the following:
val data=sc.textFile("hdfs://*").map(line=> {val tks=line.split("\t",3); (tks(1),(tks(0),tks(2))) } )
val data2=data.groupBy( a=> a._1).take(1000)
but I can't figure out how to move forward from here.
val data = sc.textFile("hdfs://*").map( line => line.split( "\t" ).toList )
// you probably only want the pxxx rows that carry at least some data.
val filteredData = data.filter( l => l.length > 3 )
val groupedData = filteredData.groupBy( l => l( 1 ) )
val iWantedThis = groupedData.map { case ( pxxx, iterOfList ) =>
  // every pxxx group now has at least one entry with data.
  val firstData = iterOfList.head( 3 )
  // now concatenate all the other data fields onto the first one.
  val datas = iterOfList.tail.foldLeft( firstData )( ( fd, l ) => fd + "|" + l( 3 ) )
  // return the string with \t as separator.
  List( pxxx, datas ).mkString( "\t" )
}
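One thing the question asks for that the snippet above doesn't guarantee is ordering: groupBy makes no promise about the order of rows inside a group, so the first click is not necessarily first. A minimal sketch of one way to handle that (my addition, not part of the answer, assuming the quoted timestamps in column 0 sort correctly as strings): sort each group by time before concatenating, and strip the dataId: prefix along the way.
// Sketch only: order each user's rows by the timestamp column so the first
// click comes first, then join the bare click ids with "|".
val orderedClicks = filteredData
  .groupBy( l => l( 1 ) )
  .map { case ( pxxx, rows ) =>
    val clicks = rows.toList
      .sortBy( _( 0 ) )                       // order by the time column
      .map( _( 3 ).replace( "dataId:", "" ) ) // keep only the click id
    List( pxxx, clicks.mkString( "|" ) ).mkString( "\t" )
  }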
I think the way you're approaching this is wrong from the start. If you know what your key is, set it up as a proper key-value pairing along these lines:
sc.textFile("hdfs://*")
  .map(_.split("\t",3))                   //Split on tabs
  .map(tks=>(tks(1),(tks(0),tks(2))))     //Create a (key, Tuple2) pairing
  .reduceByKey(
    (x,y)=>
      if(x._2 contains "DATA_INFO")       //The payload (._2), not the timestamp, carries DATA_INFO
        (s"${x._2}|${y._2}".replace("dataId:",""), "")
      else x                              //Ignore duplicate non-DATA_INFO elements by dropping?????
  )
The most important thing to note is that you still need to handle the else case appropriately.
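To make that concrete, here is a hedged sketch of one way to sidestep the else case entirely (my own reading, not the answerer's code): filter down to the DATA_INFO lines before pairing and reducing, so every value the reducer ever sees is already a click id.
// Sketch, assuming tab-separated input as in the snippets above: keep only
// DATA_INFO rows, pair user_id with the bare click id, then reduce per user.
val clicksPerUser = sc.textFile("hdfs://*")
  .map(_.split("\t"))
  .filter(tks => tks.length > 3 && tks(2).contains("DATA_INFO"))
  .map(tks => (tks(1), tks(3).replace("dataId:", "")))
  .reduceByKey(_ + "|" + _)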
Clarification, as requested:
(s"${x._2}|${y._2}".replace("dataId:",""), "") //Using string interpolation
is the same as
val concatenatedString = x._2 +"|"+y._2
val concatStringWithoutMetaData = concatenatedString.replace("dataId:","")
(concatStringWithoutMetaData, "") //Return the new string with an empty final column
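A quick way to convince yourself of the equivalence is to paste both forms into the REPL with made-up x and y values (these tuples are hypothetical, loosely modelled on the two DATA_INFO rows in the sample):
// Hypothetical values, for illustration only.
val x = ("1421453179.158", "dataId:5913974361807341112")
val y = ("1421453179.159", "dataId:7658923479992321112")

val viaInterpolation = (s"${x._2}|${y._2}".replace("dataId:",""), "")

val concatenatedString = x._2 + "|" + y._2
val concatStringWithoutMetaData = concatenatedString.replace("dataId:","")
val viaLongForm = (concatStringWithoutMetaData, "")

// Both evaluate to (5913974361807341112|7658923479992321112,"")
assert(viaInterpolation == viaLongForm)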
It is often useful to use spark-shell (essentially a REPL for Spark) to test your ideas, especially while you're new to it.
Run the spark shell (it lives in bin/spark-shell) and create your test dataset:
val input = """
"1421453179.157" P0105451998 "SCREEN"
"1421453179.157" P0106586529 "PRESENTATION"
"1421453179.157" P0108481590 NULL
"1421453179.157" P0108481590 "SCREEN"
"1421453179.157" P0112397365 "FULL_SCREEN"
"1421453179.157" P0113994553 "FULL_SCREEN"
"1421453179.158" P0112360870 "DATA_INFO" dataId:5913974361807341112
"1421453179.159" P0112360870 "DATA_INFO" dataId:7658923479992321112
"1421453179.160" P0108137271 "SCREEN"
"1421453179.161" P0103681986 "SCREEN"
"1421453179.162" P0104229251 "PRESENTATION""""
sc.parallelize(input.split("\n").map(_.trim)).map(_.split("""\s+""")).
  filter(_.length > 3).                   // take only rows with > 3 fields (so containing a dataId)
  map(a => a(1) -> a(3).split(":")(1)).   // create a pair for each row: user -> click
  reduceByKey(_ + "|" + _).               // concatenate the clicks per user
  collect                                 // bring the result to the driver
When you run it, you should see more or less this:
res0: Array[(String, String)] = Array((P0112360870,5913974361807341112|7658923479992321112))
I think this is what you're looking for.
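One caveat worth adding here (my note, not part of the answer): reduceByKey makes no promise about the order in which a user's clicks get combined, while the question wants the first click first. A hedged sketch, assuming the quoted timestamps sort correctly as strings, that groups per user and sorts by time before joining:
// Sketch only: collect (time, click) pairs per user, sort by time, then join.
sc.parallelize(input.split("\n").map(_.trim)).map(_.split("""\s+""")).
  filter(_.length > 3).
  map(a => a(1) -> (a(0), a(3).split(":")(1))).   // user -> (time, click)
  groupByKey().
  mapValues(_.toList.sortBy(_._1).map(_._2).mkString("|")).
  collect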