IOException while connecting to Twitter Streaming API with Apache Flink

I wrote a small Scala program that uses the Apache Flink Streaming API to read tweets from Twitter.

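// Imports as used with the Flink 0.9.x / 0.10 Twitter connector (package names assumed)
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap
import org.apache.flink.streaming.connectors.twitter.TwitterSource
import org.apache.flink.util.Collector

object TwitterWordCount {
  // Path to the properties file passed to TwitterSource
  private val properties = "/home/twitter-login.properties"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val twitterStream = env.addSource(new TwitterSource(properties))
    // Keep only the text of tweets whose user language is English
    val tweets = twitterStream
      .flatMap(new JSONParseFlatMap[String, String] {
        override def flatMap(in: String, out: Collector[String]): Unit = {
          if (getString(in, "user.lang") == "en") {
            out.collect(getString(in, "text"))
          }
        }
      })
    tweets.print()
    env.execute("tweets")
  }
}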

When I run it, I get the following:

14:35:48,353 INFO  com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Establishing a connection
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection request: [route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20]
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection leased: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 1 of 2; total allocated: 1 of 20]
14:35:48,354 DEBUG org.apache.http.impl.conn.DefaultClientConnectionOperator - Connecting to stream.twitter.com:80
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Received message SendHeartbeat at akka://flink/user/taskmanager_1 from Actor[akka://flink/deadLetters].
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
14:35:49,487 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Handled message SendHeartbeat in 1 ms from Actor[akka://flink/deadLetters].
14:35:49,487 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[B@4daaaf5f) at akka://flink/user/jobmanager from Actor[akka://flink/user/taskmanager_1#-64418449].
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received hearbeat message from cb51cdb1bd08879df10bd2198b8e043a.
14:35:49,488 DEBUG org.apache.flink.runtime.instance.InstanceManager - Received heartbeat from TaskManager cb51cdb1bd08879df10bd2198b8e043a @ localhost - 8 slots - URL: akka://flink/user/taskmanager_1
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[B@4daaaf5f) in 0 ms from Actor[akka://flink/user/taskmanager_1#-64418449].
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d closed
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d shut down
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection [id: 4][route: {}->http://stream.twitter.com] can be kept alive for 9223372036854775807 MILLISECONDS
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d closed
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection released: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20]
14:35:52,359 WARN  com.twitter.hbc.httpclient.ClientBase - twitterSourceClient IOException caught when establishing connection to https://stream.twitter.com/1.1/statuses/filter.json?delimited=length
14:35:53,613 WARN  com.twitter.hbc.httpclient.ClientBase - twitterSourceClient failed to establish connection properly
14:35:53,613 INFO  com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Done processing, preparing to close connection
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager is shutting down
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager shut down

The program then tries to re-establish the connection, so these 4 log lines keep repeating.

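Strangely, when I run the example that ships with the Apache Flink project, everything works fine (I pulled the latest master from GitHub). I even use the same properties file. If I copy that example class into my own project, the problem described above occurs there as well.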

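I created my own project from the Flink archetype. I tried versions 0.9.1 and 0.10-SNAPSHOT. The dependencies flink-scala, flink-streaming-scala, flink-clients, and flink-connector-twitter are used in the corresponding version.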

Has anyone run into a similar problem and can point me in the right direction?

Debugging com.twitter.hbc.httpclient.ClientBase revealed the following exception: org.apache.http.conn.ConnectTimeoutException: Connect to stream.twitter.com:80 timed out

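According to a post on the Twitter developer forum, this happens because of a bug in Apache's HttpClient 4.2. Indeed, resolving the dependency tree of my project shows that flink-runtime depends on com.amazonaws:aws-java-sdk:1.81, which in turn depends on org.apache.httpcomponents:httpclient:4.2.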

Adding HttpClient 4.2.6 to my project's dependencies solved the problem for now.
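
For a Maven project, the explicit dependency could look roughly like the fragment below. This is a sketch: the coordinates are the standard ones for Apache HttpClient, and it is assumed that the entry goes into the project's existing dependencies section.

```xml
<!-- Goes inside the <dependencies> element of the project's pom.xml -->
<dependency>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpclient</artifactId>
  <version>4.2.6</version>
</dependency>
```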

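Thanks @peedeeX21, your solution helped me! Adding the explicit dependency to pom.xml solves the problem when running from Eclipse, but when using a Flink cluster and submitting the program with flink run, the version packaged with the Flink distribution still wins.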

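I worked around it by downloading httpclient-4.2.6.jar into flink/lib and renaming it with an "a" prefix (ahttpclient-4.2.6.jar), so that it is added first to the classpath used by flink run (which is built by bin/config.sh). Hope this helps.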
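
To check which HttpClient jar actually wins on a given classpath, a small diagnostic along these lines may help. It is only a sketch: WhichJar and locationOf are hypothetical names, not part of Flink or HttpClient, and the output reflects only the classpath of the JVM it runs in, so on a cluster locationOf would have to be called from inside the submitted job.

```scala
object WhichJar {
  // Returns the location (jar or directory) a class was loaded from,
  // or a placeholder when the JVM reports none (e.g. for core JDK classes).
  def locationOf(cls: Class[_]): String =
    Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
      .getOrElse("<no code source reported>")

  def main(args: Array[String]): Unit = {
    // Defaults to the HttpClient interface; pass another class name to inspect it instead.
    val name = args.headOption.getOrElse("org.apache.http.client.HttpClient")
    println(s"$name loaded from: " + locationOf(Class.forName(name)))
  }
}
```

If the printed path points at a jar containing HttpClient 4.2 rather than 4.2.6, the older version is still first on the classpath.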