Lagom Framework: Kafka topic not getting created
I am trying to write a small microservice using the Lagom framework, implementing the read side against MySQL.
https://github.com/codingkapoor/lagom-scala-slick
The objective of this service is to expose APIs to create, update and read employees.
However, on execution the project neither creates the Kafka topic nor publishes messages to it. I have tried debugging, reading the documentation and referring to several other similar projects, but no luck so far.
The Lagom documentation and similar projects are the only resources where one can find any help for this fairly new technology. I really need help debugging and understanding this issue. Let me know if this is the right platform to ask for such help.
The steps I follow to create an employee and then, hopefully, see the Kafka topic that should get created are as follows:
#1. sbt runAll
#2. curl -X POST \
http://localhost:9000/api/employees \
-H 'Content-Type: application/json' \
-d '{
"id": "128",
"name": "Shivam",
"gender": "M",
"doj": "2017-01-16",
"pfn": "PFKN110"
}'
#3. /opt/kafka_2.12-2.3.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
#4. /opt/kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic employee --from-beginning
To the employee service I have added a method getEmployees:
trait EmployeeService extends Service {

  def addEmployee(): ServiceCall[Employee, Done]

  def getEmployees(): ServiceCall[NotUsed, Vector[Employee]]

  def employeeTopic: Topic[EmployeeAddedEvent]

  override final def descriptor: Descriptor = {
    import Service._

    named("employee")
      .withCalls(
        restCall(Method.POST, "/api/employees", addEmployee _),
        restCall(Method.GET, "/api/employees", getEmployees _)
      )
      .withTopics(
        topic(EmployeeService.TOPIC_NAME, employeeTopic _)
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[EmployeeAddedEvent](_.id)
          )
      )
      .withAutoAcl(true)
  }
}
Added a line in the application config so that the Cassandra setup looks like the following:
cassandra-journal.keyspace = ${employees.cassandra.keyspace}
cassandra-snapshot-store.keyspace = ${employees.cassandra.keyspace}
lagom.persistence.read-side.cassandra.keyspace = ${employees.cassandra.keyspace}
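The `${employees.cassandra.keyspace}` substitutions above only resolve if that key is defined somewhere in the same configuration; a minimal sketch of such a definition (the keyspace name `employees_db` here is an assumption, not taken from the project):

```hocon
# hypothetical definition of the shared keyspace name referenced above
employees.cassandra.keyspace = employees_db
```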
EmployeeApplication looks like this:
abstract class EmployeeApplication(context: LagomApplicationContext)
  extends LagomApplication(context)
    with LagomKafkaComponents
    with CassandraPersistenceComponents
    with HikariCPComponents
    with AhcWSComponents {
In EmployeeServiceImpl I have added the following method:
override def getEmployees(): ServiceCall[NotUsed, Vector[Employee]] = ServiceCall { _ =>
  employeeRepository.getEmployees()
}
I have rewritten EmployeeRepository like this:
package com.codingkapoor.employee.persistence.read

import java.time.LocalDate

import akka.Done
import com.codingkapoor.employee.api.Employee
import com.lightbend.lagom.scaladsl.persistence.cassandra.CassandraSession

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

class EmployeeRepository(session: CassandraSession) {

  def createTable: Future[Done] = {
    for {
      r <- session.executeCreateTable(
        "CREATE TABLE IF NOT EXISTS employees(id text, name text, gender text, PRIMARY KEY (id))")
    } yield r
  }

  def getEmployees(): Future[Vector[Employee]] = {
    session.selectAll("SELECT * FROM employees").map(rows =>
      rows.map(r => Employee(
        id = r.getString("id"),
        name = r.getString("name"),
        gender = r.getString("gender"),
        doj = LocalDate.now(),
        pfn = "pfn")).toVector)
  }
}
The EventProcessor looks like this:
package com.codingkapoor.employee.persistence.read

import akka.Done
import com.codingkapoor.employee.persistence.write.{EmployeeAdded, EmployeeEvent}
import com.datastax.driver.core.{BoundStatement, PreparedStatement}
import com.lightbend.lagom.scaladsl.persistence.cassandra.{CassandraReadSide, CassandraSession}
import com.lightbend.lagom.scaladsl.persistence.{AggregateEventTag, EventStreamElement, ReadSideProcessor}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}

class EmployeeEventProcessor(readSide: CassandraReadSide, employeeRepository: EmployeeRepository, session: CassandraSession)
  extends ReadSideProcessor[EmployeeEvent] {

  override def buildHandler(): ReadSideProcessor.ReadSideHandler[EmployeeEvent] =
    readSide
      .builder[EmployeeEvent]("employeeoffset")
      .setGlobalPrepare(() => employeeRepository.createTable)
      .setPrepare(_ => prepare())
      .setEventHandler[EmployeeAdded](processEmployeeAdded)
      .build()

  private val createPromise = Promise[PreparedStatement]
  private def createFuture: Future[PreparedStatement] = createPromise.future

  override def aggregateTags: Set[AggregateEventTag[EmployeeEvent]] = Set(EmployeeEvent.Tag)

  private def prepare(query: String, promise: Promise[PreparedStatement]): Future[Done] = {
    val f = session.prepare(query)
    promise.completeWith(f)
    f.map(_ => Done)
  }

  def prepare(): Future[Done] = {
    for {
      r <- prepare("INSERT INTO employees (id, name, gender) VALUES (?, ?, ?)", createPromise)
    } yield r
  }

  private def processEmployeeAdded(eventElement: EventStreamElement[EmployeeAdded]): Future[List[BoundStatement]] = {
    createFuture.map { ps =>
      val bindCreate = ps.bind()
      bindCreate.setString("id", eventElement.event.id)
      bindCreate.setString("name", eventElement.event.name)
      bindCreate.setString("gender", eventElement.event.gender)
      List(bindCreate)
    }
  }
}
I added the method getEmployees to check whether the read side is working. Also, after sending the create-employee request it takes around 10-20 seconds for the record to show up in the database, after which it can be fetched from the read side.
After some struggle I was able to resolve this issue. There were basically two problems:
The first problem was the order in which the traits ReadSideJdbcPersistenceComponents and WriteSideCassandraPersistenceComponents were extended to create the EmployeeApplication. Due to a bug in Lagom, the order in which you mix these two traits is relevant: the combination only works when ReadSideJdbcPersistenceComponents is mixed in before WriteSideCassandraPersistenceComponents.
See the README from lagom-samples.
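The Lagom-specific bug aside, the underlying mechanism is Scala trait linearization: when two mixed-in traits override the same member, the trait mixed in last takes precedence. A minimal plain-Scala sketch of why mixin order changes behavior (the trait names here are illustrative, not Lagom's):

```scala
trait Components {
  def storage: List[String] = Nil
}

trait JdbcReadSide extends Components {
  override def storage: List[String] = "jdbc-read-side" :: super.storage
}

trait CassandraWriteSide extends Components {
  override def storage: List[String] = "cassandra-write-side" :: super.storage
}

// Mixin order determines linearization: the trait listed last is consulted first,
// and each `super.storage` call walks further back along the linearization.
object ReadThenWrite extends JdbcReadSide with CassandraWriteSide
object WriteThenRead extends CassandraWriteSide with JdbcReadSide

object MixinOrderDemo {
  def main(args: Array[String]): Unit = {
    println(ReadThenWrite.storage) // List(cassandra-write-side, jdbc-read-side)
    println(WriteThenRead.storage) // List(jdbc-read-side, cassandra-write-side)
  }
}
```

Swapping the two traits produces a different linearization, which is why only one ordering of the Lagom persistence components worked.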
Also, I hadn't implemented polymorphic event streams correctly as explained in the Lagom documentation here.
I have now come up with a working GitHub project that you can refer to: lagom-scala-slick.
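For context, a polymorphic event stream means all event types extend a single sealed base trait, so one tagged stream carries every event and the consumer dispatches on the concrete subtype. A plain-Scala sketch of that shape (no Lagom involved; the event and method names are illustrative):

```scala
// One sealed base trait, several concrete event types.
sealed trait EmployeeEvt
final case class EmployeeAddedEvt(id: String, name: String) extends EmployeeEvt
final case class EmployeeDeletedEvt(id: String) extends EmployeeEvt

object PolymorphicEventsDemo {
  // A single handler consumes the whole stream and dispatches per subtype,
  // analogous to registering one read-side handler per event class in Lagom.
  def describe(e: EmployeeEvt): String = e match {
    case EmployeeAddedEvt(id, name) => s"added $id ($name)"
    case EmployeeDeletedEvt(id)     => s"deleted $id"
  }

  def main(args: Array[String]): Unit = {
    val stream = List(EmployeeAddedEvt("128", "Shivam"), EmployeeDeletedEvt("128"))
    stream.map(describe).foreach(println) // prints "added 128 (Shivam)" then "deleted 128"
  }
}
```

Because the trait is sealed, the compiler checks that the match covers every event type, which is exactly what makes a single polymorphic stream safe to extend.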