How to cast a string in Apache Flink Datastream in Scala?
I'm writing a Scala script using the Datastream API to process a CSV file in Apache Flink.
I need to clean up the format of some columns and then cast them to the correct types.
My current code looks like this:
package org.myorg.tareac
import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction}
object BatchJob {
def main(args: Array[String]) {
// set up the batch execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
val inputPath = "file:////home/almu/Practicas_BigData/Tarea_Flink_B/RIA_exportacion_datos_diarios_Huelva_20140206.csv"
val input = env.readCsvFile[(String, String, String, String, String, String,
String, String, String, String, String, String,
String, String, String, String, String, String)](inputPath, fieldDelimiter=";", ignoreFirstLine=true)
input.print()
val casted_data = input.flatMap((IDPROVINCIA: String, SPROVINCIA: String, IDESTACION: String, SESTACION: String,
FECHA: String, ANIO: String, TEMPMAX: String, HORMINTEMPMAX: String, TEMPMIN: String,
HORMINTEMPMIN: String, TEMPMEDIA: String, HUMEDADMAX: String, HUMEDADMIN: String,
HUMEDADMEDIA: String, VELVIENTO: String, DIRVIENTO: String, RADIACION: String, PRECIPITACION: String) => {
IDESTACION.replace("\"", "").cast(Types.Int);
SESTACION.replace("\"", "");
FECHA.substring(6,9).cast(Int);
RADIACION.replace(",", ".").replace("", 0).cast(Double);
PRECIPITACION.replace(",", ".").replace("", 0).cast(Double)
})
// execute program
env.execute("Flink Batch CSV Scala Processing")
}
}
However, when I run mvn clean package, I get this error:
[ERROR] /home/almu/Practicas_BigData/Tarea_Flink_B/tareac/src/main/scala/batch/ProcessFileBatch.scala:54: error: value cast is not a member of String
[ERROR] IDESTACION.replace("\"", "").cast(Types.Int);
[ERROR] ^
[ERROR] one error found
How can I do the cast correctly?
Replace .cast(Types.Int) with .toInt.
File content:
Jack,12,num_123,beijing
Code:
val input = env.readCsvFile[(String, String, String, String)](inputPath, fieldDelimiter = ",", ignoreFirstLine = false)
input
  .map((value: (String, String, String, String)) => {
    // keep the name as-is, cast the age with .toInt, and take the digits after "_"
    (value._1, value._2.toInt, value._3.substring(value._3.indexOf("_") + 1).toInt)
  })
  .print()
Result:
(Jack,12,123)
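For your original 18-column file, the same .toInt / .toDouble approach applies. Two more things will bite in the posted snippet: the Scala API's map/flatMap take a function of the whole tuple (not one parameter per column), and .replace("", 0) will not compile either, because String.replace expects two CharSequences, so empty fields need an explicit check. Below is a minimal sketch, assuming the column order from your readCsvFile call; the parseDouble helper is made up for illustration:

// Hypothetical helper: Spanish decimal comma -> period, empty field -> 0.0
def parseDouble(s: String): Double = {
  val cleaned = s.replace(",", ".")
  if (cleaned.isEmpty) 0.0 else cleaned.toDouble
}

// map over the whole 18-field tuple; _3 = IDESTACION, _4 = SESTACION,
// _5 = FECHA, _17 = RADIACION, _18 = PRECIPITACION (order assumed from the question)
val casted_data = input.map { row =>
  (
    row._3.replace("\"", "").toInt,   // strip quotes, then cast with .toInt
    row._4.replace("\"", ""),         // quotes stripped, stays a String
    row._5.substring(6, 9).toInt,     // same substring indices as in the question
    parseDouble(row._17),
    parseDouble(row._18)
  )
}
casted_data.print()

flatMap would also compile here only if the function returned a collection; since every row produces exactly one output tuple, map is the natural fit.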