为什么一个 Kafka 流阻止另一个开始?
Why one Kafka streams block the other one from getting started?
我正在使用 Lightbend 最近开源的新 kafka-streams-scala API。
我尝试同时运行两个流,但实际情况是这两个流不会同时运行,我也得不到想要的输出。
package in.internity
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.{StreamsConfig, _}
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.write
import scala.util.Try
/**
* @author Shivansh <shiv4nsh@gmail.com>
* @since 8/1/18
*/
object Boot extends App {

  implicit val formats: DefaultFormats.type = DefaultFormats

  /** Shared Kafka Streams configuration.
    *
    * NOTE(review): both `KafkaStreams` instances below are created with the
    * SAME `application.id`, so they join one consumer group and compete for
    * the same partitions — this is why only the stream that starts first
    * consumes its topic. Give each instance a distinct application id (or
    * build a single shared topology) to run both concurrently.
    */
  val config: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p
  }

  val streams1 = wordSplit("lines", "wordCount")
  val streams2 = readAndWriteJson("person", "personName")

  /** Builds a standalone topology that lower-cases each record on `intopic`,
    * splits it on non-word characters and writes every word to `outTopic`.
    *
    * @return an unstarted [[KafkaStreams]] instance for this topology
    */
  private def wordSplit(intopic: String, outTopic: String): KafkaStreams = {
    val builder = new StreamsBuilderS()
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    // Triple-quoted literal: "\W+" is an invalid escape in a plain Scala
    // string and does not compile; the regex needs a literal backslash.
    val data: KStreamS[String, String] =
      textLines.flatMapValues(value => value.toLowerCase.split("""\W+""").toIterable)
    data.to(outTopic, produced)
    new KafkaStreams(builder.build(), config)
  }

  /** Builds a standalone topology that parses each record on `intopic` as a
    * [[Person]] JSON document and writes its [[PersonNameAndEmail]]
    * projection (as JSON) to `outTopic`. Records that fail to parse are
    * mapped to the serialization of `None`.
    *
    * @return an unstarted [[KafkaStreams]] instance for this topology
    */
  private def readAndWriteJson(intopic: String, outTopic: String): KafkaStreams = {
    val builder = new StreamsBuilderS()
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    val data: KStreamS[String, String] = textLines.mapValues { value =>
      val person = Try(parse(value).extract[Person]).toOption
      println("1::", person)
      val personNameAndEmail = person.map(a => PersonNameAndEmail(a.name, a.email))
      println("2::", personNameAndEmail)
      write(personNameAndEmail)
    }
    data.to(outTopic, produced)
    new KafkaStreams(builder.build(), config)
  }

  streams1.start()
  streams2.start()

  // Close both instances (reverse start order) on JVM shutdown.
  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    streams2.close(10, TimeUnit.SECONDS)
    streams1.close(10, TimeUnit.SECONDS)
  }))
}
// JSON payload read from the input topic (extracted via json4s in readAndWriteJson).
case class Person(name: String, age: Int, email: String)
// Projection of Person (name + email only) written back out as JSON.
case class PersonNameAndEmail(name: String, email: String)
当我运行这段代码并向主题 person 生成消息时,这些消息不会被消费。
但是当我改变他们开始的顺序时,即
streams2.start()
streams1.start()
它就能正常工作。那么为什么先启动的一个流会阻塞另一个流?我们不能同时运行多个流吗?
问题解决了——原来是我自己在不同的方法里把流初始化了两次(我真傻 :P)。
工作代码:
package in.internity
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.{StreamsConfig, _}
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.write
import scala.util.Try
/**
* @author Shivansh <shiv4nsh@gmail.com>
* @since 8/1/18
*/
object Boot extends App {

  implicit val formats: DefaultFormats.type = DefaultFormats

  /** Kafka Streams configuration for the single application below. */
  val config: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p
  }

  // One builder shared by both processing graphs: they form a single
  // topology run by ONE KafkaStreams instance, so both consume concurrently.
  val builder = new StreamsBuilderS()

  /** Adds a sub-topology that lower-cases each record on `intopic`, splits
    * it on non-word characters and writes every word to `outTopic`.
    */
  private def wordSplit(intopic: String, outTopic: String): Unit = {
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    // Triple-quoted literal: "\W+" is an invalid escape in a plain Scala
    // string and does not compile; the regex needs a literal backslash.
    val data: KStreamS[String, String] =
      textLines.flatMapValues(value => value.toLowerCase.split("""\W+""").toIterable)
    data.to(outTopic, produced)
  }

  /** Adds a sub-topology that parses each record on `intopic` as a
    * [[Person]] JSON document and writes its [[PersonNameAndEmail]]
    * projection (as JSON) to `outTopic`. Records that fail to parse are
    * mapped to the serialization of `None`.
    */
  private def readAndWriteJson(intopic: String, outTopic: String): Unit = {
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    val data: KStreamS[String, String] = textLines.mapValues { value =>
      val person = Try(parse(value).extract[Person]).toOption
      println("1::", person)
      val personNameAndEmail = person.map(a => PersonNameAndEmail(a.name, a.email))
      println("2::", personNameAndEmail)
      write(personNameAndEmail)
    }
    data.to(outTopic, produced)
  }

  wordSplit("lines", "wordCount")
  readAndWriteJson("person", "personName")

  val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
  streams.start()
  // (Removed a stray `streams` expression statement here: dead code that
  // evaluated the val and discarded the result.)

  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    streams.close(10, TimeUnit.SECONDS)
  }))
}
// JSON payload read from the input topic (extracted via json4s in readAndWriteJson).
case class Person(name: String, age: Int, email: String)
// Projection of Person (name + email only) written back out as JSON.
case class PersonNameAndEmail(name: String, email: String)
我正在使用 Lightbend 最近开源的新 kafka-streams-scala API。我尝试同时运行两个流,但实际情况是这两个流不会同时运行,我也得不到想要的输出。
package in.internity
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.{StreamsConfig, _}
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.write
import scala.util.Try
/**
* @author Shivansh <shiv4nsh@gmail.com>
* @since 8/1/18
*/
object Boot extends App {

  implicit val formats: DefaultFormats.type = DefaultFormats

  /** Shared Kafka Streams configuration.
    *
    * NOTE(review): both `KafkaStreams` instances below are created with the
    * SAME `application.id`, so they join one consumer group and compete for
    * the same partitions — this is why only the stream that starts first
    * consumes its topic. Give each instance a distinct application id (or
    * build a single shared topology) to run both concurrently.
    */
  val config: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p
  }

  val streams1 = wordSplit("lines", "wordCount")
  val streams2 = readAndWriteJson("person", "personName")

  /** Builds a standalone topology that lower-cases each record on `intopic`,
    * splits it on non-word characters and writes every word to `outTopic`.
    *
    * @return an unstarted [[KafkaStreams]] instance for this topology
    */
  private def wordSplit(intopic: String, outTopic: String): KafkaStreams = {
    val builder = new StreamsBuilderS()
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    // Triple-quoted literal: "\W+" is an invalid escape in a plain Scala
    // string and does not compile; the regex needs a literal backslash.
    val data: KStreamS[String, String] =
      textLines.flatMapValues(value => value.toLowerCase.split("""\W+""").toIterable)
    data.to(outTopic, produced)
    new KafkaStreams(builder.build(), config)
  }

  /** Builds a standalone topology that parses each record on `intopic` as a
    * [[Person]] JSON document and writes its [[PersonNameAndEmail]]
    * projection (as JSON) to `outTopic`. Records that fail to parse are
    * mapped to the serialization of `None`.
    *
    * @return an unstarted [[KafkaStreams]] instance for this topology
    */
  private def readAndWriteJson(intopic: String, outTopic: String): KafkaStreams = {
    val builder = new StreamsBuilderS()
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    val data: KStreamS[String, String] = textLines.mapValues { value =>
      val person = Try(parse(value).extract[Person]).toOption
      println("1::", person)
      val personNameAndEmail = person.map(a => PersonNameAndEmail(a.name, a.email))
      println("2::", personNameAndEmail)
      write(personNameAndEmail)
    }
    data.to(outTopic, produced)
    new KafkaStreams(builder.build(), config)
  }

  streams1.start()
  streams2.start()

  // Close both instances (reverse start order) on JVM shutdown.
  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    streams2.close(10, TimeUnit.SECONDS)
    streams1.close(10, TimeUnit.SECONDS)
  }))
}
// JSON payload read from the input topic (extracted via json4s in readAndWriteJson).
case class Person(name: String, age: Int, email: String)
// Projection of Person (name + email only) written back out as JSON.
case class PersonNameAndEmail(name: String, email: String)
当我运行这段代码并向主题 person 生成消息时,这些消息不会被消费。
但是当我改变他们开始的顺序时,即
streams2.start()
streams1.start()
它就能正常工作。那么为什么先启动的一个流会阻塞另一个流?我们不能同时运行多个流吗?
问题解决了——原来是我自己在不同的方法里把流初始化了两次(我真傻 :P)。
工作代码:
package in.internity
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.{StreamsConfig, _}
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.write
import scala.util.Try
/**
* @author Shivansh <shiv4nsh@gmail.com>
* @since 8/1/18
*/
object Boot extends App {

  implicit val formats: DefaultFormats.type = DefaultFormats

  /** Kafka Streams configuration for the single application below. */
  val config: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p
  }

  // One builder shared by both processing graphs: they form a single
  // topology run by ONE KafkaStreams instance, so both consume concurrently.
  val builder = new StreamsBuilderS()

  /** Adds a sub-topology that lower-cases each record on `intopic`, splits
    * it on non-word characters and writes every word to `outTopic`.
    */
  private def wordSplit(intopic: String, outTopic: String): Unit = {
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    // Triple-quoted literal: "\W+" is an invalid escape in a plain Scala
    // string and does not compile; the regex needs a literal backslash.
    val data: KStreamS[String, String] =
      textLines.flatMapValues(value => value.toLowerCase.split("""\W+""").toIterable)
    data.to(outTopic, produced)
  }

  /** Adds a sub-topology that parses each record on `intopic` as a
    * [[Person]] JSON document and writes its [[PersonNameAndEmail]]
    * projection (as JSON) to `outTopic`. Records that fail to parse are
    * mapped to the serialization of `None`.
    */
  private def readAndWriteJson(intopic: String, outTopic: String): Unit = {
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    val data: KStreamS[String, String] = textLines.mapValues { value =>
      val person = Try(parse(value).extract[Person]).toOption
      println("1::", person)
      val personNameAndEmail = person.map(a => PersonNameAndEmail(a.name, a.email))
      println("2::", personNameAndEmail)
      write(personNameAndEmail)
    }
    data.to(outTopic, produced)
  }

  wordSplit("lines", "wordCount")
  readAndWriteJson("person", "personName")

  val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
  streams.start()
  // (Removed a stray `streams` expression statement here: dead code that
  // evaluated the val and discarded the result.)

  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    streams.close(10, TimeUnit.SECONDS)
  }))
}
// JSON payload read from the input topic (extracted via json4s in readAndWriteJson).
case class Person(name: String, age: Int, email: String)
// Projection of Person (name + email only) written back out as JSON.
case class PersonNameAndEmail(name: String, email: String)