Playframework and Twitter Streaming API
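How do I read the response data from the Twitter Streaming API (POST statuses/filter)?
I have established a connection and receive a 200 status code, but I don't know how to read the tweets. I just want to print each tweet as it arrives.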
ws.url(url)
  .sign(OAuthCalculator(consumerKey, requestToken))
  .withMethod("POST")
  .stream()
  .map { response =>
    if (response.headers.status == 200)
      println(response.body)
  }
Edit: I found this solution:
ws.url(url)
  .sign(OAuthCalculator(consumerKey, requestToken))
  .withMethod("POST")
  .stream()
  .map { response =>
    if (response.headers.status == 200) {
      response.body
        .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
        .filter(_.contains("\r\n"))
        .map(json => Try(parse(json).extract[Tweet]))
        .runForeach {
          case Success(tweet) =>
            println("-----")
            println(tweet.text)
          case Failure(e) =>
            println("-----")
            // println(e.getStackTrace) would only print the array reference
            e.printStackTrace()
        }
    }
  }
Calling stream() gives you a Future[StreamedResponse]. You then have to use Akka idioms to convert the ByteString chunks it contains. Something like:
val stream = ws.url(url)
  .sign(OAuthCalculator(consumerKey, requestToken))
  .withMethod("POST")
  .stream()

stream flatMap { res =>
  res.body.runWith(Sink.foreach[ByteString] { bytes =>
    println(bytes.utf8String)
  })
}
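Note that I have not tested the code above (it is based on the streaming response section of https://www.playframework.com/documentation/2.5.x/ScalaWS plus the sink description from http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html).
Also note that this prints each chunk on its own line, and I am not sure whether the Twitter API returns a complete JSON blob in each chunk. If you want to accumulate chunks before printing, you may need to use Sink.fold.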
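As a rough illustration of accumulating chunks with Sink.fold, here is a minimal sketch against an in-memory source rather than the real Twitter stream. The object name FoldChunks, the accumulate helper, and the sample chunks are hypothetical, and the code assumes the Akka 2.4-era ActorMaterializer API. Keep in mind that a fold only produces its result once the stream completes, so it fits finite responses better than an endless stream such as statuses/filter.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FoldChunks {
  // Concatenate every chunk of a finite source into one ByteString.
  // The Future completes only when the source itself completes.
  def accumulate(chunks: Source[ByteString, _])(implicit mat: Materializer): Future[ByteString] =
    chunks.runWith(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("fold-demo")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      // Hypothetical stand-in for res.body: one JSON object split across two chunks.
      val chunks = Source(List(ByteString("{\"text\":\"hel"), ByteString("lo\"}")))
      val whole = Await.result(accumulate(chunks), 3.seconds)
      println(whole.utf8String)
    } finally system.terminate()
  }
}
```

With a real StreamedResponse, res.body would take the place of the in-memory chunks.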
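The response body of a streaming WS request is an Akka Streams Source of bytes. Since Twitter API responses are (usually) newline-delimited, you can use Framing.delimiter to split them into byte chunks, parse the chunks as JSON, and then do whatever you want with them. Something like this should work: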
import akka.stream.scaladsl.Framing
import scala.util.{Success, Try}
import akka.util.ByteString
import play.api.libs.json.{JsSuccess, Json, Reads}
import play.api.libs.oauth.{ConsumerKey, OAuthCalculator, RequestToken}

case class Tweet(id: Long, text: String)

object Tweet {
  implicit val reads: Reads[Tweet] = Json.reads[Tweet]
}

def twitter = Action.async { implicit request =>
  ws.url("https://stream.twitter.com/1.1/statuses/filter.json?track=Rio2016")
    .sign(OAuthCalculator(consumerKey, requestToken))
    .withMethod("POST")
    .stream().flatMap { response =>
      response.body
        // Split up the byte stream into delimited chunks. Note
        // that the chunks are quite big
        .via(Framing.delimiter(ByteString.fromString("\n"), 20000))
        // Parse the chunks into JSON, and then to a Tweet.
        // A better parsing strategy would be to account for all
        // the different possible responses, but here we just
        // collect those that match a Tweet.
        .map(bytes => Try(Json.parse(bytes.toArray).validate[Tweet]))
        .collect {
          case Success(JsSuccess(tweet, _)) => tweet.text
        }
        // Print out each chunk
        .runForeach(println).map { _ =>
          Ok("done")
        }
    }
}
Note: to materialize the stream, you need to inject an implicit Materializer into the controller.
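To see what Framing.delimiter does when a message is split across chunk boundaries, and where the implicit Materializer comes in, here is a small standalone sketch that runs outside Play. The object name FramingDemo, the lines helper, and the sample chunks are hypothetical, and the code assumes the Akka 2.4-era ActorMaterializer API. In a controller, the injected Materializer would take the place of the one created in main.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FramingDemo {
  // Reassemble newline-delimited messages from arbitrarily split chunks.
  // Running the stream requires an implicit Materializer in scope.
  def lines(chunks: Source[ByteString, _])(implicit mat: Materializer): Future[immutable.Seq[String]] =
    chunks
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 20000))
      .map(_.utf8String.trim) // drop the "\r" left over from "\r\n" line endings
      .filter(_.nonEmpty)     // skip blank lines
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("framing-demo")
    implicit val materializer: Materializer = ActorMaterializer()
    try {
      // Hypothetical chunks: the first JSON object is split across two
      // chunks, and the second chunk also starts the next object.
      val chunks = Source(List(
        ByteString("{\"id\":1,\"text\":\"fir"),
        ByteString("st\"}\r\n{\"id\":2,"),
        ByteString("\"text\":\"second\"}\r\n")
      ))
      Await.result(lines(chunks), 3.seconds).foreach(println)
    } finally system.terminate()
  }
}
```

Each printed line should then be one complete JSON object, regardless of how the bytes were chunked. Note that with the default allowTruncation = false, the framing stage fails if the stream ends in the middle of a frame, which is why the last sample chunk ends with a delimiter.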