JsonMappingException while loading yaml Kafka configuration via Fabric8io kubernetes-client

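I have a problem using the fabric8io kubernetes-client.

What I want: to create a Kafka cluster in Kubernetes using the Strimzi operator. If I follow all the steps from the Strimzi quickstart guide using the CLI and kubectl, everything works fine.

But when I load the yaml resource from Java code using the kubernetes-client:5.2.1 library, I get an exception: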

io.fabric8.kubernetes.client.KubernetesClientException: An error has occurred.
  at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
  at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:53)
  at io.fabric8.kubernetes.client.utils.Serialization.unmarshal(Serialization.java:140)
  at io.fabric8.kubernetes.client.utils.Serialization.unmarshal(Serialization.java:101)
  at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.<init>(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:272)
  at io.fabric8.kubernetes.client.dsl.internal.NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.<init>(NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java:252)
  at io.fabric8.kubernetes.client.DefaultKubernetesClient.<init>(DefaultKubernetesClient.java:175)
  at io.fabric8.kubernetes.client.DefaultKubernetesClient.load(DefaultKubernetesClient.java:175)
  at app.test.main(test.java:38)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: No resource type found for:kafka.strimzi.io/v1#Kafka
 at [Source: (String)"{"apiVersion":"kafka.strimzi.io/v1","kind":"Kafka","metadata":{"name":"my-cluster"},"spec":{"kafka":{"replicas":1,"listeners":[{"name":"plain","port":9092,"type":"internal","tls":false},{"name":"tls","port":9093,"type":"internal","tls":true,"authentication":{"type":"tls"}},{"name":"external","port":9094,"type":"nodeport","tls":false}],"storage":{"type":"jbod","volumes":[{"id":0,"type":"persistent-claim","size":"1Gi","deleteClaim":false}]},"config":{"offsets.topic.replication.factor":1,"transacti"[truncated 226 chars]; line: 1, column: 726]

The problem occurs when loading this file: yaml config

I load the resource like this:

KubernetesClient client = new DefaultKubernetesClient();
InputStream is = ...;
client.load(is).inNamespace("my_namespace").createOrReplace();

Please help me!

P.S. Sorry for my English.

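I'm from the Fabric8 team. Kafka is a custom resource, which means its model is not registered in KubernetesClient. That is why you are getting the No resource type found for:kafka.strimzi.io/v1#Kafka error from KubernetesClient. KubernetesClient provides two ways of dealing with custom resources:

  1. Typeless API - use custom resources as raw hash maps
  2. Typed API - provide POJOs for the CustomResource types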
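To make the cause of the error more concrete, here is a minimal sketch of a type lookup keyed by apiVersion and kind. It is a toy model and not fabric8's actual implementation: the class name ResourceTypeRegistrySketch is hypothetical, the key format is only inferred from the error message in the question, and String.class is a placeholder for a real model class.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: a toy type registry, not fabric8's real deserializer. */
final class ResourceTypeRegistrySketch {
    // Maps "apiVersion#kind" to the Java class that models that resource.
    private final Map<String, Class<?>> types = new HashMap<>();

    void register(String apiVersion, String kind, Class<?> type) {
        types.put(apiVersion + "#" + kind, type);
    }

    Class<?> lookup(String apiVersion, String kind) {
        String key = apiVersion + "#" + kind;
        Class<?> type = types.get(key);
        if (type == null) {
            // Wording copied from the error in the question (assumed format).
            throw new IllegalArgumentException("No resource type found for:" + key);
        }
        return type;
    }

    public static void main(String[] args) {
        ResourceTypeRegistrySketch registry = new ResourceTypeRegistrySketch();
        // Stand-in for a built-in type such as Pod; String.class is a placeholder.
        registry.register("v1", "Pod", String.class);
        System.out.println(registry.lookup("v1", "Pod").getSimpleName());

        try {
            // Nothing was registered for the Strimzi Kafka kind, so this fails.
            registry.lookup("kafka.strimzi.io/v1", "Kafka");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this picture, both APIs work around the missing entry: the typeless one skips the lookup and works with maps, and the typed one supplies the missing class.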

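I'll provide examples of loading your Kafka yaml snippet using both APIs.

Typeless API:

For the Typeless API, you need to provide a CustomResourceDefinitionContext, an object containing details of the CustomResource such as group, version, kind and plural. It looks like this: KafkaLoadTypeless.java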

try (KubernetesClient client = new DefaultKubernetesClient()) {
    // Describe the Kafka custom resource so the client can address it without a registered model
    CustomResourceDefinitionContext context = new CustomResourceDefinitionContext.Builder()
            .withScope("Namespaced")
            .withVersion("v1beta2")
            .withGroup("kafka.strimzi.io")
            .withPlural("kafkas")
            .build();

    // Read the manifest from the classpath
    InputStream is = KafkaLoadTypeless.class.getResourceAsStream("/test-kafka.yml");

    // The created resource comes back as a raw map
    Map<String, Object> createdKafka = client.customResource(context).inNamespace("default").createOrReplace(is);
} catch (IOException ioException) {
    ioException.printStackTrace();
}
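As a rough illustration of what the group, version and plural in the context stand for: the Kubernetes API serves namespaced custom resources under /apis/{group}/{version}/namespaces/{namespace}/{plural}. The sketch below only builds that path as a string. The class and method names are hypothetical, and it does not show how kubernetes-client constructs its requests internally.

```java
/** Illustrative helper (hypothetical name), not part of kubernetes-client. */
final class CustomResourcePathSketch {

    /**
     * Builds the collection path for a custom resource.
     * Pass a null namespace for a cluster-scoped resource.
     */
    static String collectionPath(String group, String version, String plural, String namespace) {
        StringBuilder path = new StringBuilder("/apis/")
                .append(group).append('/').append(version);
        if (namespace != null) {
            path.append("/namespaces/").append(namespace);
        }
        return path.append('/').append(plural).toString();
    }

    public static void main(String[] args) {
        // Same values as in the CustomResourceDefinitionContext above.
        System.out.println(collectionPath("kafka.strimzi.io", "v1beta2", "kafkas", "default"));
        // prints /apis/kafka.strimzi.io/v1beta2/namespaces/default/kafkas
    }
}
```

If any of the values does not match what the installed CRD declares, the path points at something the API server does not serve, so it is worth checking them against the CRD in your cluster.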

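Typed API:

For the Typed API, you need to provide types for the Kafka resource. You can find these types in the Strimzi API dependency. I had to add this dependency to my project in order to use the Kafka classes: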

        <dependency>
            <groupId>io.strimzi</groupId>
            <artifactId>api</artifactId>
            <version>0.22.0</version>
            <exclusions>
                <exclusion>
                    <groupId>io.fabric8</groupId>
                    <artifactId>kubernetes-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

After adding this, I was able to use the Typed API like this: KafkaLoadTyped.java

try (KubernetesClient client = new DefaultKubernetesClient()) {
    // Typed client for Kafka resources, backed by the Strimzi model classes
    MixedOperation<Kafka, KafkaList, Resource<Kafka>> kafkaClient = client.customResources(Kafka.class, KafkaList.class);

    // Read the manifest from the classpath and deserialize it into a Kafka object
    InputStream is = KafkaLoadTyped.class.getResourceAsStream("/test-kafka.yml");
    Kafka myClusterKafka = kafkaClient.load(is).get();
    kafkaClient.inNamespace("default").createOrReplace(myClusterKafka);
}
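One practical difference between the two APIs is what you get back. With the Typed API you work with a Kafka object and its getters, while the Typeless API hands you a Map<String, Object> that you have to walk yourself. Below is a minimal sketch of such a walk. The nestedGet helper is hypothetical, and the map is built by hand as a stand-in for what the Typeless API might return for the yaml in the question.

```java
import java.util.Map;

/** Illustrative helper (hypothetical name) for reading fields from a raw resource map. */
final class RawResourceNavigationSketch {

    /** Walks nested maps along the given keys; returns null if a step is missing or not a map. */
    static Object nestedGet(Map<String, Object> root, String... keys) {
        Object current = root;
        for (String key : keys) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    public static void main(String[] args) {
        // Hand-built stand-in for the map returned by the Typeless API.
        Map<String, Object> kafka = Map.of(
                "kind", "Kafka",
                "metadata", Map.of("name", "my-cluster"),
                "spec", Map.of("kafka", Map.of("replicas", 1)));

        System.out.println(nestedGet(kafka, "metadata", "name"));          // prints my-cluster
        System.out.println(nestedGet(kafka, "spec", "kafka", "replicas")); // prints 1
        System.out.println(nestedGet(kafka, "status", "conditions"));      // prints null
    }
}
```

The helper returns Object, so callers still need to cast. That is the price of the Typeless API: no extra dependency, but no compile-time checking of field names or types either.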