Flink - producing to Kinesis not working

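I'm trying to run a simple program that reads data from one Kinesis stream, applies a simple transformation, and writes the result to another Kinesis stream.

I'm running Flink 1.4.0 locally (this is the version EMR currently supports, and I can't upgrade).

Here is the code: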

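import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.{FlinkKinesisConsumer, FlinkKinesisProducer}
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}

import scala.util.parsing.json.JSON

// Enclosing object name is illustrative; only main() was part of the question.
object FlinkKinesisJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Consume from the source stream, starting at the newest records
    val consumerConfig = new Properties()
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")

    val kinesisMaps = env.addSource(new FlinkKinesisConsumer[String](
      "source-stream", new SimpleStringSchema, consumerConfig))

    // Parse each record as a JSON object and pull out a single field
    val jsonMaps = kinesisMaps.map { jsonStr => JSON.parseFull(jsonStr).get.asInstanceOf[Map[String, String]] }
    val values = jsonMaps.map(jsonMap => jsonMap("field_name"))

    values.print()

    // Write the extracted values to the target stream
    val producerConfig = new Properties()
    producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")

    val kinesisProducer = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig)
    kinesisProducer.setFailOnError(true)
    kinesisProducer.setDefaultStream("target-stream")
    kinesisProducer.setDefaultPartition("0")

    values.addSink(kinesisProducer)

    // execute program
    env.execute("Flink Kinesis")
  }
}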
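Separately from the producer error, the `map` step above relies on `.get` and an unchecked cast to `Map[String, String]`, so a single record that is not a JSON object, or whose `field_name` is a number (which `JSON.parseFull` decodes as a `Double` by default), can make the job throw instead of skipping that record. A minimal sketch of a more defensive extraction, assuming the input is the `Option[Any]` returned by `JSON.parseFull`; `FieldExtraction` and `extractField` are hypothetical names, not part of Flink or the Scala library:

```scala
// Hypothetical helper: safely extract one field from a JSON.parseFull result.
object FieldExtraction {
  // Returns the field as a String, or None when the parsed value is not a
  // JSON object or the field is missing or has an unsupported type.
  def extractField(parsed: Option[Any], field: String): Option[String] =
    parsed match {
      case Some(obj: Map[_, _]) =>
        obj.asInstanceOf[Map[String, Any]].get(field) match {
          case Some(s: String)  => Some(s)
          case Some(d: Double)  => Some(d.toString) // JSON numbers arrive as Double by default
          case Some(b: Boolean) => Some(b.toString)
          case _                => None
        }
      case _ => None // parse failure (None) or a non-object value such as a JSON array
    }
}
```

In the job it could replace the two `map` calls, for example `kinesisMaps.flatMap(s => FieldExtraction.extractField(JSON.parseFull(s), "field_name").toList)`, which drops records without a usable `field_name` rather than failing the job.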

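If I comment out the producer code, the program runs as expected and prints the correct values.

As soon as I add the producer code, I get the following exception: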

org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.DaemonException: The child process has been shutdown and can no longer accept messages.
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:176)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:477)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.invoke(FlinkKinesisProducer.java:248)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:608)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:569)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:486)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:264)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:210)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Any idea what is causing this?

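Apparently, this is a problem with the old version of the Amazon KPL (Kinesis Producer Library) that Flink 1.4 uses.

There are at least two possible solutions: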

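  1. Upgrade to Flink 1.5. You can still run it on EMR if you install it by following the instructions in the "Custom EMR Installation" section here: https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html

  2. When building the Kinesis connector for Flink 1.4, build it against newer AWS dependencies: I cherry-picked the AWS dependency changes from the 1.5 connector's pom.xml and built the connector with them. It appears to work as expected.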
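With either option, the job then has to depend on the connector you built. For these Flink releases the `flink-connector-kinesis` artifact was not deployed to Maven Central (it bundles Amazon Software License code), so it is typically built from source and installed into the local Maven repository with `mvn install`. A minimal `build.sbt` sketch for a Scala job like the one in the question, assuming Scala 2.11 and a connector already installed in `~/.m2`; the version string is an assumption and should match whatever you actually built:

```scala
// build.sbt (sketch): assumes the Kinesis connector was installed locally via `mvn install`
scalaVersion := "2.11.12"

// "1.5.0" for option 1; for option 2, use the version of your rebuilt 1.4 connector
val flinkVersion = "1.5.0"

// Look up artifacts in the local Maven repository (~/.m2), where the connector was installed
resolvers += Resolver.mavenLocal

libraryDependencies ++= Seq(
  // Provided by the Flink installation on the cluster (e.g. EMR)
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  // Must be bundled into the job jar, since it is not part of the Flink distribution
  "org.apache.flink" %% "flink-connector-kinesis" % flinkVersion
)
```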