Apache Pulsar: Accessing state storage in LocalRunner not working

I'm trying to implement a simple Apache Pulsar function and access the state API in LocalRunner mode, but it isn't working.

pom.xml snippet

<dependencies>
  <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-original</artifactId>
    <version>2.9.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-local-runner-original</artifactId>
    <version>2.9.1</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.13.1</version>
  </dependency>
</dependencies>

Function class

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class TestFunction implements Function<String, String> {

  @Override
  public String process(String input, Context context) throws Exception {
    System.out.println(">>> GOT input " + input);
    context.incrCounter("counter", 1); // --> try to access state
    return input;
  }
}

Main

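import java.util.Collections;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionConfig.Runtime;
import org.apache.pulsar.functions.LocalRunner;

public class Main {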
  public static final String BROKER_URL = "pulsar://localhost:6650";
  public static final String TOPIC_IN = "test-topic-input";
  public static final String TOPIC_OUT = "test-topic-output";

  public static void main(String[] args) throws Exception {

    FunctionConfig functionConfig = FunctionConfig
        .builder()
        .className(TestFunction.class.getName())
        .inputs(Collections.singleton(TOPIC_IN))
        .name("Test Function")
        .runtime(Runtime.JAVA)
        .subName("Test Function Sub")
        .build();

    LocalRunner localRunner = LocalRunner.builder()
        .brokerServiceUrl(BROKER_URL)
        .stateStorageServiceUrl("bk://127.0.0.1:4181")
        .functionConfig(functionConfig)
        .build();
    localRunner.start(false);

    PulsarClient client = PulsarClient.builder().serviceUrl(BROKER_URL).build();
    Producer<String> producer = client.newProducer(Schema.STRING).topic(TOPIC_IN).create();

    producer.send("Hello World!");
    System.out.println(">>> PRODUCER SENT");
  }
}

I'm running Pulsar in a local Docker container (Docker Desktop on Win10), started like this:

Docker container

# Port 4181 is published to expose BookKeeper; otherwise the function hangs and does nothing
docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  -p 4181:4181 \
  --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:2.9.1 bin/pulsar standalone

When I start the application, the console shows these logs:

2022-01-07T12:39:48,757+0100 [public/default/Test Function-0] WARN  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:48,863+0100 [public/default/Test Function-0] WARN  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:48,970+0100 [public/default/Test Function-0] WARN  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds
2022-01-07T12:39:49,076+0100 [public/default/Test Function-0] WARN  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Encountered issue Invalid stream name : public_default on fetching state stable metadata, re-attempting in 100 milliseconds

... and it goes on and on ...

The Pulsar logs show:

2022-01-07T12:13:48,882+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function

2022-01-07T12:13:48,989+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function

2022-01-07T12:13:49,097+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function

2022-01-07T12:13:49,207+0000 [grpc-default-executor-0] ERROR org.apache.bookkeeper.stream.storage.impl.metadata.RootRangeStoreImpl - Invalid stream name Test Function

... goes on and on ...

What am I doing wrong?

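The problem is the name you chose for the function, "Test Function". Pulsar's state store uses the function name as the name of its internal storage stream, and a name containing a space is rejected there as an invalid stream name. That is what the "Invalid stream name Test Function" errors in the Pulsar log are reporting.

If you remove the space and use "TestFunction" instead, it works. I have just verified this myself: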

2022-02-07T11:09:04,916-0800 [main] WARN  com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
>>> PRODUCER SENT
2022-02-07T11:09:05,267-0800 [public/default/TestFunction-0] INFO  org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl - Opening state table for function public/default/TestFunction
2022-02-07T11:09:05,279-0800 [client-scheduler-OrderedScheduler-7-0] INFO  org.apache.bookkeeper.clients.SimpleStorageClientImpl - Retrieved table properties for table public_default/TestFunction : stream_id: 1024

2022-02-07T11:09:05,527-0800 [pulsar-client-io-1-2] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test-topic-input][Test Function Sub] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
>>> GOT input Hello World!
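
To catch this kind of mistake before the function starts retrying forever, the name could be checked up front. The following is only a minimal sketch: `FunctionNameCheck` and its methods are hypothetical helpers, not part of the Pulsar or BookKeeper API, and it assumes that whitespace is the only thing worth rejecting, because that is all the logs above confirm. BookKeeper may well reject other characters too.

```java
/**
 * Hypothetical helper (not part of Pulsar): rejects function names that
 * contain whitespace, since the function name is reused as the state
 * store's stream name and "Test Function" was rejected as invalid.
 */
public final class FunctionNameCheck {

    private FunctionNameCheck() {
        // utility class, no instances
    }

    /** Returns true if the name is non-empty and contains no whitespace. */
    public static boolean isSafeForStateStore(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        // Assumption: only whitespace is checked; other characters may also be invalid.
        return name.chars().noneMatch(Character::isWhitespace);
    }

    /** Returns the name unchanged, or throws if it fails the whitespace check. */
    public static String requireSafe(String name) {
        if (!isSafeForStateStore(name)) {
            throw new IllegalArgumentException(
                "Function name must be non-empty and must not contain whitespace: '" + name + "'");
        }
        return name;
    }
}
```

With such a helper, the builder call in the question could become `.name(FunctionNameCheck.requireSafe("TestFunction"))`, so a name like "Test Function" fails immediately with a clear message instead of looping on "Invalid stream name".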