How to ensure constant Avro schema generation and avoid the 'Too many schema objects created for x' exception?

I'm getting a reproducible error when producing Avro messages with reactive kafka and avro4s. Once the identityMapCapacity of the client (CachedSchemaRegistryClient) is reached, serialization fails with

java.lang.IllegalStateException: Too many schema objects created for <myTopic>-value

This is unexpected, since all messages should have the same schema: they are serializations of the same case class.

val avroProducerSettings: ProducerSettings[String, GenericRecord] =
  ProducerSettings(system, Serdes.String().serializer(), avroSerde.serializer())
    .withBootstrapServers(settings.bootstrapServer)

val avroProdFlow: Flow[ProducerMessage.Message[String, GenericRecord, String],
                    ProducerMessage.Result[String, GenericRecord, String],
                    NotUsed] = Producer.flow(avroProducerSettings)

val avroQueue: SourceQueueWithComplete[Message[String, GenericRecord, String]] = 
  Source.queue(bufferSize, overflowStrategy)
  .via(avroProdFlow)
  .map(logResult)
  .to(Sink.ignore)
  .run()

...
avroQueue.offer(msg)

The serializer is a KafkaAvroSerializer, instantiated with new CachedSchemaRegistryClient(settings.schemaRegistry, 1000).

Generating the GenericRecord:

def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
  recordFormat.to(a)

val makeEdgeMessage: (Edge, String) => Message[String, GenericRecord, String] = { (edge, topic) =>
  val edgeAvro: GenericRecord = toAvro(edge)
  val record   = new ProducerRecord[String, GenericRecord](topic, edge.id, edgeAvro)
  ProducerMessage.Message(record, edge.id)
}

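The schema is created deep inside the serializer code (io.confluent.kafka.serializers.AbstractKafkaAvroSerDe#getSchema, called by io.confluent.kafka.serializers.AbstractKafkaAvroSerializer#serializeImpl), which I have no control over, so I don't know how to fix the leak. It looks to me as if the two Confluent projects don't work well together.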

The issues I found here, here and here don't seem to address my use case.

My two current workarounds are:

Is there a way to generate or cache a consistent schema per message/record type and use it in my setup?

EDIT 2017.11.20

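In my case, the problem was that every GenericRecord instance carrying my messages had been created by a different RecordFormat instance, each containing a different Schema instance. The implicit resolution here produced a new instance on every call. As far as I can tell, CachedSchemaRegistryClient caches schemas by object identity (hence identityMapCapacity), so each of these equal but distinct Schema objects took up another cache slot until the capacity was exhausted.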

def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord = recordFormat.to(a)
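To see why a new Schema instance per message eventually hits the limit, here is a simplified model of the client-side cache. It assumes, as I understand CachedSchemaRegistryClient in the versions of that time, that each subject's schemas are held in a java.util.IdentityHashMap capped at identityMapCapacity. FakeSchema and IdentityCacheModel are hypothetical stand-ins, not Confluent classes, and the registry server (which hands out one id per distinct schema) is modelled by an ordinary equality-keyed map.

```scala
import java.util.IdentityHashMap
import scala.collection.mutable

// Hypothetical stand-in for org.apache.avro.Schema: a case class, so two
// instances with the same definition are equal (==) but not identical (eq).
final case class FakeSchema(definition: String)

// Simplified, hypothetical model of the client cache: per subject, schemas are
// keyed by object identity and the number of entries is capped.
final class IdentityCacheModel(identityMapCapacity: Int) {
  private val perSubject = mutable.Map.empty[String, IdentityHashMap[FakeSchema, Integer]]
  // Models the registry server, which returns one id per distinct schema.
  private val registry = mutable.Map.empty[FakeSchema, Int]

  def register(subject: String, schema: FakeSchema): Int = synchronized {
    val ids = perSubject.getOrElseUpdate(subject, new IdentityHashMap[FakeSchema, Integer]())
    val cached = ids.get(schema) // identity lookup: only this exact object matches
    if (cached != null) cached.intValue
    else {
      if (ids.size() >= identityMapCapacity)
        throw new IllegalStateException(s"Too many schema objects created for $subject!")
      val id = registry.getOrElseUpdate(schema, registry.size + 1)
      ids.put(schema, Integer.valueOf(id))
      id
    }
  }
}
```

Registering the same FakeSchema object any number of times uses one slot; passing a fresh FakeSchema on every call throws once identityMapCapacity distinct objects have been seen, even though all of them are equal and map to the same id.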

The solution was to pin the RecordFormat instance to a val and reuse it explicitly. Many thanks to https://github.com/heliocentrist for explaining the details.
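A minimal sketch of that fix, using hypothetical stand-ins rather than avro4s itself: StubFormat plays the role of RecordFormat, and its companion's implicit def derived mimics a derived implicit that builds a new format (with a new, structurally equal schema object) on every resolution. Resolving once, keeping the instance in a val and passing it explicitly means every message carries the same schema object.

```scala
// Hypothetical stand-in for org.apache.avro.Schema (structural equality).
final case class StubSchema(definition: String)

// Hypothetical stand-in for avro4s's RecordFormat: each instance owns the
// schema object it was created with.
final class StubFormat[A](val schema: StubSchema)

object StubFormat {
  // Mimics a derived implicit: every resolution builds a new format and a new,
  // structurally equal, schema object.
  implicit def derived[A]: StubFormat[A] =
    new StubFormat[A](StubSchema("""{"type":"record","name":"Edge"}"""))
}

// Hypothetical message type standing in for the question's Edge.
final case class Edge(id: String)

object PinnedFormatDemo {
  // Returns the schema object that would be attached to the record for `a`.
  def schemaUsedFor[A](a: A)(implicit format: StubFormat[A]): StubSchema = format.schema

  // Problem: the implicit is resolved from the companion on every call.
  def viaImplicit(edge: Edge): StubSchema = schemaUsedFor(edge)

  // Fix: resolve once, keep the instance in a val, and pass it explicitly.
  val edgeFormat: StubFormat[Edge] = implicitly[StubFormat[Edge]]
  def viaPinned(edge: Edge): StubSchema = schemaUsedFor(edge)(edgeFormat)
}
```

With avro4s 1.x, which I believe was current at the time, the equivalent is defining `val edgeFormat = RecordFormat[Edge]` once and calling `edgeFormat.to(edge)` inside makeEdgeMessage; later avro4s versions changed how RecordFormat is constructed, so check the factory signature for your version.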

Original answer:

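After waiting for a while (the GitHub issue got no answer either), I had to implement my own SchemaRegistryClient. Over 90% of it is copied from the original CachedSchemaRegistryClient, just translated into Scala. Using a Scala mutable.Map fixed the memory leak. I haven't done any thorough testing, so use it at your own risk.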
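Why swapping the map type helps: a Scala mutable.Map compares keys with equals/hashCode, and Avro's Schema defines both structurally, so equal schema objects collapse into a single entry. An identity-keyed map (which, as I understand it, is what the Java client used at the time) gives every object its own entry. The sketch below uses a hypothetical SchemaLike case class in place of Avro's Schema.

```scala
import java.util.IdentityHashMap
import scala.collection.mutable

// Hypothetical stand-in for org.apache.avro.Schema, which (like this case
// class) defines equals/hashCode structurally.
final case class SchemaLike(definition: String)

object KeyingDemo {
  // Entries used when n equal-but-distinct schema objects go into an
  // equality-keyed Scala map: they all collapse into one key.
  def equalityKeyedEntries(n: Int): Int = {
    val cache = mutable.Map.empty[SchemaLike, Integer]
    (1 to n).foreach(_ => cache.getOrElseUpdate(SchemaLike("edge"), Integer.valueOf(1)))
    cache.size
  }

  // The same objects in an identity-keyed map: one entry per object.
  def identityKeyedEntries(n: Int): Int = {
    val cache = new IdentityHashMap[SchemaLike, Integer]()
    (1 to n).foreach(_ => cache.putIfAbsent(SchemaLike("edge"), Integer.valueOf(1)))
    cache.size()
  }
}
```

With 1000 schema objects the equality-keyed map holds one entry while the identity-keyed one holds 1000, which is exactly the difference between staying under identityMapCapacity and exceeding it.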

import java.util

import io.confluent.kafka.schemaregistry.client.rest.entities.{ Config, SchemaString }
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest
import io.confluent.kafka.schemaregistry.client.rest.{ RestService, entities }
import io.confluent.kafka.schemaregistry.client.{ SchemaMetadata, SchemaRegistryClient }
import org.apache.avro.Schema

import scala.collection.mutable

class CachingSchemaRegistryClient(val restService: RestService, val identityMapCapacity: Int)
    extends SchemaRegistryClient {

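  // Per-subject caches. A Scala mutable.Map compares keys with equals/hashCode,
  // so structurally equal Schema objects share a single entry.
  val schemaCache: mutable.Map[String, mutable.Map[Schema, Integer]] = mutable.Map()
  // The null-subject entry backs getByID and is filled by register.
  val idCache: mutable.Map[String, mutable.Map[Integer, Schema]] =
    mutable.Map(null.asInstanceOf[String] -> mutable.Map[Integer, Schema]())
  val versionCache: mutable.Map[String, mutable.Map[Schema, Integer]] = mutable.Map()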

  def this(baseUrl: String, identityMapCapacity: Int) =
    this(new RestService(baseUrl), identityMapCapacity)

  def this(baseUrls: util.List[String], identityMapCapacity: Int) =
    this(new RestService(baseUrls), identityMapCapacity)

  def registerAndGetId(subject: String, schema: Schema): Int =
    restService.registerSchema(schema.toString, subject)

  def getSchemaByIdFromRegistry(id: Int): Schema = {
    val restSchema: SchemaString = restService.getId(id)
    (new Schema.Parser).parse(restSchema.getSchemaString)
  }

  def getVersionFromRegistry(subject: String, schema: Schema): Int = {
    val response: entities.Schema = restService.lookUpSubjectVersion(schema.toString, subject)
    response.getVersion.intValue
  }

  override def getVersion(subject: String, schema: Schema): Int = synchronized {
    val schemaVersionMap: mutable.Map[Schema, Integer] =
      versionCache.getOrElseUpdate(subject, mutable.Map())

    val version: Integer = schemaVersionMap.getOrElse(
      schema, {
        if (schemaVersionMap.size >= identityMapCapacity) {
          throw new IllegalStateException(s"Too many schema objects created for $subject!")
        }

        val version = Integer.valueOf(getVersionFromRegistry(subject, schema))
        schemaVersionMap.put(schema, version)
        version
      }
    )
    version.intValue()
  }

  override def getAllSubjects: util.List[String] = restService.getAllSubjects()

  override def getByID(id: Int): Schema = synchronized { getBySubjectAndID(null, id) }

  override def getBySubjectAndID(subject: String, id: Int): Schema = synchronized {
    val idSchemaMap: mutable.Map[Integer, Schema] = idCache.getOrElseUpdate(subject, mutable.Map())
    idSchemaMap.getOrElseUpdate(id, getSchemaByIdFromRegistry(id))
  }

  override def getSchemaMetadata(subject: String, version: Int): SchemaMetadata = {
    val response = restService.getVersion(subject, version)
    val id       = response.getId.intValue
    val schema   = response.getSchema
    new SchemaMetadata(id, version, schema)
  }

  override def getLatestSchemaMetadata(subject: String): SchemaMetadata = synchronized {
    val response = restService.getLatestVersion(subject)
    val id       = response.getId.intValue
    val version  = response.getVersion.intValue
    val schema   = response.getSchema
    new SchemaMetadata(id, version, schema)
  }

  override def updateCompatibility(subject: String, compatibility: String): String = {
    val response: ConfigUpdateRequest = restService.updateCompatibility(compatibility, subject)
    response.getCompatibilityLevel
  }

  override def getCompatibility(subject: String): String = {
    val response: Config = restService.getConfig(subject)
    response.getCompatibilityLevel
  }

  override def testCompatibility(subject: String, schema: Schema): Boolean =
    restService.testCompatibility(schema.toString(), subject, "latest")

  override def register(subject: String, schema: Schema): Int = synchronized {
    val schemaIdMap: mutable.Map[Schema, Integer] =
      schemaCache.getOrElseUpdate(subject, mutable.Map())

    val id = schemaIdMap.getOrElse(
      schema, {
        if (schemaIdMap.size >= identityMapCapacity)
          throw new IllegalStateException(s"Too many schema objects created for $subject!")
        val id: Integer = Integer.valueOf(registerAndGetId(subject, schema))
        schemaIdMap.put(schema, id)
        idCache(null).put(id, schema)
        id
      }
    )
    id.intValue()
  }
}