有没有办法通过 spring-boot 访问通过 KSQL (kafka) 创建的 table?
Is there a way to access a table created via KSQL (kafka) through spring-boot?
我是卡夫卡世界的新手,我真的被困在这里了。因此,我们将不胜感激。
我使用以下 KSQL 语句从 kafka 流中创建了一个 table:
CREATE TABLE calc AS
SELECT id, datetime, count(*)
FROM streamA
GROUP BY id, datetime
HAVING count(*) = total;
其中 "streamA" 是由 "topicA"[=15 创建的流=]
我目前正在使用:
- Java8,
- Spring Boot v2.2.9
我的 pom.xml 看起来像:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Packaging -->
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<properties>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<!-- Versioning -->
<groupId>some.name</groupId>
<artifactId>kafka.project</artifactId>
<version>2020.2.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.9.RELEASE</version>
<relativePath />
</parent>
<!-- Meta-data -->
<name>[${project.artifactId}]</name>
<description>Kafka Project</description>
<!-- Dependencies -->
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Build settings -->
<build>
<!-- Plugins -->
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
那么,有两个问题:
- 有没有办法通过 Kafka Streams API 访问 table?
- 我可以通过我的应用程序而不是 KSQL 做类似的事情(例如创建 table)吗?
提前致谢
更新
感谢 Shrey Jakhmola () 的建议,但我有一个需要定期访问的大型数据集。我不认为这个解决方案是理想的。
@Joshua Oliphant,是的,这个 table 是由从主题创建的流生成的。
- Is there any way to access that table via Kafka Streams API?
Table calc
将由名为 CALC
的更新日志主题支持。如果需要,您可以在应用程序中自由使用此主题。使用标准消费者或 Kafka Streams。
但是,如果您只想查询 table 的当前状态,那么您可以使用 ksqlDB 的 pull queries 来实现。这些允许您从 ksqlDB 构建的 table 中拉回行。功能是基本的,因为它不是 ksqlDB 提供的核心 streamingSQL 的一部分,但满足一些 use-cases.
如果您还需要其他东西,那么还有其他选择可供您选择:
- 您可以将结果泵入您选择的更传统的 sql 系统,例如postgres,并查询。 (你可以使用ksql的
CREATE SINK CONNECTOR
将数据导出到postgres)。
- 您可以使用标准 Kafka 客户端在您自己的应用中使用数据。 (虽然这只有在您的应用程序的每个实例都可以保存 table 中的所有数据时才有效)。
- 您可以在您的应用程序中使用 Kafka Streams 来使用 table。这样做的好处是您的应用程序的多个实例可以聚集在一起,因此每个实例只使用 table 的一部分数据。然后,您可能想利用 Kafka Streams Interactive Queries 访问 table.ation 的当前状态,将加载
- Could I do something similar (e.g. creating that table) through my application instead of KSQL?
如果你想把 ksqlDB 排除在等式之外,那么是的,ksqlDB 在内部使用 KAfka 流,所以你可以用 ksql 做任何事情DB,你也可以直接用Kafka Streams做。
SQL 赞:
CREATE TABLE calc AS
SELECT id, datetime, count(*)
FROM streamA
GROUP BY id, datetime
HAVING count(*) = total;
会映射到类似(粗略的代码):
StreamsBuilder builder = new StreamsBuilder();
builder
.stream("streamA", Consumed.with(<appropriate serde>))
.groupBy(<a mapper that returns id and datetime as new key>)
.count()
.filter(<filter>);
.toStream()
.to("CALC");
new KafkaStreams(builder.build(), props, clients).start();
我是卡夫卡世界的新手,我真的被困在这里了。因此,我们将不胜感激。
我使用以下 KSQL 语句从 kafka 流中创建了一个 table:
CREATE TABLE calc AS
SELECT id, datetime, count(*)
FROM streamA
GROUP BY id, datetime
HAVING count(*) = total;
其中 "streamA" 是由 "topicA"[=15 创建的流=]
我目前正在使用:
- Java8,
- Spring Boot v2.2.9
我的 pom.xml 看起来像:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Packaging -->
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<properties>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<!-- Versioning -->
<groupId>some.name</groupId>
<artifactId>kafka.project</artifactId>
<version>2020.2.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.9.RELEASE</version>
<relativePath />
</parent>
<!-- Meta-data -->
<name>[${project.artifactId}]</name>
<description>Kafka Project</description>
<!-- Dependencies -->
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Build settings -->
<build>
<!-- Plugins -->
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
那么,有两个问题:
- 有没有办法通过 Kafka Streams API 访问 table?
- 我可以通过我的应用程序而不是 KSQL 做类似的事情(例如创建 table)吗?
提前致谢
更新
感谢 Shrey Jakhmola (
@Joshua Oliphant,是的,这个 table 是由从主题创建的流生成的。
- Is there any way to access that table via Kafka Streams API?
Table calc
将由名为 CALC
的更新日志主题支持。如果需要,您可以在应用程序中自由使用此主题。使用标准消费者或 Kafka Streams。
但是,如果您只想查询 table 的当前状态,那么您可以使用 ksqlDB 的 pull queries 来实现。这些允许您从 ksqlDB 构建的 table 中拉回行。功能是基本的,因为它不是 ksqlDB 提供的核心 streamingSQL 的一部分,但满足一些 use-cases.
如果您还需要其他东西,那么还有其他选择可供您选择:
- 您可以将结果泵入您选择的更传统的 sql 系统,例如postgres,并查询。 (你可以使用ksql的
CREATE SINK CONNECTOR
将数据导出到postgres)。 - 您可以使用标准 Kafka 客户端在您自己的应用中使用数据。 (虽然这只有在您的应用程序的每个实例都可以保存 table 中的所有数据时才有效)。
- 您可以在您的应用程序中使用 Kafka Streams 来使用 table。这样做的好处是您的应用程序的多个实例可以聚集在一起,因此每个实例只使用 table 的一部分数据。然后,您可能想利用 Kafka Streams Interactive Queries 访问 table.ation 的当前状态,将加载
- Could I do something similar (e.g. creating that table) through my application instead of KSQL?
如果你想把 ksqlDB 排除在等式之外,那么是的,ksqlDB 在内部使用 KAfka 流,所以你可以用 ksql 做任何事情DB,你也可以直接用Kafka Streams做。
SQL 赞:
CREATE TABLE calc AS
SELECT id, datetime, count(*)
FROM streamA
GROUP BY id, datetime
HAVING count(*) = total;
会映射到类似(粗略的代码):
StreamsBuilder builder = new StreamsBuilder();
builder
.stream("streamA", Consumed.with(<appropriate serde>))
.groupBy(<a mapper that returns id and datetime as new key>)
.count()
.filter(<filter>);
.toStream()
.to("CALC");
new KafkaStreams(builder.build(), props, clients).start();