Message Hub 上的 Kafka Streams KTable 配置错误
Kafka Streams KTable configuration error on Message Hub
此问题现已在消息中心得到解决
我在 Kafka 中创建 KTable 时遇到了一些问题。我是卡夫卡的新手,这可能是我问题的根源,但我想我还是可以在这里问。我有一个项目,我想通过计算它们的总出现次数来跟踪不同的 ID。我在 IBM Cloud 上使用 Message Hub 来管理我的主题,到目前为止效果非常好。
我在 Message Hub 上有一个主题,可以生成类似 {"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"}
的消息,目前唯一相关的关键字是 ID。
我的 Kafka 代码以及 Streams 配置如下所示:
import org.apache.kafka.streams.StreamsConfig;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");
props.put("security.protocol","SASL_SSL");
props.put("sasl.mechanism","PLAIN");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config",saslJaasConfig);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> Kstreams = builder.stream(myTopic);
KTable<String, Long> eventCount = Kstreams
.flatMapValues(value -> getID(value)) //function that retrieves the ID
.groupBy((key, value) -> value)
.count();
当我 运行 代码时,出现以下错误:
Exception in thread "KTableTest-e2062d11-0b30-4ed0-82b0-00d83dcd9366->StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create topic KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition.
其次是:
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Invalid configuration: {segment.index.bytes=52428800, segment.bytes=52428800, cleanup.policy=delete, segment.ms=600000}. Only allowed configs: [retention.ms, cleanup.policy]
我不知道为什么会出现这个错误,也不知道该怎么办。我构建 KStream 和 KTable 的方式是否不正确?或者也许是 bluemix 上的消息中心?
已解决:
在我标记为正确的答案下方的评论中添加摘录。事实证明我的 StreamsConfig 没有问题,并且(目前)Message Hub 方面存在问题,但有一个解决方法:
事实证明,Message Hub 在使用 Kafka Streams 1.1 创建重新分区主题时存在问题。在我们进行修复时,您需要手动创建主题 KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition。它需要与您的输入主题 (myTopic) 一样多的分区,并将保留时间设置为最大值。修复后我会post另一个评论
非常感谢您的帮助!
Message Hub 有一些 restrictions 创建主题时可以使用的配置。
从您收到的 PolicyViolationException 来看,您的 Streams 应用程序似乎试图使用一些我们不允许的配置:
- segment.index.bytes
- segment.bytes
- segment.ms
我猜你在 Streams 配置的某处设置了它们,应该将它们删除。
请注意,您还需要在您的配置中将 StreamsConfig.REPLICATION_FACTOR_CONFIG
设置为 3 才能使用我们的 docs.
中提到的 Message Hub
此问题现已在消息中心得到解决
我在 Kafka 中创建 KTable 时遇到了一些问题。我是卡夫卡的新手,这可能是我问题的根源,但我想我还是可以在这里问。我有一个项目,我想通过计算它们的总出现次数来跟踪不同的 ID。我在 IBM Cloud 上使用 Message Hub 来管理我的主题,到目前为止效果非常好。
我在 Message Hub 上有一个主题,可以生成类似 {"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"}
的消息,目前唯一相关的关键字是 ID。
我的 Kafka 代码以及 Streams 配置如下所示:
import org.apache.kafka.streams.StreamsConfig;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");
props.put("security.protocol","SASL_SSL");
props.put("sasl.mechanism","PLAIN");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config",saslJaasConfig);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> Kstreams = builder.stream(myTopic);
KTable<String, Long> eventCount = Kstreams
.flatMapValues(value -> getID(value)) //function that retrieves the ID
.groupBy((key, value) -> value)
.count();
当我 运行 代码时,出现以下错误:
Exception in thread "KTableTest-e2062d11-0b30-4ed0-82b0-00d83dcd9366->StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create topic KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition.
其次是:
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Invalid configuration: {segment.index.bytes=52428800, segment.bytes=52428800, cleanup.policy=delete, segment.ms=600000}. Only allowed configs: [retention.ms, cleanup.policy]
我不知道为什么会出现这个错误,也不知道该怎么办。我构建 KStream 和 KTable 的方式是否不正确?或者也许是 bluemix 上的消息中心?
已解决:
在我标记为正确的答案下方的评论中添加摘录。事实证明我的 StreamsConfig 没有问题,并且(目前)Message Hub 方面存在问题,但有一个解决方法:
事实证明,Message Hub 在使用 Kafka Streams 1.1 创建重新分区主题时存在问题。在我们进行修复时,您需要手动创建主题 KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition。它需要与您的输入主题 (myTopic) 一样多的分区,并将保留时间设置为最大值。修复后我会post另一个评论
非常感谢您的帮助!
Message Hub 有一些 restrictions 创建主题时可以使用的配置。
从您收到的 PolicyViolationException 来看,您的 Streams 应用程序似乎试图使用一些我们不允许的配置:
- segment.index.bytes
- segment.bytes
- segment.ms
我猜你在 Streams 配置的某处设置了它们,应该将它们删除。
请注意,您还需要在您的配置中将 StreamsConfig.REPLICATION_FACTOR_CONFIG
设置为 3 才能使用我们的 docs.