如何将 akka actors 与 decline 整合

how to integrate akka actors with decline

我有一个带有 akka、akka http 和 akka 流的应用程序,所以它不是休息或微服务是一个短暂的应用程序;它是一个来自 rest api 的数据提取器,应用程序写入 postgres 数据库,它工作正常,我怀疑以这种方式集成 decline 和 akka 是否正确。

代码:

object App extends CommandIOApp(name = "app" , header = "extractor", version = "0.1.0") {

 
  val config = Config( token, jdbcDriver = jdbcDriver, jdbcURL = jdbcURL, jdbcUser = jdbcUser, jdbcPassword = jdbcPassword, jdbcSchema = jdbcSchema)

  override def main: Opts[IO[ExitCode]] =
    Opts.subcommand(name="extract-teams", help="Extract Teams from API"){

      val tableName = Opts.option[String]("recursive", short="r", help="Recursive Extraction")
      val outputFile = Opts.option[Path]("output", short="o", help="Output file").withDefault(Paths.get("output.csv"))

      ( tableName, outputFile).mapN{( table, output) =>
        println(table)
        println(output)

        ClickUpExtractions.extractTeams()

        IO(ExitCode.Success)
      }
    }

  object ClickUpExtractions extends ActorGlobalImplicits {

    def extractTeams(): Unit ={

      import com.arkondata.extractors.ClickUpTeamsActions._
      val extractorFetcher = system.actorOf(ClickUpTeamsFetcher.props(config), name = "EngineActor")
      val extractorWriter = system.actorOf(ClickUpTeamsBulk.props(config) , name = "Writer")

      extractorFetcher ! Fetch(extractorWriter)

    }

  }

}

接收码:

override def receive: Receive = {
    case WritePG(teamsData) =>
      println("-------writer")
      println(teamsData)
      val teamsPG: List[Teams] = teamsData.map(data => Teams(data.id, data.name, data.color, data.avatar))
      println(teamsPG)

      println("insert-----------------------")
      //// manejar exception aqui  importante!!!
      val upsertStatement = "insert into Teams (id, name, color, avatar) values ( ? , ? , ? , ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name"
      BulkWriter(config).insertMany[Teams](teamsPG,upsertStatement).unsafeRunSync()//.unsafeToFuture()//.unsafeRunSync()
      //// manejar exception aqui  importante!!!
      println("insert doobie")
      println("end------------------------")

      self ! "mensaje salida myself!!!!"
    case msg:String =>
      println(msg)
      context.system.terminate()
      System.exit(1) // this is the last step in the akka flow
  }

注:

System.exit(1) // this is the last step in the akka flow

我还使用其他库,如 doobie、decline、cats:https://ben.kirw.in/decline/effect.html。 这些应用程序工作只是我需要知道它是否正确或者 akka 和这个之间是否存在更好的集成:IO(ExitCode.Success)

有什么方法可以从 akka 得到一个 tell 响应并验证它,比如 :

val res = extractorFetcher ! Fetch(extractorWriter)
if (validate(res)) 
   IO(ExitCode.Success)
else 

   IO(ExitCode.Error)

一些建议:

  1. 您可以将 Akka 与其他基于猫的框架一起使用,但是您的应用程序中会有两种不同的行话:Fibers、ActorSystems、Futures 与 IO effects,...所以,如果您想工作与 Cats 一起使用 Monix 或 FS2,它们直接与 Cats 效果器连接。相反,如果 Akka 对你来说是强制性的,请使用 Akka Streams,它有一个很棒的套件,只处理 Akka 调度程序和 Scala Futures。

  2. 使用 Akka Typed。自从 Akka Typed 投入生产以来,Akka 有了很大的改进。您正在接收函数中创建异步计算,这很容易出错。

Akka 生态系统依赖于 Future。 Cats Effect 依赖于一些 F[_] - 内置的是 IO 但也有 Monix 的 Task 和 ZIO.

Translation Future <=> IO 可以像这样完成:

io.unsafeToFuture

IO.fromFuture(IO(start future here)) // Future start is side-effect on its own

同样适用于 Monix:

task.runToFuture // requires Scheduler

Task.deferFuture(future thunk) // or
Task.deferFutureAction(implicit scheduler => future)

如果您使用的是无标签,那么它是:

Async.fromFuture(Sync[F].delay(start future)) // requires implicit Async[F]

val runToFuture: F ~> Future // AFAIR there is no popular type class for it
runToFuture(io)

如果您正在使用流,那么有 streamz 处理 Akka Streams <=> FS2 翻译的库。 Akka Streams、Monix 的 Observable 实现和 FS2 提供了 Reactive Streams 的实现,因此您可以使这些库使用 RS 接口相互通信,因此像 streamz 这样的库只是一个方便的工具。

没有其他集成 AFAIK,也不需要它们。

但是,我建议了解一些基本知识,了解 Akka 和 Future 的工作原理(渴望、记忆、执行上下文等)与猫效应的工作原理(通常是懒惰、没有记忆、除了 Clock 的初始化之外没有 EC或 CE IO 中的 ContextShift,CE IO 和 Monix 之间的细微差别,例如调度程序)。如果你不知道它们之间的区别,你很容易造成一些伤害。