Not able to perform transformations and extract JSON values from Flink DataStream and Kafka Topic
I am trying to read data from a Kafka topic, and I am able to read it successfully. However, I want to extract the data and return it as a Tuple. So for that, I am trying to perform a map operation, but it doesn't let me, saying cannot resolve overloaded method 'map'. Below is my code:
package KafkaAsSource

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

object ReadAndValidateJSON {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    //env.enableCheckpointing(5000)
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    val data: DataStream[String] = getDataFromKafkaTopic(properties, env)
    val mappedData: DataStream[jsonData] = data.map(v => v)
      .map {
        v =>
          val id = v["id"]
          val category = v["category"]
          val eventTime = v["eventTime"]
          jsonData(id, category, eventTime)
      }
    data.print()
    env.execute("ReadAndValidateJSON")
  }

  def getDataFromKafkaTopic(properties: Properties, env: StreamExecutionEnvironment): DataStream[String] = {
    val consumer = new FlinkKafkaConsumer[String]("maddy1", new SimpleStringSchema(), properties)
    consumer.setStartFromEarliest()
    val src: DataStream[String] = env.addSource(consumer)
    return src
  }
}
pom.xml:
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-cassandra_2.11</artifactId>
        <version>${flink-version}</version>
    </dependency>
</dependencies>
Kafka topic data:
{
    "id":"7",
    "Category":"Flink",
    "eventTime":"2021-12-27 20:52:58.708"
}
{
    "id":"9",
    "Category":"Flink",
    "eventTime":"2021-12-27 20:52:58.727"
}
{
    "id":"10",
    "Category":"Flink",
    "eventTime":"2021-12-27 20:52:58.734"
}
Where exactly am I going wrong? Are the dependencies correct? My Flink version is 1.12.2.
Try adding:
import org.apache.flink.streaming.api.scala._
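That wildcard import is what brings the implicit TypeInformation instances into scope; without it, the Scala DataStream API cannot resolve the map overload, which is exactly the compiler error shown above. Note also that v["id"] is not valid Scala: the incoming string has to be parsed with a JSON library. Below is a minimal sketch of how the corrected job might look, assuming jackson-databind is on the classpath and a jsonData case class shaped as shown (the question never defines jsonData, so that definition is a guess):

package KafkaAsSource

import org.apache.flink.api.common.serialization.SimpleStringSchema
// The Scala API package: provides the Scala StreamExecutionEnvironment /
// DataStream plus the implicit TypeInformation that map needs.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import com.fasterxml.jackson.databind.ObjectMapper
import java.util.Properties

// Assumed definition -- the original post never shows jsonData.
case class jsonData(id: String, category: String, eventTime: String)

object ReadAndValidateJSON {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // zookeeper.connect is omitted: modern Kafka consumers do not use it.

    val consumer = new FlinkKafkaConsumer[String]("maddy1", new SimpleStringSchema(), properties)
    consumer.setStartFromEarliest()
    val data: DataStream[String] = env.addSource(consumer)

    // v["id"] is not Scala syntax; parse the JSON string instead.
    val mappedData: DataStream[jsonData] = data.map { v =>
      // One ObjectMapper per record keeps the example simple; a
      // RichMapFunction that creates the mapper once in open() would be cheaper.
      val node = new ObjectMapper().readTree(v)
      jsonData(
        node.get("id").asText(),
        node.get("Category").asText(), // capitalized in the sample records
        node.get("eventTime").asText()
      )
    }

    mappedData.print()
    env.execute("ReadAndValidateJSON")
  }
}

Two more things worth checking: ${flink-version} must actually be defined as a <properties> entry in the POM (1.12.2 here), and jackson-databind (or whichever JSON library you prefer) would need to be added as a dependency for the parsing above.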