在 Scala 中导入 avro 模式

importing avro schema in Scala

我正在编写一个简单的 twitter 程序,我正在使用 Kafka 阅读推文并希望使用 Avro 进行序列化。到目前为止,我刚刚在 Scala 中设置了 twitter 配置,现在想使用此配置阅读推文。

如何在我的程序中导入文件 tweets.avsc 中定义的以下 avro 模式?

{
    "namespace": "tweetavro",
    "type": "record",
    "name": "Tweet",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "text", "type": "string"}
    ]
}

我按照网络上的一些示例显示了类似 import tweetavro.Tweet 的内容,以便在 Scala 中导入模式,以便我们可以像

一样使用它
def main (args: Array[String]) {
    val twitterStream = TwitterStream.getStream
    twitterStream.addListener(new OnTweetPosted(s => sendToKafka(toTweet(s))))
    twitterStream.filter(filterUsOnly)
  }

  private def toTweet(s: Status): Tweet = {
    new Tweet(s.getUser.getName, s.getText)
  }

  private def sendToKafka(t:Tweet) {
    println(toJson(t.getSchema).apply(t))
    val tweetEnc = toBinary[Tweet].apply(t)
    val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, tweetEnc)
    kafkaProducer.send(msg)
  }

我正在遵循相同的方法并在 pom.xml

中使用以下插件
<!-- AVRO MAVEN PLUGIN -->
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.7.7</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/scala/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>


<!-- MAVEN COMPILER PLUGIN -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <configuration>
    <source>1.7</source>
    <target>1.7</target>
  </configuration>
</plugin>

做了这么多,还是做不到import tweetavro.Tweet

有人可以帮忙吗?

谢谢!

您应该首先将该模式编译成 class。我不确定 Scala 中是否有用于生产的 Avro 库,但您可以为 Java 生成一个 class 并在 Scala 中使用它:

java -jar /path/to/avro-tools-1.7.7.jar compile schema tweet.avsc .

根据您的需要更改此行,您应该会得到此工具生成的 tweetavro.Tweet class。然后你就可以把它放到你的项目中,按照你刚刚描述的方式使用。

更多信息here

更新:仅供参考,似乎有一个 library in Scala 但我以前从未使用过它

我推荐使用 Avrohugger。就 Avro 的 Scala 案例 类 而言,它是新手,但它支持我需要的一切,我真的很喜欢它不是基于宏的,所以我可以实际看到生成的内容。

与维护者一起工作非常棒,并且非常接受贡献和反馈。它没有而且可能永远不会像官方 Java 代码生成那样功能丰富,但它可以满足大多数人的需求。

目前,它缺少对联合(可选类型除外)和递归类型的支持。

SBT 插件运行良好,如果您想快速查看它对 Avro 模式的作用,可以使用新的 Web 界面:

https://avro2caseclass.herokuapp.com/

此处有更多详细信息:

https://github.com/julianpeeters/avrohugger

您也可以使用 avro4s。根据模式定义您的案例 class(或生成它)。我们称之为 class Tweet。然后你创建一个 AvroOutputStream,它也会从案例 class 中推断模式,并用于序列化实例。然后我们可以写入字节数组,并将其发送到 kafka。例如:

val tweet: Tweet= ... // the instance you want to serialize

val out = new ByteArrayOutputStream // we collect the serialized output in this
val avro = AvroOutputStream[Tweet](out) // you specify the type here as well
avro.write(tweet)
avro.close()

val bytes = out.toByteArray
val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, bytes)
kafkaProducer.send(msg)