是否可以使用 AkkaHttp、AkkaStreams、Alpakka 和数据库构建 OLTP/CRUD HTTP 服务器?
Is it possible to build an OLTP/CRUD HTTP server using AkkaHttp, AkkaStreams, Alpakka and a database?
我很清楚使用 Actors 当然是可能的:例如 https://github.com/chbatey/akka-http-typed.git 正在使用 AkkaHttp 和类型化的 actor。
但我不清楚是否只使用 AkkaStreams 及其 Alpakka 连接器库(包括数据库),是否可以进行常规的 CRUD / OLTP 服务,或者只是将数据从一个数据库复制到另一个数据库,或其他OLAP/批处理/流处理场景
如果您知道如何做到这一点,请指出一些细节,如果您能在 github 上提供示例,那就太好了。
我认为可能的方式是服务器涉及两个对话/有状态流转换:一个通过 HTTP 与外部世界,一个与数据库。我不确定是否可以这样建模。
https://doc.akka.io/docs/alpakka/current/slick.html 似乎既提供 UPDATE/INSERTS 作为接收器,又将 SELECT 指向某个 id 作为源。您是否知道是否有示例应用程序,或者您能否广泛提及如何使用 Akka Http 进行连接?
是的,基本上在 AkkaHttp 中收到的每个请求,我们都会创建一个 AkkaStreams Graph(通常只是一个管道),基本上只是来自数据库的 Slick Alpakka Source,可能有一些运算符作为前缀,然后在 AkkaHttp 中返回,这当然支持Source。更多详细信息,请访问 [https://www.quora.com/Is-it-possible-to-build-an-OLTP-CRUD-HTTP-server-using-Akka-HTTP-Akka-Streams-Alpakka-and-a-database-Do-you-know-any-examples-of-code-on-GitHub-or-elsewhere/answer/Nicolae-Marasoiu]
我放了一个demo,希望对你有帮助。
正在创建 table,数据库是 mysql。
CREATE TABLE test(id VARCHAR(32))
sbt:
"com.lightbend.akka" %% "akka-stream-alpakka-slick" % "1.1.0",
"mysql" % "mysql-connector-java" % "5.1.40"
代码:
package tech.parasol.scala.crud
import java.sql.SQLException
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives.{complete, get, path, _}
import akka.stream.alpakka.slick.scaladsl.{Slick, SlickSession}
import akka.stream.scaladsl.Sink
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.io.StdIn
import scala.util.{Failure, Success}
object CrudTest1 {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("CrudTest1")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val hostName = "120.0.0.1"
val rocketDbConfig =
s"""
|db-config {
| profile = "slick.jdbc.MySQLProfile$$"
| db {
| dataSourceClass = "slick.jdbc.DriverDataSource"
| properties = {
| driver = "com.mysql.jdbc.Driver"
| url = "jdbc:mysql://${hostName}:3306/rocket?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&useSSL=false"
| user = "root"
| password = "passw0rd"
| }
| }
|}
|
""".stripMargin
implicit val session = SlickSession.forConfig("db-config", ConfigFactory.parseString(rocketDbConfig))
import session.profile.api._
def persistence(message: String) = {
def insert(message: String): DBIO[Int] = {
sqlu"""INSERT INTO test(id) VALUES (${message})"""
}
session.db.run(insert(message)).map {
case _ => message
}.recover {
case e : SQLException => {
throw new Exception("Database error ===>")}
case e : Exception => {
throw new Exception("Database error.")}
}
}
val route = path("hello" / Segment ) { name =>
get {
val res = persistence(name)
onComplete(res) {
case Success(value) => {
complete(s"<h1>Say hello to ${name}</h1>")
}
case Failure(e) => {
complete(s"<h1>Failed to say hello to ${name}</h1>")
}
}
}
}
val bindingFuture = Http().bindAndHandle(route, "localhost", 8088)
println(s"Server online at http://localhost:8088/\nPress RETURN to stop...")
StdIn.readLine() // let it run until user presses return
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
}
我很清楚使用 Actors 当然是可能的:例如 https://github.com/chbatey/akka-http-typed.git 正在使用 AkkaHttp 和类型化的 actor。
但我不清楚是否只使用 AkkaStreams 及其 Alpakka 连接器库(包括数据库),是否可以进行常规的 CRUD / OLTP 服务,或者只是将数据从一个数据库复制到另一个数据库,或其他OLAP/批处理/流处理场景
如果您知道如何做到这一点,请指出一些细节,如果您能在 github 上提供示例,那就太好了。
我认为可能的方式是服务器涉及两个对话/有状态流转换:一个通过 HTTP 与外部世界,一个与数据库。我不确定是否可以这样建模。
https://doc.akka.io/docs/alpakka/current/slick.html 似乎既提供 UPDATE/INSERTS 作为接收器,又将 SELECT 指向某个 id 作为源。您是否知道是否有示例应用程序,或者您能否广泛提及如何使用 Akka Http 进行连接?
是的,基本上在 AkkaHttp 中收到的每个请求,我们都会创建一个 AkkaStreams Graph(通常只是一个管道),基本上只是来自数据库的 Slick Alpakka Source,可能有一些运算符作为前缀,然后在 AkkaHttp 中返回,这当然支持Source。更多详细信息,请访问 [https://www.quora.com/Is-it-possible-to-build-an-OLTP-CRUD-HTTP-server-using-Akka-HTTP-Akka-Streams-Alpakka-and-a-database-Do-you-know-any-examples-of-code-on-GitHub-or-elsewhere/answer/Nicolae-Marasoiu]
我放了一个demo,希望对你有帮助。
正在创建 table,数据库是 mysql。
CREATE TABLE test(id VARCHAR(32))
sbt:
"com.lightbend.akka" %% "akka-stream-alpakka-slick" % "1.1.0",
"mysql" % "mysql-connector-java" % "5.1.40"
代码:
package tech.parasol.scala.crud
import java.sql.SQLException
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives.{complete, get, path, _}
import akka.stream.alpakka.slick.scaladsl.{Slick, SlickSession}
import akka.stream.scaladsl.Sink
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.io.StdIn
import scala.util.{Failure, Success}
object CrudTest1 {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("CrudTest1")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
val hostName = "120.0.0.1"
val rocketDbConfig =
s"""
|db-config {
| profile = "slick.jdbc.MySQLProfile$$"
| db {
| dataSourceClass = "slick.jdbc.DriverDataSource"
| properties = {
| driver = "com.mysql.jdbc.Driver"
| url = "jdbc:mysql://${hostName}:3306/rocket?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&useSSL=false"
| user = "root"
| password = "passw0rd"
| }
| }
|}
|
""".stripMargin
implicit val session = SlickSession.forConfig("db-config", ConfigFactory.parseString(rocketDbConfig))
import session.profile.api._
def persistence(message: String) = {
def insert(message: String): DBIO[Int] = {
sqlu"""INSERT INTO test(id) VALUES (${message})"""
}
session.db.run(insert(message)).map {
case _ => message
}.recover {
case e : SQLException => {
throw new Exception("Database error ===>")}
case e : Exception => {
throw new Exception("Database error.")}
}
}
val route = path("hello" / Segment ) { name =>
get {
val res = persistence(name)
onComplete(res) {
case Success(value) => {
complete(s"<h1>Say hello to ${name}</h1>")
}
case Failure(e) => {
complete(s"<h1>Failed to say hello to ${name}</h1>")
}
}
}
}
val bindingFuture = Http().bindAndHandle(route, "localhost", 8088)
println(s"Server online at http://localhost:8088/\nPress RETURN to stop...")
StdIn.readLine() // let it run until user presses return
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
}