How to read data from an external API in a Kafka producer and send it to a Kafka consumer in Scala

I am new to Apache Kafka. I want to read data from the https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=5min&outputsize=full&apikey=demo API inside my producer, send it to a topic, and then read it from that topic in my consumer and save it to a database.

I don't know how to send this data, because it is in JSON format.

I tried a Kafka consumer-producer example that uses String values.

In my example, my Producer.scala is:

import java.util.Properties

import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.util.EntityUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import play.api.libs.json.{JsObject, JsValue, Json}
object Producer extends App {

  val url = "https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=5min&outputsize=full&apikey=demo"
  val httpClient = HttpClientBuilder.create().build()
  val httpResponse = httpClient.execute(new HttpGet(url))
  val entity = httpResponse.getEntity
  val str = EntityUtils.toString(entity, "UTF-8")
  val content = Json.parse(str)
  val props:Properties = new Properties()
  props.put("bootstrap.servers","localhost:9092")
  props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks","all")
  val producer = new KafkaProducer[Nothing, (String,JsValue)](props)
  val topic = "quick-start"
  try {
      val record = new ProducerRecord(topic, content.as[JsObject].fields(1))
      producer.send(record)
  }catch{
    case e:Exception => e.printStackTrace()
  }finally {
    producer.close()
  }
}

My Consumer.scala is:

import java.util.{Collections, Properties}
import java.util.regex.Pattern
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
object Consumer extends App {

  val props:Properties = new Properties()
  props.put("group.id", "test")
  props.put("bootstrap.servers","localhost:9092")
  props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "1000")
  val consumer = new KafkaConsumer(props)
  val topics = List("quick-start")
  try {
    consumer.subscribe(topics.asJava)
    while (true) {
      val records = consumer.poll(10)
      for (record <- records.asScala) {
        println("Topic: " + record.topic() +
          ",Key: " + record.key() +
          ",Value: " + record.value() +
          ", Offset: " + record.offset() +
          ", Partition: " + record.partition())
      }
    }
  }catch{
    case e:Exception => e.printStackTrace()
  }finally {
    consumer.close()
  }
}

My build.sbt is:

name := "Kafka-AkkaPractice"

version := "0.1"

scalaVersion := "2.12.2"

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "2.1.0",
  "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime,
  "org.apache.httpcomponents" % "httpclient" % "4.5.2",
  "com.typesafe.play" %% "play-json" % "2.8.0"
)

My understanding is that

props.put("key.serializer",
  "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer",
  "org.apache.kafka.common.serialization.StringSerializer")

means that the key and value are both of type String.

Can anyone suggest how to send this kind of data to my Kafka topic, so that I can read it from the consumer and save it to a database?

You don't need to parse the JSON in the producer, other than to verify that the API response can be parsed.
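A minimal sketch of that validation step, assuming the play-json dependency already listed in the question's build.sbt. The object name JsonCheck, the helper parseOrNone, and the sample payload are made up for illustration; the idea is only to check that the body parses and then keep working with the original string.

```scala
import play.api.libs.json.{JsValue, Json}
import scala.util.Try

object JsonCheck {
  // Hypothetical helper: returns the parsed value when `body` is well-formed
  // JSON and None when Json.parse throws.
  def parseOrNone(body: String): Option[JsValue] =
    Try(Json.parse(body)).toOption

  def main(args: Array[String]): Unit = {
    // Made-up sample payload standing in for the API response body.
    val body = """{"Meta Data": {"2. Symbol": "MSFT"}}"""
    parseOrNone(body) match {
      case Some(_) => println("valid JSON, the raw string can be sent")
      case None    => println("not JSON, skipping this response")
    }
  }
}
```

The parsed JsValue is discarded on purpose here: the producer only needs to know that parsing succeeds, and the consumer can parse the string again when it needs the fields.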

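If you want to send the data as-is, then you need val record = new ProducerRecord(topic, str), where str is the raw response body. Since both serializers are StringSerializer, the producer should also be declared as KafkaProducer[String, String].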
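Put together, sending the raw string could look roughly like the sketch below. It assumes the kafka-clients classes from the question's build.sbt and a broker on localhost:9092 as in the question; the object name RawJsonProducer, the helpers producerProps and toRecord, and the placeholder body are illustrative and not part of the original code.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object RawJsonProducer {
  // Same settings as in the question: key and value are both plain Strings.
  def producerProps(bootstrap: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrap)
    props.setProperty("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("acks", "all")
    props
  }

  // Hypothetical helper: wraps the raw JSON text in a record without a key.
  def toRecord(topic: String, rawJson: String): ProducerRecord[String, String] =
    new ProducerRecord[String, String](topic, rawJson)

  def main(args: Array[String]): Unit = {
    // Placeholder for the string read from the HTTP response (`str` in the question).
    val str = """{"placeholder": "API response body"}"""
    val producer = new KafkaProducer[String, String](producerProps("localhost:9092"))
    try {
      // get() blocks until the broker acknowledges the record.
      producer.send(toRecord("quick-start", str)).get()
    } finally {
      producer.close()
    }
  }
}
```

The type parameters on KafkaProducer and ProducerRecord are what tie the record's value type to the configured StringSerializer, so the JSON text goes onto the topic unchanged.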

And in the consumer:

 for (record <- records.asScala) {
    val content = Json.parse(record.value())