Apache Pulsar: Access state storage in LocalRunner not working
I'm trying to implement a simple Apache Pulsar function and access the state API in LocalRunner mode, but it isn't working.
pom.xml snippet
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-original</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-local-runner-original</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.13.1</version>
</dependency>
</dependencies>
Function class
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class TestFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) throws Exception {
        System.out.println(">>> GOT input " + input);
        context.incrCounter("counter", 1); // --> try to access state
        return input;
    }
}
Main class
import java.util.Collections;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionConfig.Runtime;
import org.apache.pulsar.functions.LocalRunner;

public class Main {
    public static final String BROKER_URL = "pulsar://localhost:6650";
    public static final String TOPIC_IN = "test-topic-input";
    public static final String TOPIC_OUT = "test-topic-output";

    public static void main(String[] args) throws Exception {
        FunctionConfig functionConfig = FunctionConfig
                .builder()
                .className(TestFunction.class.getName())
                .inputs(Collections.singleton(TOPIC_IN))
                .name("Test Function")
                .runtime(Runtime.JAVA)
                .subName("Test Function Sub")
                .build();

        LocalRunner localRunner = LocalRunner.builder()
                .brokerServiceUrl(BROKER_URL)
                .stateStorageServiceUrl("bk://127.0.0.1:4181")
                .functionConfig(functionConfig)
                .build();
        localRunner.start(false);

        PulsarClient client = PulsarClient.builder().serviceUrl(BROKER_URL).build();
        Producer<String> producer = client.newProducer(Schema.STRING).topic(TOPIC_IN).create();
        producer.send("Hello World!");
        System.out.println(">>> PRODUCER SENT");
    }
}
I'm running Pulsar in a local Docker container (Docker Desktop on Win10), started like this:
Docker container
# I added -p 4181:4181 to expose BookKeeper (the state store);
# otherwise the function hangs and doesn't do anything.
docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  -p 4181:4181 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:2.9.1 bin/pulsar standalone
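Because the function silently hangs when port 4181 is not published, a quick pre-flight check before calling localRunner.start(false) can save some guesswork. The following is a minimal sketch using only the JDK; the class StateStoragePortCheck and its method isReachable are hypothetical names, not part of Pulsar. It parses the same bk://127.0.0.1:4181 URL passed to stateStorageServiceUrl and tries to open a plain TCP connection, so a positive result only means the port is open, not that the stream storage service behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

// Hypothetical pre-flight check (not a Pulsar API): is the state storage port reachable?
final class StateStoragePortCheck {

    private StateStoragePortCheck() {
    }

    // Returns true if a TCP connection to the host:port of serviceUrl
    // (e.g. "bk://127.0.0.1:4181") can be opened within timeoutMs.
    // An open port does not prove that BookKeeper stream storage is healthy.
    static boolean isReachable(String serviceUrl, int timeoutMs) {
        URI uri = URI.create(serviceUrl);
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("Expected scheme://host:port, got " + serviceUrl);
        }
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // connection refused, timed out, or unknown host
        }
    }

    public static void main(String[] args) {
        String url = "bk://127.0.0.1:4181"; // same value passed to stateStorageServiceUrl
        if (isReachable(url, 2000)) {
            System.out.println("State storage port is open at " + url);
        } else {
            System.err.println("Cannot reach " + url + "; is -p 4181:4181 published on the container?");
        }
    }
}
```

Failing fast here turns an endless hang into an immediate, explicit error message.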
When I start the application, the console shows these logs:
2022-01-07T12:39:48,757+0100 [public/default/Test Function-0] WARN org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:48,863+0100 [public/default/Test Function-0] WARN org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:48,970+0100 [public/default/Test Function-0] WARN org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:49,076+0100 [public/default/Test Function-0] WARN org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
... and it goes on and on ...
The Pulsar log shows:
2022-01-07T12:13:48,882+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
2022-01-07T12:13:48,989+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
2022-01-07T12:13:49,097+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
2022-01-07T12:13:49,207+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function
... goes on and on ...
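What am I doing wrong?
The problem is the name you chose for the function, "Test Function". Because it contains a space, it causes problems later on in Pulsar's state store, which uses the function name as the name of its internal storage stream. BookKeeper rejects it, which is exactly the "Invalid stream name Test Function" error in your Pulsar log.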
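Judging from the logs in this thread, the state table is addressed as <tenant>_<namespace>/<function name> (for example public_default/TestFunction), so the function name ends up verbatim as the BookKeeper stream name. The sketch below mirrors that mapping with hypothetical helpers (StateTableNames is not a Pulsar class). The allow-list [A-Za-z0-9_]+ is an assumption chosen to be conservative: the thread only shows that a name containing a space is rejected, not BookKeeper's exact validation rule, and Pulsar may normalize names further than shown here.

```java
import java.util.regex.Pattern;

// Hypothetical helpers that mirror the state-table naming seen in the LocalRunner logs.
final class StateTableNames {

    // ASSUMPTION: conservative allow-list; the thread only shows that "Test Function"
    // (with a space) is rejected as a stream name, not BookKeeper's exact rule.
    private static final Pattern SAFE_STREAM_NAME = Pattern.compile("[A-Za-z0-9_]+");

    private StateTableNames() {
    }

    // "public" + "default" -> "public_default", as printed in the logs.
    static String stateNamespace(String tenant, String namespace) {
        return tenant + "_" + namespace;
    }

    // e.g. "public_default/TestFunction"; the function name is used unchanged as the stream name.
    static String stateTable(String tenant, String namespace, String functionName) {
        return stateNamespace(tenant, namespace) + "/" + functionName;
    }

    static boolean looksLikeValidStreamName(String functionName) {
        return SAFE_STREAM_NAME.matcher(functionName).matches();
    }

    public static void main(String[] args) {
        for (String name : new String[] {"Test Function", "TestFunction"}) {
            // Prints "public_default/Test Function valid=false", then "public_default/TestFunction valid=true"
            System.out.println(stateTable("public", "default", name) + " valid=" + looksLikeValidStreamName(name));
        }
    }
}
```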
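If you remove the space and use "TestFunction" instead, i.e. .name("TestFunction") in the FunctionConfig builder, it works as expected. I just verified this myself: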
2022-02-07T11:09:04,916-0800 [main] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
>>> PRODUCER SENT
2022-02-07T11:09:05,267-0800 [public/default/TestFunction-0] INFO org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Opening state table for function public/default/TestFunction
2022-02-07T11:09:05,279-0800 [client-scheduler-OrderedScheduler-7-0] INFO org.apache.bookkeeper.clients.SimpleStorageClientImpl - Retrieved table properties for table public_default/TestFunction : stream_id: 1024
2022-02-07T11:09:05,527-0800 [pulsar-client-io-1-2] INFO org.apache.pulsar.client.impl.ConsumerImpl - [test-topic-input][Test Function Sub] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
>>> GOT input Hello World!
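If function names are derived from human-readable labels, a small normalizing step avoids the problem up front. This is a hypothetical helper, not a Pulsar API: it drops whitespace (the fix confirmed above) and, as an extra assumption, replaces any other character outside [A-Za-z0-9_] with an underscore.

```java
// Hypothetical helper (not part of Pulsar) for turning a label into a state-store-friendly function name.
final class FunctionNames {

    private FunctionNames() {
    }

    // Drops all whitespace, then (ASSUMPTION, to be conservative) replaces any character
    // outside [A-Za-z0-9_] with '_'. Rejects labels that end up empty.
    static String toSafeFunctionName(String label) {
        String noSpaces = label.replaceAll("\\s+", "");
        String safe = noSpaces.replaceAll("[^A-Za-z0-9_]", "_");
        if (safe.isEmpty()) {
            throw new IllegalArgumentException("Function name is empty after sanitizing: '" + label + "'");
        }
        return safe;
    }

    public static void main(String[] args) {
        System.out.println(toSafeFunctionName("Test Function"));    // TestFunction
        System.out.println(toSafeFunctionName("test-function v2")); // test_functionv2
    }
}
```

It would slot into the builder as .name(FunctionNames.toSafeFunctionName("Test Function")). Since the function name is part of the state table name, renaming an existing function would presumably point it at a different, empty state table.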