如何调用 REST API，将 Flink 流作为参数传入，并返回转换后的流

How to consume a REST API, passing a Flink stream as a parameter, and return the transformed stream

我是 Apache Flink 的新手。我有一个 Flink Scala 项目，它消费来自 Kafka 集群的数据；我需要把流中的数据作为参数传给一个 REST API，并得到经该 API 转换后的流。这是我的代码：

/** Skeleton job from the question: creates a Kafka source and executes the environment. */
class Testing {
  def main(args: Array[String]): Unit = {}

  /**
   * Builds a stream over the Kafka topic "topic_test", reading from the earliest
   * offset, and starts the job. The REST call is not wired in yet.
   */
  def streamTest(): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka connection settings.
    val kafkaProps = new Properties()
    kafkaProps.setProperty(
      "bootstrap.servers",
      "test1.server.local:9092,test2.server.local:9092,test3.server.local:9092"
    )

    val kafkaSource = new FlinkKafkaConsumer[String]("topic_test", new SimpleStringSchema(), kafkaProps)
    kafkaSource.setStartFromEarliest()

    val stream = env.addSource(kafkaSource).setParallelism(5)

    // URL template of the REST endpoint; "%s" is the slot for the message.
    val api_test = "http://api-test.server.local/test/?msg=%s"
    // Open point: send each element of `stream` to the API above and obtain
    // the transformed stream from the responses.
    env.execute()
  }
}

有人能帮忙吗？

您可以使用任何自己熟悉的 HTTP/REST 客户端库，并配合 Flink 的异步 I/O（Async I/O）算子来调用该 API。

这是我的最终代码。希望对您有所帮助

/**
 * Flink job that reads strings from a Kafka topic and maps every record to the
 * body of an HTTP GET response obtained by sending that record to a REST endpoint.
 *
 * The flatMap function calls the instance method [[getUrl]], so the function
 * references this instance; that is presumably why the class is `Serializable`.
 */
class Testing extends Serializable{
  // NOTE(review): a `main` defined in a class (not an object) is not a JVM entry
  // point; kept unchanged so existing callers are unaffected.
  def main(args: Array[String]): Unit = {}

  /**
   * Builds the pipeline Kafka source -> REST call per record and executes it.
   *
   * Each record is URL-encoded (UTF-8) and substituted into the endpoint template;
   * the response body becomes the output element.
   */
  def streamTest(): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "test1.server.local:9092,test2.server.local:9092,test3.server.local:9092")
    val consumer_test = new FlinkKafkaConsumer[String]("topic_test", new SimpleStringSchema(), properties)
    consumer_test.setStartFromEarliest()
    val stream =  env.addSource(consumer_test)

    // Endpoint template, defined once instead of once per record.
    val api_test = "http://api-test.server.local/test/?msg=%s"

    // The HTTP call blocks inside flatMap, one request per record.
    // NOTE(review): no sink is attached to `result`; add one (or return the
    // stream) if the transformed records are needed downstream.
    val result = stream.flatMap{
      (str, out: Collector[String]) =>
        out.collect {
          getUrl(api_test.format(URLEncoder.encode(str, "UTF-8")))
        }
    }
    env.execute()
  }

  /**
   * Performs an HTTP GET and returns the response body as a string.
   *
   * Connect, connection-request and socket timeouts are all 5 seconds.
   * The response and the client are always closed, even when the request
   * or the body read throws, so no connections are leaked per record.
   *
   * @param url fully formed request URL
   * @return the response entity converted with `EntityUtils.toString`
   */
  def getUrl(url: String): String = {
    val timeoutMillis = 5 * 1000
    val config = RequestConfig.custom
      .setConnectTimeout(timeoutMillis)
      .setConnectionRequestTimeout(timeoutMillis)
      .setSocketTimeout(timeoutMillis)
      .build
    val client: CloseableHttpClient = HttpClientBuilder.create.setDefaultRequestConfig(config).build
    try {
      val response = client.execute(new HttpGet(url))
      try {
        EntityUtils.toString(response.getEntity)
      } finally {
        response.close()
      }
    } finally {
      client.close()
    }
  }
}