PyFlink: an error occurred when using kafka_source_ddl
Problem:
- I want to read Kafka messages with PyFlink.
- Running the command
./bin/flink run -m 10.0.24.13:8081 -py /usr/local/project/cdn_flink/cdn_demo.py
shows the following error:
File "/usr/local/flink-1.14.2/opt/python/pyflink.zip/pyflink/table/table.py", line 1108, in execute_insert
File "/usr/local/flink-1.14.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
File "/usr/local/flink-1.14.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: findAndCreateTableSink failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:88)
at org.apache.flink.table.factories.TableFactoryUtil.lambda$findAndCreateTableSink$0(TableFactoryUtil.java:116)
at java.util.Optional.orElseGet(Optional.java:267)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:116)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:379)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:222)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun.apply(PlannerBase.scala:182)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun.apply(PlannerBase.scala:182)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:182)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:574)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: Could not load service provider for table factories.
at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:212)
My environment:
flink-1.14.2
kafka_2.12-3.0.0
zookeeper-3.7.0
apache-flink 1.14.2
python 3.6.8
All processes are running (jps output):
- 7600 StandaloneSessionClusterEntrypoint # flink
- 13316 TaskManagerRunner # flink
- 23878 QuorumPeerMain # zk
- 15705 ConsoleProducer
- 29721 Jps
- 31454 Kafka
My code:
- cdn_connector_ddl.py
# -*- coding: utf-8 -*-
kafka_source_ddl = """
    CREATE TABLE cdn_access_log (
        uuid VARCHAR,
        client_ip VARCHAR,
        request_time BIGINT,
        response_size BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'cdn_access_log',
        'properties.bootstrap.servers' = '10.0.24.13:9091',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv',
        'csv.field-delimiter' = ','
    )
"""

mysql_sink_ddl = """
    CREATE TABLE cdn_access_statistic (
        province VARCHAR,
        access_count BIGINT,
        total_download BIGINT,
        download_speed DOUBLE
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://localhost:3306/hive?autoReconnect=true&failOverReadOnly=false&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
        'connector.table' = 'cdn_access_statistic',
        'connector.username' = 'hive',
        'connector.password' = 'hive1234',
        'connector.write.flush.interval' = '1s'
    )
"""
- cdn_demo.py
# -*- coding: utf-8 -*-
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, TableConfig

from cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl


def start():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    # add jars
    _path = "file:////usr/local/lib64/python3.6/site-packages/pyflink/lib"
    env.add_jars(os.path.join(_path, 'mysql-connector-java-8.0.27.jar'))
    env.add_jars(os.path.join(_path, 'flink-sql-connector-kafka_2.12-1.14.2.jar'))
    env.add_jars(os.path.join(_path, 'kafka-clients-3.0.0.jar'))
    env.add_jars(os.path.join(_path, 'flink-csv-1.14.2-sql-jar.jar'))
    env.add_jars(os.path.join(_path, 'flink-connector-kafka_2.12-1.14.2.jar'))
    # t_env = StreamTableEnvironment.create(env, TableConfig())
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
    # set source table
    t_env.execute_sql(kafka_source_ddl)
    t_env.execute_sql(mysql_sink_ddl)
    t_env.from_path("cdn_access_log") \
        .select("uuid, "
                "client_ip as province, "
                "response_size, request_time") \
        .group_by("province") \
        .select(
            "province, count(uuid) as access_count, "
            "sum(response_size) as total_download, "
            "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
        .execute_insert("cdn_access_statistic")
    t_env.execute("cdn_access_log")


if __name__ == '__main__':
    start()
- I don't know how to fix this; should I maybe fall back to an older Flink version? Please help me, thanks.
The error says that no suitable table sink could be found:
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: findAndCreateTableSink failed.
Two suggestions for reference:
- Check whether flink-connector-jdbc.jar has been loaded; so far you have only loaded mysql-connector-java-8.0.27.jar, which is the MySQL driver, not the Flink JDBC connector.
- Check the JDBC connector options and don't use the legacy connector.xxx keys; see https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/ and the sketch below.
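For reference, a minimal sketch of both fixes. It assumes flink-connector-jdbc_2.12-1.14.2.jar has been downloaded into the same pyflink/lib directory as your other jars; the jar path, URL, and credentials simply mirror your question and may need adjusting:

# In cdn_demo.py: also load the Flink JDBC connector; the MySQL driver
# alone does not provide a Flink table factory.
env.add_jars(os.path.join(_path, 'flink-connector-jdbc_2.12-1.14.2.jar'))

# In cdn_connector_ddl.py: use the Flink 1.14 option keys
# ('connector' = 'jdbc', 'url', 'table-name', ...) instead of the
# legacy 'connector.*' keys.
mysql_sink_ddl = """
    CREATE TABLE cdn_access_statistic (
        province VARCHAR,
        access_count BIGINT,
        total_download BIGINT,
        download_speed DOUBLE
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/hive?useSSL=false&serverTimezone=GMT%2B8',
        'table-name' = 'cdn_access_statistic',
        'username' = 'hive',
        'password' = 'hive1234',
        'sink.buffer-flush.interval' = '1s'
    )
"""

With the new option keys the planner can discover the JDBC dynamic table factory, so findAndCreateTableSink should succeed once the connector jar is on the classpath.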