Kafka connect topic消息写入sink database前修改

Kafka connect topic message modification before writing to sink database

例如,我在源和目标之间设置了 Kafka 连接

我在 mysql 中有一个 table,我想将其发送到 mongodb,我已将 mysql 设置为源,将 mongodb 设置为接收器并且工作正常。

在我的 mysql table 中有一个名为 'download_link' 的专栏,其中我有一个 pdf s3 下载 link。现在,当我设置 Kafka 时,这个 link 将变为 mongodb 但我需要的是,在我收到来自 mysql 源的消息后,我想执行一个 python 代码来下载pdf 文件并从中提取文本,所以当我的数据进入 mongodb 时。它不应该是 link 而是提取的文本。有没有可能做这样的事情?

有人可以提供一些资源吗?

I want to execute a python code ...

Kafka Connect 无法做到这一点。

既然你有, refer post -

您将 运行 您的 Python 流处理器在源连接器之后,将数据发送到新主题,然后在这些

上使用连接接收器

请记住,Kafka 消息具有最大大小,因此提取大型 PDF 文本 blob 并将数据保留在主题中可能不是最好的主意。相反,您可以让 MongoDB writer 应用程序在写入数据库之前下载 PDF,但如前所述,您需要编写 Java 才能使用 Kafka Connect。否则,您将剩下其他 Python 个从 Kafka 消费并写入 Mongo

的进程