Akka-Http + 推特流 API
Akka-Http + Twitter streaming API
我最近对尝试 Scala 世界中的一些流媒体功能产生了兴趣。这发生在我阅读 Play 2 中的 Iteratee API 时。
然而,人们似乎认为 Iteratee API 已接近弃用,并向我推荐了以下库之一:
- scalaz-stream
- akka-streams,或者更具体地说,akka-http
我真的不想进入 scalaz 世界,所以我决定看看 akka-http。
不幸的是,目前关于 akka-http 主题的文档似乎非常稀少,我在使一切正常工作时遇到了很多麻烦。
和往常一样,我选择了大家最喜欢的流数据来源:Twitter。
谷歌搜索该主题大多会导致
- Matthias Nehlsens 在 BirdWatch project 方面的出色工作。不幸的是,他仍然使用 Iteratees。
- 人们在 Twitter 客户端上使用 akka-streams,但我不太喜欢使用它们,因为我不会真正从中学到很多东西。
使用 akka-http 执行基本 GET 请求的结构似乎有点像这样:
object AkkaHttpExample extends App {
implicit val system = ActorSystem("akka-http-example")
implicit val materializer = ActorMaterializer()
import system.dispatcher
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection("www.whosebug.com", 80)
val request: HttpRequest = HttpRequest(
HttpMethods.GET,
uri = "/"
)
val future: Future[HttpResponse] =
Source.single(request)
.via(connectionFlow)
.runWith(Sink.head)
val result: HttpResponse = Await.result(future, 5 seconds)
}
上面的代码有效(尽管出于某种原因解析结果的主体非常烦人)。
当我尝试做同样的事情但指向 /1.1/statuses/sample.json
端点(应该发出一个示例流)时,我的 Future 只是坐在那里并超时。虽然考虑到数据的流媒体性质,这似乎合乎逻辑,但这应该 return 404,因为此时我什至没有进行适当的 OAuth。
作为参考,这是我目前的代码:
object AkkaHttpExample extends App {
implicit val system = ActorSystem("akka-http-example")
implicit val materializer = ActorMaterializer()
import system.dispatcher
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection("stream.twitter.com", 80)
val request: HttpRequest = HttpRequest(
HttpMethods.GET,
uri = "/1.1/statuses/sample.json"
)
val future: Future[HttpResponse] =
Source.single(request)
.via(connectionFlow)
.runWith(Sink.head)
val result: HttpResponse = Await.result(future, 50 seconds)
}
就像我说的,我认为无论如何这可能是流式传输的本质导致了问题,所以我尝试根据我能找到的唯一示例更改我的代码以逐个处理块,如图所示这里:
object AkkaHttpExample extends App {
implicit val system = ActorSystem("akka-http-example")
implicit val materializer = ActorMaterializer()
import system.dispatcher
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection("stream.twitter.com", 80)
val request: HttpRequest = HttpRequest(
HttpMethods.GET,
uri = "/1.1/statuses/sample.json"
)
Source.single(request)
.via(connectionFlow)
.map(_.entity.dataBytes)
.flatten(FlattenStrategy.concat)
.map(_.decodeString("UTF-8"))
.runForeach(println _)
.onComplete(_ => system.terminate())
}
这会导致应用程序立即终止。删除 .onComplete
子句保留 ActorSystem 和应用程序 运行,但实际上似乎什么都没有发生 :'(
有没有人有这方面的经验?到目前为止,图书馆一直是一个令人头疼的问题。我应该回到 Play + WS + Iteratees 吗?
我认为您 运行 遇到的第一个问题是错误地使用 http
(端口 80)与 https
(端口 443)作为您的连接协议。当我切换到 https 并通过有效的 oauth 授权 header 时,我能够让事情正常进行。我也稍微修改了您的示例以匹配最新的 http api。这是我的工作:
implicit val system = ActorSystem("akka-http-example")
implicit val materializer = ActorMaterializer()
import system.dispatcher
val connectionFlow = Http().cachedHostConnectionPoolTls[Long]("stream.twitter.com", 443)
//Note myAuth omitted here. Also using a RawHeader because I was
//too lazy to use the real Authorization model header
val authHeader = RawHeader("Authorization", myAuth)
Source.single((request, 1l))
.via(connectionFlow)
.map{
case (util.Success(resp), id) =>
resp.entity.dataBytes
case other =>
println(s"Got unexpected response: $other")
Source.empty
}
.flatten(FlattenStrategy.concat)
.map(_.decodeString("UTF-8"))
.runForeach(println _)
.onComplete{tr =>
system.shutdown
}
我最近对尝试 Scala 世界中的一些流媒体功能产生了兴趣。这发生在我阅读 Play 2 中的 Iteratee API 时。
然而,人们似乎认为 Iteratee API 已接近弃用,并向我推荐了以下库之一:
- scalaz-stream
- akka-streams,或者更具体地说,akka-http
我真的不想进入 scalaz 世界,所以我决定看看 akka-http。
不幸的是,目前关于 akka-http 主题的文档似乎非常稀少,我在使一切正常工作时遇到了很多麻烦。
和往常一样,我选择了大家最喜欢的流数据来源:Twitter。
谷歌搜索该主题大多会导致
- Matthias Nehlsens 在 BirdWatch project 方面的出色工作。不幸的是,他仍然使用 Iteratees。
- 人们在 Twitter 客户端上使用 akka-streams,但我不太喜欢使用它们,因为我不会真正从中学到很多东西。
使用 akka-http 执行基本 GET 请求的结构似乎有点像这样:
object AkkaHttpExample extends App {
implicit val system = ActorSystem("akka-http-example")
implicit val materializer = ActorMaterializer()
import system.dispatcher
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection("www.whosebug.com", 80)
val request: HttpRequest = HttpRequest(
HttpMethods.GET,
uri = "/"
)
val future: Future[HttpResponse] =
Source.single(request)
.via(connectionFlow)
.runWith(Sink.head)
val result: HttpResponse = Await.result(future, 5 seconds)
}
上面的代码有效(尽管出于某种原因解析结果的主体非常烦人)。
当我尝试做同样的事情但指向 /1.1/statuses/sample.json
端点(应该发出一个示例流)时,我的 Future 只是坐在那里并超时。虽然考虑到数据的流媒体性质,这似乎合乎逻辑,但这应该 return 404,因为此时我什至没有进行适当的 OAuth。
作为参考,这是我目前的代码:
object AkkaHttpExample extends App {
implicit val system = ActorSystem("akka-http-example")
implicit val materializer = ActorMaterializer()
import system.dispatcher
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection("stream.twitter.com", 80)
val request: HttpRequest = HttpRequest(
HttpMethods.GET,
uri = "/1.1/statuses/sample.json"
)
val future: Future[HttpResponse] =
Source.single(request)
.via(connectionFlow)
.runWith(Sink.head)
val result: HttpResponse = Await.result(future, 50 seconds)
}
就像我说的,我认为无论如何这可能是流式传输的本质导致了问题,所以我尝试根据我能找到的唯一示例更改我的代码以逐个处理块,如图所示这里:
object AkkaHttpExample extends App {
implicit val system = ActorSystem("akka-http-example")
implicit val materializer = ActorMaterializer()
import system.dispatcher
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = Http().outgoingConnection("stream.twitter.com", 80)
val request: HttpRequest = HttpRequest(
HttpMethods.GET,
uri = "/1.1/statuses/sample.json"
)
Source.single(request)
.via(connectionFlow)
.map(_.entity.dataBytes)
.flatten(FlattenStrategy.concat)
.map(_.decodeString("UTF-8"))
.runForeach(println _)
.onComplete(_ => system.terminate())
}
这会导致应用程序立即终止。删除 .onComplete
子句保留 ActorSystem 和应用程序 运行,但实际上似乎什么都没有发生 :'(
有没有人有这方面的经验?到目前为止,图书馆一直是一个令人头疼的问题。我应该回到 Play + WS + Iteratees 吗?
我认为您 运行 遇到的第一个问题是错误地使用 http
(端口 80)与 https
(端口 443)作为您的连接协议。当我切换到 https 并通过有效的 oauth 授权 header 时,我能够让事情正常进行。我也稍微修改了您的示例以匹配最新的 http api。这是我的工作:
implicit val system = ActorSystem("akka-http-example")
implicit val materializer = ActorMaterializer()
import system.dispatcher
val connectionFlow = Http().cachedHostConnectionPoolTls[Long]("stream.twitter.com", 443)
//Note myAuth omitted here. Also using a RawHeader because I was
//too lazy to use the real Authorization model header
val authHeader = RawHeader("Authorization", myAuth)
Source.single((request, 1l))
.via(connectionFlow)
.map{
case (util.Success(resp), id) =>
resp.entity.dataBytes
case other =>
println(s"Got unexpected response: $other")
Source.empty
}
.flatten(FlattenStrategy.concat)
.map(_.decodeString("UTF-8"))
.runForeach(println _)
.onComplete{tr =>
system.shutdown
}