是否可以使用 AkkaHttp、AkkaStreams、Alpakka 和数据库构建 OLTP/CRUD HTTP 服务器?

Is it possible to build an OLTP/CRUD HTTP server using AkkaHttp, AkkaStreams, Alpakka and a database?

我很清楚使用 Actors 当然是可能的:例如 https://github.com/chbatey/akka-http-typed.git 正在使用 AkkaHttp 和类型化的 actor。

但我不清楚是否只使用 AkkaStreams 及其 Alpakka 连接器库(包括数据库),是否可以进行常规的 CRUD / OLTP 服务,或者只是将数据从一个数据库复制到另一个数据库,或其他OLAP/批处理/流处理场景

如果您知道如何做到这一点,请指出一些细节,如果您能在 github 上提供示例,那就太好了。

我认为可能的方式是服务器涉及两个对话/有状态流转换:一个通过 HTTP 与外部世界,一个与数据库。我不确定是否可以这样建模。

https://doc.akka.io/docs/alpakka/current/slick.html 似乎既提供 UPDATE/INSERTS 作为接收器,又将 SELECT 指向某个 id 作为源。您是否知道是否有示例应用程序,或者您能否广泛提及如何使用 Akka Http 进行连接?

是的,基本上在 AkkaHttp 中收到的每个请求,我们都会创建一个 AkkaStreams Graph(通常只是一个管道),基本上只是来自数据库的 Slick Alpakka Source,可能有一些运算符作为前缀,然后在 AkkaHttp 中返回,这当然支持Source。更多详细信息,请访问 [https://www.quora.com/Is-it-possible-to-build-an-OLTP-CRUD-HTTP-server-using-Akka-HTTP-Akka-Streams-Alpakka-and-a-database-Do-you-know-any-examples-of-code-on-GitHub-or-elsewhere/answer/Nicolae-Marasoiu]

我放了一个demo,希望对你有帮助。

正在创建 table,数据库是 mysql。

CREATE TABLE test(id VARCHAR(32))

sbt:

"com.lightbend.akka"                        %% "akka-stream-alpakka-slick"     % "1.1.0",
"mysql"                                      % "mysql-connector-java"          % "5.1.40"

代码:

package tech.parasol.scala.crud

import java.sql.SQLException

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives.{complete, get, path, _}
import akka.stream.alpakka.slick.scaladsl.{Slick, SlickSession}
import akka.stream.scaladsl.Sink
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.io.StdIn
import scala.util.{Failure, Success}

object CrudTest1 {


  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem("CrudTest1")
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher




    val hostName = "120.0.0.1"
    val rocketDbConfig =
      s"""
         |db-config {
         |  profile = "slick.jdbc.MySQLProfile$$"
         |  db {
         |    dataSourceClass = "slick.jdbc.DriverDataSource"
         |    properties = {
         |      driver = "com.mysql.jdbc.Driver"
         |      url = "jdbc:mysql://${hostName}:3306/rocket?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&useSSL=false"
         |      user = "root"
         |      password = "passw0rd"
         |    }
         |  }
         |}
         |
     """.stripMargin


    implicit val session = SlickSession.forConfig("db-config", ConfigFactory.parseString(rocketDbConfig))


    import session.profile.api._

    def persistence(message: String) = {
      def insert(message: String): DBIO[Int] = {
        sqlu"""INSERT INTO test(id) VALUES (${message})"""
      }

      session.db.run(insert(message)).map {
        case _ => message
      }.recover {
        case e : SQLException => {
          throw new Exception("Database error ===>")}
        case e : Exception => {
          throw new Exception("Database error.")}
      }

    }


    val route = path("hello" / Segment ) { name =>
        get {
          val res = persistence(name)
          onComplete(res) {
            case Success(value) => {
              complete(s"<h1>Say hello to ${name}</h1>")
            }
            case Failure(e) => {
              complete(s"<h1>Failed to say hello to ${name}</h1>")
            }
          }
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8088)

    println(s"Server online at http://localhost:8088/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}