为 s3 kafka connect sink 实现自定义分区

Implementing custom partitions for s3 kafka connect sink

我想实现自定义 s3 分区程序 class 以包含一些 avro 消息字段和一些额外的逻辑来生成输出 s3 路径前缀

项目是kotlin的,这是我的class:

package co.kafkaProcessor.connect

import io.confluent.connect.storage.errors.PartitionException
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class MachineAwareHourlyPartitioner<T> : TimeBasedPartitioner<T>() {
    private val log: Logger = LoggerFactory.getLogger(MachineAwareHourlyPartitioner::class.java)
    private lateinit var environmentName: String

    override fun configure(config: MutableMap<String, Any>?) {
        super.configure(config)
        environmentName = config!!["environment.prefix"] as String
    }

    private fun encodedPartitionForTimestamp(sinkRecord: SinkRecord, timestamp: Long?): String? {
        // Our custom logic goes here
    }
}

起初我尝试通过创建自定义 shadowJar 任务来生成 Jar 文件:

tasks {
    withType<ShadowJar> {
        mergeServiceFiles()
        append("META-INF/spring.handlers")
        append("META-INF/spring.schemas")
        append("META-INF/spring.tooling")
        transform(PropertiesFileTransformer::class.java) {
            paths = listOf("META-INF/spring.factories")
            mergeStrategy = "append"
        }
    }

    // Custom jars for kafka connect
    create<ShadowJar>("kafkaConnectUtilsJar") {
        archiveClassifier.set("connect-utils")
        include("co/kafkaProcessor/connect/**")
        include("co/kafkaProcessor/serializer/**")
        from(project.sourceSets.main.get().output)
        configurations = listOf(project.configurations.runtimeClasspath.get())
    }
}

但是 jar -tvf filename.jar 表明它只包含我自己的代码,而 kafka 连接失败 java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.TimeBasedPartitioner。 我认为您不应该在自定义 jar 中包含 kakfa 连接代码,还因为如果我尝试使用 TimeBasedPartitioner 配置任务,它会起作用,因此 class 可用。

然后我尝试通过将自定义 jar 定义更改为以下内容来包含存储分区程序:

tasks {
    withType<ShadowJar> {
        mergeServiceFiles()
        append("META-INF/spring.handlers")
        append("META-INF/spring.schemas")
        append("META-INF/spring.tooling")
        transform(PropertiesFileTransformer::class.java) {
            paths = listOf("META-INF/spring.factories")
            mergeStrategy = "append"
        }
    }

    // Custom jars for kafka connect
    create<ShadowJar>("kafkaConnectUtilsJar") {
        archiveClassifier.set("connect-utils")
        dependencies {
            include(dependency("io.confluent:kafka-connect-storage-partitioner:10.2.4"))
        }
        from(project.sourceSets.main.get().output)
        configurations = listOf(project.configurations.runtimeClasspath.get())
    }
}

不幸的是,这包括我所有的应用程序代码,但我可以看到分区程序包含在 jar 文件中。

Kafka 连接现在失败并出现此错误:

java.lang.ClassCastException: class co.kafkaProcessor.connect.MachineAwareHourlyPartitioner cannot be cast to class io.confluent.connect.storage.partitioner.Partitioner (co.kafkaProcessor.connect.MachineAwareHourlyPartitioner is in unnamed module of loader 'app'; io.confluent.connect.storage.partitioner.Partitioner is in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @63a6dffd)
        at io.confluent.connect.s3.S3SinkTask.newPartitioner(S3SinkTask.java:196)
        at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:117)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:312)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

更新:我还尝试通过重写 public 方法 encodePartition 来更改函数的重写方式,但这并没有改变。

我也试过添加这样的测试(希望应该尝试投射到 Partitioner`:

val partitioner = MachineAwareHourlyPartitioner<String>()
val implementedPartitioner = partitioner as Partitioner<String>

没有失败

我能够通过将我的 jar 文件(没有任何包含的依赖项)添加到 s3 连接器目录中来让分区程序工作:

/usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/lib/

我不确定这是否与避免来自不同插件的库相互干扰的插件隔离有关,但在我最初的尝试中,我的插件位于主 class 路径 /usr/share/java/kafka/ 我认为每个插件都可以使用它

作为一个额外的细节,我们还使用另一个自定义 class 来覆盖在 s3 连接器文件夹中不起作用的 avro TopicNameStrategy,我不得不将 jar 也复制到/usr/share/java/kafka/ 目录来解决这个问题,不确定为什么一个在全局文件夹中工作而另一个不工作