使用 scalaz.stream 连续获取数据库结果
continuously fetch database results with scalaz.stream
我是 scala 的新手,也是 scalaz 的新手。通过不同的 Whosebug 答案和一些 handholding,我能够使用 scalaz.stream 来实现一个可以连续获取 twitter API 结果的进程。现在我想对存储推特句柄的 Cassandra DB 做同样的事情。
获取推特结果的代码在这里:
def urls: Seq[(Handle,URL)] = {
Await.result(
getAll(connection).map { List =>
List.map(twitterToGet =>
(twitterToGet.handle, urlBoilerPlate + twitterToGet.handle + parameters + twitterToGet.sinceID)
)
},
5 seconds)
}
val fetchUrl = channel.lift[Task, (Handle, URL), Fetched] {
url => Task.delay {
val finalResult = callTwitter(url)
if (finalResult.tweets.nonEmpty) {
connection.updateTwitter(finalResult)
} else {
println("\n" + finalResult.handle + " does not have new tweets")
}
s"\ntwitter Fetch & database update completed"
}
}
val P = Process
val process =
(time.awakeEvery(3.second) zipWith P.emitAll(urls))((b, url) => url).
through(fetchUrl)
val fetched = process.runLog.run
fetched.foreach(println)
我打算做的是使用
def urls: Seq[(Handle,URL)] = {
连续获取 Cassandra 结果(使用 awakeEvery)并将它们发送给演员以 运行 上述推特获取代码。
我的问题是,用 scalaz.stream 实现这个的最佳方法是什么?请注意,我希望它获得所有数据库结果,然后在再次获得所有数据库结果之前有一个延迟。我应该使用与上面的推特获取代码相同的架构吗?如果是这样,我将如何创建不需要输入的 channel.lift ? scalaz.stream有没有更好的方法?
提前致谢
今天成功了。最干净的方法是将数据库结果作为流发出,并将接收器附加到流的末尾以进行 twitter 处理。我实际拥有的要复杂一些,因为它会连续检索数据库结果并将它们发送给演员以进行 Twitter 处理。检索结果的风格遵循我问题中的原始代码:
val connection = new simpleClient(conf.getString("cassandra.node"))
implicit val threadPool = new ScheduledThreadPoolExecutor(4)
val system = ActorSystem("mySystem")
val twitterFetch = system.actorOf(Props[TwitterFetch], "twitterFetch")
def myEffect = channel.lift[Task, simpleClient, String]{
connection: simpleClient => Task.delay{
val results = Await.result(
getAll(connection).map { List =>
List.map(twitterToGet =>
(twitterToGet.handle, urlBoilerPlate + twitterToGet.handle + parameters + twitterToGet.sinceID)
)
},
5 seconds)
println("Query Successful, results= " +results +" at " + format.print(System.currentTimeMillis()))
twitterFetch ! fetched(connection, results)
s"database fetch completed"
}
}
val P = Process
val process =
(time.awakeEvery(3.second).flatMap(_ => P.emit(connection).
through(myEffect)))
val fetching = process.runLog.run
fetching.foreach(println)
一些注意事项:
我问过在没有输入的情况下使用 channel.lift,但很明显输入应该是 cassandra 连接。
行
val process =
(time.awakeEvery(3.second).flatMap(_ => P.emit(connection).
through(myEffect)))
从 zipWith 更改为 flatMap,因为我想连续而不是一次检索结果。
我是 scala 的新手,也是 scalaz 的新手。通过不同的 Whosebug 答案和一些 handholding,我能够使用 scalaz.stream 来实现一个可以连续获取 twitter API 结果的进程。现在我想对存储推特句柄的 Cassandra DB 做同样的事情。
获取推特结果的代码在这里:
def urls: Seq[(Handle,URL)] = {
Await.result(
getAll(connection).map { List =>
List.map(twitterToGet =>
(twitterToGet.handle, urlBoilerPlate + twitterToGet.handle + parameters + twitterToGet.sinceID)
)
},
5 seconds)
}
val fetchUrl = channel.lift[Task, (Handle, URL), Fetched] {
url => Task.delay {
val finalResult = callTwitter(url)
if (finalResult.tweets.nonEmpty) {
connection.updateTwitter(finalResult)
} else {
println("\n" + finalResult.handle + " does not have new tweets")
}
s"\ntwitter Fetch & database update completed"
}
}
val P = Process
val process =
(time.awakeEvery(3.second) zipWith P.emitAll(urls))((b, url) => url).
through(fetchUrl)
val fetched = process.runLog.run
fetched.foreach(println)
我打算做的是使用
def urls: Seq[(Handle,URL)] = {
连续获取 Cassandra 结果(使用 awakeEvery)并将它们发送给演员以 运行 上述推特获取代码。
我的问题是,用 scalaz.stream 实现这个的最佳方法是什么?请注意,我希望它获得所有数据库结果,然后在再次获得所有数据库结果之前有一个延迟。我应该使用与上面的推特获取代码相同的架构吗?如果是这样,我将如何创建不需要输入的 channel.lift ? scalaz.stream有没有更好的方法?
提前致谢
今天成功了。最干净的方法是将数据库结果作为流发出,并将接收器附加到流的末尾以进行 twitter 处理。我实际拥有的要复杂一些,因为它会连续检索数据库结果并将它们发送给演员以进行 Twitter 处理。检索结果的风格遵循我问题中的原始代码:
val connection = new simpleClient(conf.getString("cassandra.node"))
implicit val threadPool = new ScheduledThreadPoolExecutor(4)
val system = ActorSystem("mySystem")
val twitterFetch = system.actorOf(Props[TwitterFetch], "twitterFetch")
def myEffect = channel.lift[Task, simpleClient, String]{
connection: simpleClient => Task.delay{
val results = Await.result(
getAll(connection).map { List =>
List.map(twitterToGet =>
(twitterToGet.handle, urlBoilerPlate + twitterToGet.handle + parameters + twitterToGet.sinceID)
)
},
5 seconds)
println("Query Successful, results= " +results +" at " + format.print(System.currentTimeMillis()))
twitterFetch ! fetched(connection, results)
s"database fetch completed"
}
}
val P = Process
val process =
(time.awakeEvery(3.second).flatMap(_ => P.emit(connection).
through(myEffect)))
val fetching = process.runLog.run
fetching.foreach(println)
一些注意事项:
我问过在没有输入的情况下使用 channel.lift,但很明显输入应该是 cassandra 连接。
行
val process =
(time.awakeEvery(3.second).flatMap(_ => P.emit(connection).
through(myEffect)))
从 zipWith 更改为 flatMap,因为我想连续而不是一次检索结果。