GlobalKTable - StreamsException:遇到与任何全局状态存储无关的主题分区
GlobalKTable - StreamsException: Encountered a topic-partition not associated with any global state store
我正在尝试使用 Kafka-streams 从流创建 GlobalKTable,但在调用 streams.start()
:
时出现异常
org.apache.kafka.streams.errors.StreamsException: Encountered a topic-partition not associated with any global state store
我的代码是:
private final KafkaStreams streams;
private final StoreQueryParameters<ReadOnlyKeyValueStore<LocalDate, String>> bankHolidayTypesSqp = StoreQueryParameters.fromNameAndType("bank_holiday_type_store"
,QueryableStoreTypes.<LocalDate, String>keyValueStore());
private final ReadOnlyKeyValueStore<LocalDate, String> localBankHolidayTypeStore;
private void instantiateKafka()
{
// configure Kafka
StreamsBuilder builder = new StreamsBuilder();
// CustomSerializableSerde is just a generic serializer that uses standard java Base64 encoding on any object that implements Serializable - it works in a dummy application I've tested, so I don't think it's the problem
addGlobalTableToStreamsBuilder(builder, bankHolidayTypeTopic,"bank_holiday_type_store", new CustomSerializableSerde<LocalDate>(),Serdes.String());
streams = createStreams("localhost:9092", "C:\Kafka\tmp\kafka-streams-global-tables",MyClass.class.getName(),builder);
streams.start(); // hangs until the global table is built
}
public static <Tk extends Serializable,Tv extends Serializable> StreamsBuilder addGlobalTableToStreamsBuilder(StreamsBuilder builder, String topic
, String store, Serde<Tk> keySerializer, Serde<Tv> valueSerializer)
{
builder.globalTable(topic, Materialized.<Tk, Tv, KeyValueStore<Bytes, byte[]>>as(store)
.withKeySerde(keySerializer)
.withValueSerde(valueSerializer));
return builder;
}
public static KafkaStreams createStreams(final String bootstrapServers, final String stateDir, String clientID, StreamsBuilder finishedBuilder)
{
final Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique in the Kafka cluster against which the application is run.
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "applicationName");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientID);
// Where to find Kafka broker(s).
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
// Set to earliest so we don't miss any data that arrived in the topics before the process started
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaStreams(finishedBuilder.build(), streamsConfiguration);
}
制作人:
Producer<LocalDate,String> bankHolidayTypeProducer = MyClass<LocalDate,String>createProducer("localhost:9092", BankHolidayData.class.getName()
, CustomSerializer.class.getName(), StringSerializer.class.getName());
//...
HashMap<LocalDate, String> bankHolidaysData = populateBankHolidayMap();
for (LocalDate bhDay : bankHolidaysData.keySet())
{
bankHolidayTypeProducer.send(new ProducerRecord<>(bankHolidayTypeTopic, bhDay, bankHolidaysData.get(bhDay)));
}
public static <Tk extends Serializable, Tv extends Serializable> Producer<Tk,Tv> createProducer(String bootstrapServers
, String clientID, String keySerializerClassName, String valueSerializerClassName)
{
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClassName);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClassName);
return new KafkaProducer<>(props);
}
我的主题是生产者在第一次生产时自动创建的,并且在 GlobalKTable 尝试读取它时将一直存在。这是问题吗?在设置主题时是否需要做一些事情来告诉 Kafka 它将被 Streams GlobalKTable 使用?
主题的结构(显然)发生了一些变化,这意味着需要重置 Streams。为此,您可以使用应用程序 Conduktor,或位于 http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool.
的重置工具
我正在尝试使用 Kafka-streams 从流创建 GlobalKTable,但在调用 streams.start()
:
org.apache.kafka.streams.errors.StreamsException: Encountered a topic-partition not associated with any global state store
我的代码是:
private final KafkaStreams streams;
private final StoreQueryParameters<ReadOnlyKeyValueStore<LocalDate, String>> bankHolidayTypesSqp = StoreQueryParameters.fromNameAndType("bank_holiday_type_store"
,QueryableStoreTypes.<LocalDate, String>keyValueStore());
private final ReadOnlyKeyValueStore<LocalDate, String> localBankHolidayTypeStore;
private void instantiateKafka()
{
// configure Kafka
StreamsBuilder builder = new StreamsBuilder();
// CustomSerializableSerde is just a generic serializer that uses standard java Base64 encoding on any object that implements Serializable - it works in a dummy application I've tested, so I don't think it's the problem
addGlobalTableToStreamsBuilder(builder, bankHolidayTypeTopic,"bank_holiday_type_store", new CustomSerializableSerde<LocalDate>(),Serdes.String());
streams = createStreams("localhost:9092", "C:\Kafka\tmp\kafka-streams-global-tables",MyClass.class.getName(),builder);
streams.start(); // hangs until the global table is built
}
public static <Tk extends Serializable,Tv extends Serializable> StreamsBuilder addGlobalTableToStreamsBuilder(StreamsBuilder builder, String topic
, String store, Serde<Tk> keySerializer, Serde<Tv> valueSerializer)
{
builder.globalTable(topic, Materialized.<Tk, Tv, KeyValueStore<Bytes, byte[]>>as(store)
.withKeySerde(keySerializer)
.withValueSerde(valueSerializer));
return builder;
}
public static KafkaStreams createStreams(final String bootstrapServers, final String stateDir, String clientID, StreamsBuilder finishedBuilder)
{
final Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique in the Kafka cluster against which the application is run.
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "applicationName");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientID);
// Where to find Kafka broker(s).
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
// Set to earliest so we don't miss any data that arrived in the topics before the process started
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaStreams(finishedBuilder.build(), streamsConfiguration);
}
制作人:
Producer<LocalDate,String> bankHolidayTypeProducer = MyClass<LocalDate,String>createProducer("localhost:9092", BankHolidayData.class.getName()
, CustomSerializer.class.getName(), StringSerializer.class.getName());
//...
HashMap<LocalDate, String> bankHolidaysData = populateBankHolidayMap();
for (LocalDate bhDay : bankHolidaysData.keySet())
{
bankHolidayTypeProducer.send(new ProducerRecord<>(bankHolidayTypeTopic, bhDay, bankHolidaysData.get(bhDay)));
}
public static <Tk extends Serializable, Tv extends Serializable> Producer<Tk,Tv> createProducer(String bootstrapServers
, String clientID, String keySerializerClassName, String valueSerializerClassName)
{
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClassName);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClassName);
return new KafkaProducer<>(props);
}
我的主题是生产者在第一次生产时自动创建的,并且在 GlobalKTable 尝试读取它时将一直存在。这是问题吗?在设置主题时是否需要做一些事情来告诉 Kafka 它将被 Streams GlobalKTable 使用?
主题的结构(显然)发生了一些变化,这意味着需要重置 Streams。为此,您可以使用应用程序 Conduktor,或位于 http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool.
的重置工具