Akka-Http + 推特流 API

Akka-Http + Twitter streaming API

我最近对尝试 Scala 世界中的一些流媒体功能产生了兴趣。这发生在我阅读 Play 2 中的 Iteratee API 时。

然而,人们似乎认为 Iteratee API 已接近弃用,并向我推荐了以下库之一:

我真的不想进入 scalaz 世界,所以我决定看看 akka-http。

不幸的是,目前关于 akka-http 主题的文档似乎非常稀少,我在使一切正常工作时遇到了很多麻烦。

和往常一样,我选择了大家最喜欢的流数据来源:Twitter。

谷歌搜索该主题大多会导致

使用 akka-http 执行基本 GET 请求的结构似乎有点像这样:

object AkkaHttpExample extends App {

    implicit val system = ActorSystem("akka-http-example")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher 

    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection("www.whosebug.com", 80)

    val request: HttpRequest = HttpRequest(
        HttpMethods.GET,
        uri = "/"
    )

    val future: Future[HttpResponse] =
        Source.single(request)
            .via(connectionFlow)
            .runWith(Sink.head)

    val result: HttpResponse = Await.result(future, 5 seconds)
}

上面的代码有效(尽管出于某种原因解析结果的主体非常烦人)。

当我尝试做同样的事情但指向 /1.1/statuses/sample.json 端点(应该发出一个示例流)时,我的 Future 只是坐在那里并超时。虽然考虑到数据的流媒体性质,这似乎合乎逻辑,但这应该 return 404,因为此时我什至没有进行适当的 OAuth。

作为参考,这是我目前的代码:

object AkkaHttpExample extends App {

    implicit val system = ActorSystem("akka-http-example")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher    

    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection("stream.twitter.com", 80)

    val request: HttpRequest = HttpRequest(
        HttpMethods.GET,
        uri = "/1.1/statuses/sample.json"
    )

    val future: Future[HttpResponse] =
        Source.single(request)
            .via(connectionFlow)
            .runWith(Sink.head)

    val result: HttpResponse = Await.result(future, 50 seconds)
}

就像我说的,我认为无论如何这可能是流式传输的本质导致了问题,所以我尝试根据我能找到的唯一示例更改我的代码以逐个处理块,如图所示这里:

object AkkaHttpExample extends App {

    implicit val system = ActorSystem("akka-http-example")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection("stream.twitter.com", 80)

    val request: HttpRequest = HttpRequest(
        HttpMethods.GET,
        uri = "/1.1/statuses/sample.json"
    )

    Source.single(request)
        .via(connectionFlow)
        .map(_.entity.dataBytes)
        .flatten(FlattenStrategy.concat)
        .map(_.decodeString("UTF-8"))
        .runForeach(println _)
        .onComplete(_ => system.terminate())
}

这会导致应用程序立即终止。删除 .onComplete 子句保留 ActorSystem 和应用程序 运行,但实际上似乎什么都没有发生 :'(

有没有人有这方面的经验?到目前为止,图书馆一直是一个令人头疼的问题。我应该回到 Play + WS + Iteratees 吗?

我认为您 运行 遇到的第一个问题是错误地使用 http(端口 80)与 https(端口 443)作为您的连接协议。当我切换到 https 并通过有效的 oauth 授权 header 时,我能够让事情正常进行。我也稍微修改了您的示例以匹配最新的 http api。这是我的工作:

implicit val system = ActorSystem("akka-http-example")
implicit val materializer = ActorMaterializer()
import system.dispatcher

val connectionFlow = Http().cachedHostConnectionPoolTls[Long]("stream.twitter.com", 443)

//Note myAuth omitted here.  Also using a RawHeader because I was
//too lazy to use the real Authorization model header
val authHeader = RawHeader("Authorization", myAuth)
Source.single((request, 1l))
    .via(connectionFlow)
    .map{
      case (util.Success(resp), id) =>
        resp.entity.dataBytes
      case other =>
        println(s"Got unexpected response: $other")            
        Source.empty
     }
    .flatten(FlattenStrategy.concat)
    .map(_.decodeString("UTF-8"))
    .runForeach(println _)
    .onComplete{tr =>
      system.shutdown
    }