在 Scala 中导入 avro 模式
importing avro schema in Scala
我正在编写一个简单的 twitter 程序,我正在使用 Kafka 阅读推文并希望使用 Avro 进行序列化。到目前为止,我刚刚在 Scala 中设置了 twitter 配置,现在想使用此配置阅读推文。
如何在我的程序中导入文件 tweets.avsc 中定义的以下 avro 模式?
{
"namespace": "tweetavro",
"type": "record",
"name": "Tweet",
"fields": [
{"name": "name", "type": "string"},
{"name": "text", "type": "string"}
]
}
我按照网络上的一些示例显示了类似 import tweetavro.Tweet
的内容,以便在 Scala 中导入模式,以便我们可以像
一样使用它
def main (args: Array[String]) {
val twitterStream = TwitterStream.getStream
twitterStream.addListener(new OnTweetPosted(s => sendToKafka(toTweet(s))))
twitterStream.filter(filterUsOnly)
}
private def toTweet(s: Status): Tweet = {
new Tweet(s.getUser.getName, s.getText)
}
private def sendToKafka(t:Tweet) {
println(toJson(t.getSchema).apply(t))
val tweetEnc = toBinary[Tweet].apply(t)
val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, tweetEnc)
kafkaProducer.send(msg)
}
我正在遵循相同的方法并在 pom.xml
中使用以下插件
<!-- AVRO MAVEN PLUGIN -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.7.7</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/scala/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<!-- MAVEN COMPILER PLUGIN -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
做了这么多,还是做不到import tweetavro.Tweet
有人可以帮忙吗?
谢谢!
您应该首先将该模式编译成 class。我不确定 Scala 中是否有用于生产的 Avro 库,但您可以为 Java 生成一个 class 并在 Scala 中使用它:
java -jar /path/to/avro-tools-1.7.7.jar compile schema tweet.avsc .
根据您的需要更改此行,您应该会得到此工具生成的 tweetavro.Tweet class。然后你就可以把它放到你的项目中,按照你刚刚描述的方式使用。
更多信息here
更新:仅供参考,似乎有一个 library in Scala 但我以前从未使用过它
我推荐使用 Avrohugger。就 Avro 的 Scala 案例 类 而言,它是新手,但它支持我需要的一切,我真的很喜欢它不是基于宏的,所以我可以实际看到生成的内容。
与维护者一起工作非常棒,并且非常接受贡献和反馈。它没有而且可能永远不会像官方 Java 代码生成那样功能丰富,但它可以满足大多数人的需求。
目前,它缺少对联合(可选类型除外)和递归类型的支持。
SBT 插件运行良好,如果您想快速查看它对 Avro 模式的作用,可以使用新的 Web 界面:
https://avro2caseclass.herokuapp.com/
此处有更多详细信息:
您也可以使用 avro4s。根据模式定义您的案例 class(或生成它)。我们称之为 class Tweet
。然后你创建一个 AvroOutputStream
,它也会从案例 class 中推断模式,并用于序列化实例。然后我们可以写入字节数组,并将其发送到 kafka。例如:
val tweet: Tweet= ... // the instance you want to serialize
val out = new ByteArrayOutputStream // we collect the serialized output in this
val avro = AvroOutputStream[Tweet](out) // you specify the type here as well
avro.write(tweet)
avro.close()
val bytes = out.toByteArray
val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, bytes)
kafkaProducer.send(msg)
我正在编写一个简单的 twitter 程序,我正在使用 Kafka 阅读推文并希望使用 Avro 进行序列化。到目前为止,我刚刚在 Scala 中设置了 twitter 配置,现在想使用此配置阅读推文。
如何在我的程序中导入文件 tweets.avsc 中定义的以下 avro 模式?
{
"namespace": "tweetavro",
"type": "record",
"name": "Tweet",
"fields": [
{"name": "name", "type": "string"},
{"name": "text", "type": "string"}
]
}
我按照网络上的一些示例显示了类似 import tweetavro.Tweet
的内容,以便在 Scala 中导入模式,以便我们可以像
def main (args: Array[String]) {
val twitterStream = TwitterStream.getStream
twitterStream.addListener(new OnTweetPosted(s => sendToKafka(toTweet(s))))
twitterStream.filter(filterUsOnly)
}
private def toTweet(s: Status): Tweet = {
new Tweet(s.getUser.getName, s.getText)
}
private def sendToKafka(t:Tweet) {
println(toJson(t.getSchema).apply(t))
val tweetEnc = toBinary[Tweet].apply(t)
val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, tweetEnc)
kafkaProducer.send(msg)
}
我正在遵循相同的方法并在 pom.xml
<!-- AVRO MAVEN PLUGIN -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.7.7</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/scala/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<!-- MAVEN COMPILER PLUGIN -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
做了这么多,还是做不到import tweetavro.Tweet
有人可以帮忙吗?
谢谢!
您应该首先将该模式编译成 class。我不确定 Scala 中是否有用于生产的 Avro 库,但您可以为 Java 生成一个 class 并在 Scala 中使用它:
java -jar /path/to/avro-tools-1.7.7.jar compile schema tweet.avsc .
根据您的需要更改此行,您应该会得到此工具生成的 tweetavro.Tweet class。然后你就可以把它放到你的项目中,按照你刚刚描述的方式使用。
更多信息here
更新:仅供参考,似乎有一个 library in Scala 但我以前从未使用过它
我推荐使用 Avrohugger。就 Avro 的 Scala 案例 类 而言,它是新手,但它支持我需要的一切,我真的很喜欢它不是基于宏的,所以我可以实际看到生成的内容。
与维护者一起工作非常棒,并且非常接受贡献和反馈。它没有而且可能永远不会像官方 Java 代码生成那样功能丰富,但它可以满足大多数人的需求。
目前,它缺少对联合(可选类型除外)和递归类型的支持。
SBT 插件运行良好,如果您想快速查看它对 Avro 模式的作用,可以使用新的 Web 界面:
https://avro2caseclass.herokuapp.com/
此处有更多详细信息:
您也可以使用 avro4s。根据模式定义您的案例 class(或生成它)。我们称之为 class Tweet
。然后你创建一个 AvroOutputStream
,它也会从案例 class 中推断模式,并用于序列化实例。然后我们可以写入字节数组,并将其发送到 kafka。例如:
val tweet: Tweet= ... // the instance you want to serialize
val out = new ByteArrayOutputStream // we collect the serialized output in this
val avro = AvroOutputStream[Tweet](out) // you specify the type here as well
avro.write(tweet)
avro.close()
val bytes = out.toByteArray
val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, bytes)
kafkaProducer.send(msg)