PyFlink: an error occurred when using kafka_source_ddl

Problem:

 File "/usr/local/flink-1.14.2/opt/python/pyflink.zip/pyflink/table/table.py", line 1108, in execute_insert
  File "/usr/local/flink-1.14.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/usr/local/flink-1.14.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: findAndCreateTableSink failed.
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:88)
        at org.apache.flink.table.factories.TableFactoryUtil.lambda$findAndCreateTableSink$0(TableFactoryUtil.java:116)
        at java.util.Optional.orElseGet(Optional.java:267)
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:116)
        at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:379)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:222)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun.apply(PlannerBase.scala:182)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun.apply(PlannerBase.scala:182)
        at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:182)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
        at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:574)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: Could not load service provider for table factories.
        at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:212)

My environment

My code

# -*- coding: utf-8 -*-
kafka_source_ddl = """
CREATE TABLE cdn_access_log (
 uuid VARCHAR,
 client_ip VARCHAR,
 request_time BIGINT,
 response_size BIGINT
) WITH (
 'connector' = 'kafka',
 'topic' = 'cdn_access_log',
 'properties.bootstrap.servers' = '10.0.24.13:9091',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'csv',
 'csv.field-delimiter' = ','
)
"""
mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
 province VARCHAR,
 access_count BIGINT,
 total_download BIGINT,
 download_speed DOUBLE
 ) WITH (
 'connector.type' = 'jdbc',
 'connector.url' = 'jdbc:mysql://localhost:3306/hive?autoReconnect=true&failOverReadOnly=false&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
 'connector.table' = 'cdn_access_statistic',
 'connector.username' = 'hive',
 'connector.password' = 'hive1234',
 'connector.write.flush.interval' = '1s'
)
"""

# -*- coding: utf-8 -*-
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, TableConfig
from cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl


def start():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    # add jar
    _path = "file:////usr/local/lib64/python3.6/site-packages/pyflink/lib"
    env.add_jars(os.path.join(_path, 'mysql-connector-java-8.0.27.jar'))
    env.add_jars(os.path.join(_path, 'flink-sql-connector-kafka_2.12-1.14.2.jar'))
    env.add_jars(os.path.join(_path, 'kafka-clients-3.0.0.jar'))
    env.add_jars(os.path.join(_path, 'flink-csv-1.14.2-sql-jar.jar'))
    env.add_jars(os.path.join(_path, 'flink-connector-kafka_2.12-1.14.2.jar'))
    # t_env = StreamTableEnvironment.create(env, TableConfig())
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
    # set source table
    t_env.execute_sql(kafka_source_ddl)
    t_env.execute_sql(mysql_sink_ddl)
   
    t_env.from_path("cdn_access_log") \
        .select("uuid, "
            "client_ip as province, "
            "response_size, request_time")\
    .group_by("province")\
        .select(
               "province, count(uuid) as access_count, "
               "sum(response_size) as total_download,  "
               "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
       .execute_insert("cdn_access_statistic")

    t_env.execute("cdn_access_log")

if __name__ == '__main__':
    start()

The error says a suitable table sink cannot be found:

pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: findAndCreateTableSink failed.

Two things to check:

  1. Check whether flink-connector-jdbc.jar is loaded. You only loaded mysql-connector-java-8.0.27.jar, which is the MySQL JDBC driver, not the Flink JDBC connector.
  2. Check the JDBC connector options: don't use the legacy connector.xxx keys. You can refer to https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/ (a sketch of both fixes follows below).
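
For example, here is a minimal sketch of both fixes. The jar name flink-connector-jdbc_2.12-1.14.2.jar is an assumption (pick the one matching your Flink/Scala version and put it next to your other jars); the sink options follow the new-style keys from the 1.14 JDBC connector docs linked above:

# 1. Load the Flink JDBC connector in addition to the MySQL driver
#    (jar name/version is an assumption; match it to your installation):
env.add_jars(os.path.join(_path, 'flink-connector-jdbc_2.12-1.14.2.jar'))

# 2. Declare the sink with the new-style JDBC options instead of connector.xxx:
mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
 province VARCHAR,
 access_count BIGINT,
 total_download BIGINT,
 download_speed DOUBLE
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://localhost:3306/hive',
 'table-name' = 'cdn_access_statistic',
 'username' = 'hive',
 'password' = 'hive1234',
 'sink.buffer-flush.interval' = '1s'
)
"""

With the new-style options, Flink looks the sink up by the 'connector' = 'jdbc' identifier, so findAndCreateTableSink should succeed as long as the connector jar is on the classpath.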