Confluentinc 连接器。一个弹性搜索索引中的多个连接器。以特定文档类型发布
Confluentinc connector. Multiple connectors in one elasticsearch index. Publish in specific document type
我的 Confluentinc 连接器有问题。
创建连接器时需要指定主题(elasticsearch 索引)和类型(ES 中的文档类型)。
{
"name": "test-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "test",
"key.ignore": "false",
"schema.ignore": "false",
"connection.url": "http://elastic:9200",
"type.name": "type1",
"name": "elasticsearch-sink"
}
}
我想在同一个索引(kafka主题)中发布,但发布到不同的类型,可以吗?
我尝试创建多个连接器,但问题是每个连接器都使用消息,因为它是同一个主题。
我试过动态创建特定类型的连接器,在那里发布,然后删除连接器。但有时它删除得太早并且并非所有消息都被消耗(没有出现在弹性中)。此外,当我删除连接器并使用其他文档类型创建另一个连接器时,这个新连接器会消耗一些旧消息。
有没有人知道如何管理它?
每个连接器可以将消息路由到一种类型。您可以使用 Single Message Transform 将消息路由到不同的索引,但这不是您想要的。
我推荐的是使用流处理将消息拆分为不同的主题。然后,每个主题都由不同的连接器根据需要流式传输到相同的索引但类型不同。
要进行流处理,您可以使用 Kafka Streams、Spark Streaming 等。还有 KSQL,它可以让您执行以下操作:
CREATE STREAM FOO_TYPE_A AS SELECT * FROM FOO WHERE TYPE='A';
CREATE STREAM FOO_TYPE_B AS SELECT * FROM FOO WHERE TYPE='B';
CREATE STREAM FOO_TYPE_C AS SELECT * FROM FOO WHERE TYPE='C';
然后您将创建三个连接器的三个主题(FOO_TYPE_A
、FOO_TYPE_B
、FOO_TYPE_C
),流式传输到索引 FOO
,但类型不同。
免责声明:我在开源 KSQL 项目背后的公司 Confluent 工作。
我找到了解决方案,不幸的是它已被弃用。如果你知道更好的东西,请告诉我。
来自官方文档:
topic.index.map
This option is now deprecated. A future version may remove it completely. Please use single message transforms, such as RegexRouter, to map topic names to index names.
A map from Kafka topic name to the destination Elasticsearch index, represented as a list of topic:index pairs.
Type: list
Default: ""
Importance: low
所以我创建了这样的连接器:
{
"name": "test-connector-old",
"config": {
.....
"topics": "old",
"topic.index.map": "old:test",
....
}
}
现在我可以推送到主题 "old" 并且它将索引 elasticsearch "test" index
然后我创建了更多连接器,并使用 "topic.index.map":“TOPIC_NAME:test”,我可以在同一索引上索引不同类型
在未来的版本中,它将是主题 => 索引。 Confluent 团队,请不要删除 topic.index.map,或为这种情况寻找更好的解决方案
谢谢!
我的 Confluentinc 连接器有问题。
创建连接器时需要指定主题(elasticsearch 索引)和类型(ES 中的文档类型)。
{
"name": "test-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "test",
"key.ignore": "false",
"schema.ignore": "false",
"connection.url": "http://elastic:9200",
"type.name": "type1",
"name": "elasticsearch-sink"
}
}
我想在同一个索引(kafka主题)中发布,但发布到不同的类型,可以吗?
我尝试创建多个连接器,但问题是每个连接器都使用消息,因为它是同一个主题。
我试过动态创建特定类型的连接器,在那里发布,然后删除连接器。但有时它删除得太早并且并非所有消息都被消耗(没有出现在弹性中)。此外,当我删除连接器并使用其他文档类型创建另一个连接器时,这个新连接器会消耗一些旧消息。
有没有人知道如何管理它?
每个连接器可以将消息路由到一种类型。您可以使用 Single Message Transform 将消息路由到不同的索引,但这不是您想要的。
我推荐的是使用流处理将消息拆分为不同的主题。然后,每个主题都由不同的连接器根据需要流式传输到相同的索引但类型不同。
要进行流处理,您可以使用 Kafka Streams、Spark Streaming 等。还有 KSQL,它可以让您执行以下操作:
CREATE STREAM FOO_TYPE_A AS SELECT * FROM FOO WHERE TYPE='A';
CREATE STREAM FOO_TYPE_B AS SELECT * FROM FOO WHERE TYPE='B';
CREATE STREAM FOO_TYPE_C AS SELECT * FROM FOO WHERE TYPE='C';
然后您将创建三个连接器的三个主题(FOO_TYPE_A
、FOO_TYPE_B
、FOO_TYPE_C
),流式传输到索引 FOO
,但类型不同。
免责声明:我在开源 KSQL 项目背后的公司 Confluent 工作。
我找到了解决方案,不幸的是它已被弃用。如果你知道更好的东西,请告诉我。
来自官方文档:
topic.index.map
This option is now deprecated. A future version may remove it completely. Please use single message transforms, such as RegexRouter, to map topic names to index names.
A map from Kafka topic name to the destination Elasticsearch index, represented as a list of topic:index pairs.
Type: list
Default: ""
Importance: low
所以我创建了这样的连接器:
{
"name": "test-connector-old",
"config": {
.....
"topics": "old",
"topic.index.map": "old:test",
....
}
}
现在我可以推送到主题 "old" 并且它将索引 elasticsearch "test" index
然后我创建了更多连接器,并使用 "topic.index.map":“TOPIC_NAME:test”,我可以在同一索引上索引不同类型
在未来的版本中,它将是主题 => 索引。 Confluent 团队,请不要删除 topic.index.map,或为这种情况寻找更好的解决方案 谢谢!