使用 ksqldb-server 在 kafka 中创建 table
Create table in kafka using ksqldb-server
我正在尝试使用以下代码(bash 脚本)通过其 REST 接口使用 (Confluent) ksqldb-server 创建一个 kafka table:
KSQLDB_COMMAND="CREATE TABLE sample_table \
(xkey VARCHAR, \
xdata VARCHAR) \
WITH (KAFKA_TOPIC=\'sample-topic\', \
VALUE_FORMAT=\'JSON\', \
KEY=\'xkey\'); "
COMMAND="curl -X 'POST' '$KSQLDB_SERVER' \
-H 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
-d '{ \"ksql\": \"$KSQLDB_COMMAND\" }' "
eval $COMMAND
返回以下错误输出消息:
{"@type":"statement_error","error_code":40001,"message":"Failed to prepare statement: Invalid config variable(s) in the WITH clause: KEY","statementText":"CREATE TABLE sample_table (xkey VARCHAR, xdata VARCHAR) WITH (KAFKA_TOPIC='sample-topic', VALUE_FORMAT='JSON', KEY='xkey');","entities":[]}%
该错误表明实际语句中存在错误,尤其是 KEY 属性。
我可以使用 REST 接口获取基本命令(“LIST STREAMS”等),但无法创建 tables,所以我认为这是 KSQL 语句或我如何创建bash 命令(在“COMMAND”变量中)。
感谢任何帮助。
我花了相当多的时间进行实验并使这个简单的示例起作用(我最初的尝试需要太多 bash 变量替换才能实现 useful/maintainable,所以这个版本被简化了很多).我还发现 KSQLDB table 名称必须遵循 table 名称的常规 SQL 命名约定(即字母、下划线等...但没有连字符,这在我原来的问题中造成了一堆错误......我应该更仔细地阅读文档)。
以下工作(您可能需要更改您的 KSQLDB 服务器地址)...并且只需进行最小的更改,几乎可以执行任何 KSQLDB 命令:
####
# NOTE: table MUST be alpha (underscores are OK)... hyphens are not allowed
####
KSQLDB_SERVER="http://localhost:8088/ksql"
KSQLDB_TABLE="some_table"
KSQLDB_TOPIC="some_topic"
VALUE_FORMAT="JSON"
FMT="{ \"ksql\": \"CREATE TABLE %s (key VARCHAR PRIMARY KEY, data VARCHAR) WITH (KAFKA_TOPIC='%s', VALUE_FORMAT='%s');\" }"
JSON_DATA=$(printf "$FMT" "$KSQLDB_TABLE" "$KSQLDB_TOPIC" "$VALUE_FORMAT")
curl -X "POST" "$KSQLDB_SERVER" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d "$JSON_DATA"
我正在尝试使用以下代码(bash 脚本)通过其 REST 接口使用 (Confluent) ksqldb-server 创建一个 kafka table:
KSQLDB_COMMAND="CREATE TABLE sample_table \
(xkey VARCHAR, \
xdata VARCHAR) \
WITH (KAFKA_TOPIC=\'sample-topic\', \
VALUE_FORMAT=\'JSON\', \
KEY=\'xkey\'); "
COMMAND="curl -X 'POST' '$KSQLDB_SERVER' \
-H 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
-d '{ \"ksql\": \"$KSQLDB_COMMAND\" }' "
eval $COMMAND
返回以下错误输出消息:
{"@type":"statement_error","error_code":40001,"message":"Failed to prepare statement: Invalid config variable(s) in the WITH clause: KEY","statementText":"CREATE TABLE sample_table (xkey VARCHAR, xdata VARCHAR) WITH (KAFKA_TOPIC='sample-topic', VALUE_FORMAT='JSON', KEY='xkey');","entities":[]}%
该错误表明实际语句中存在错误,尤其是 KEY 属性。
我可以使用 REST 接口获取基本命令(“LIST STREAMS”等),但无法创建 tables,所以我认为这是 KSQL 语句或我如何创建bash 命令(在“COMMAND”变量中)。
感谢任何帮助。
我花了相当多的时间进行实验并使这个简单的示例起作用(我最初的尝试需要太多 bash 变量替换才能实现 useful/maintainable,所以这个版本被简化了很多).我还发现 KSQLDB table 名称必须遵循 table 名称的常规 SQL 命名约定(即字母、下划线等...但没有连字符,这在我原来的问题中造成了一堆错误......我应该更仔细地阅读文档)。
以下工作(您可能需要更改您的 KSQLDB 服务器地址)...并且只需进行最小的更改,几乎可以执行任何 KSQLDB 命令:
####
# NOTE: table MUST be alpha (underscores are OK)... hyphens are not allowed
####
KSQLDB_SERVER="http://localhost:8088/ksql"
KSQLDB_TABLE="some_table"
KSQLDB_TOPIC="some_topic"
VALUE_FORMAT="JSON"
FMT="{ \"ksql\": \"CREATE TABLE %s (key VARCHAR PRIMARY KEY, data VARCHAR) WITH (KAFKA_TOPIC='%s', VALUE_FORMAT='%s');\" }"
JSON_DATA=$(printf "$FMT" "$KSQLDB_TABLE" "$KSQLDB_TOPIC" "$VALUE_FORMAT")
curl -X "POST" "$KSQLDB_SERVER" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d "$JSON_DATA"