无法使用 flink 读取运动流,获取 SdkClientException:无法执行 HTTP 请求:当前令牌 (VALUE_STRING)

cannot read kinesis stream with flink, getting SdkClientException: Unable to execute HTTP request: Current token (VALUE_STRING)

我正在尝试从 flink 读取 kinesis(实际上 运行 使用 kinesa 的本地模拟)。这是我的消费者配置:

val consumerConfig = new Properties()
  consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
  consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
  consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
  consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")

val kinesis =
   env.addSource(new FlinkKinesisConsumer[String](
     inputStreamName, new SimpleStringSchema(), consumerConfig))
     .print()

但是在流上放置记录时:

 aws --endpoint-url=http://localhost:4567 kinesis put-record --data "hello" --partition-key 0 --stream-name ExampleInputStream

我收到以下错误:

org.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Current token (VALUE_STRING) not VALUE_EMBEDDED_OBJECT, can not access as binary
 at [Source: (org.apache.flink.kinesis.shaded.com.amazonaws.event.ResponseProgressInputStream); line: -1, column: 304]
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access0(AmazonHttpClient.java:698)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:247)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:401)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:244)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.core.JsonParseException: Current token (VALUE_STRING) not VALUE_EMBEDDED_OBJECT, can not access as binary
 at [Source: (org.apache.flink.kinesis.shaded.com.amazonaws.event.ResponseProgressInputStream); line: -1, column: 304]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
    at com.fasterxml.jackson.dataformat.cbor.CBORParser.getBinaryValue(CBORParser.java:1660)
    at com.fasterxml.jackson.core.JsonParser.getBinaryValue(JsonParser.java:1484)
    at org.apache.flink.kinesis.shaded.com.amazonaws.transform.SimpleTypeCborUnmarshallers$ByteBufferCborUnmarshaller.unmarshall(SimpleTypeCborUnmarshallers.java:198)
    at org.apache.flink.kinesis.shaded.com.amazonaws.transform.SimpleTypeCborUnmarshallers$ByteBufferCborUnmarshaller.unmarshall(SimpleTypeCborUnmarshallers.java:196)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:61)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:29)
    at org.apache.flink.kinesis.shaded.com.amazonaws.transform.ListUnmarshaller.unmarshallJsonToList(ListUnmarshaller.java:92)
    at org.apache.flink.kinesis.shaded.com.amazonaws.transform.ListUnmarshaller.unmarshall(ListUnmarshaller.java:46)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:53)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:29)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:118)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:43)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:69)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1714)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleSuccessResponse(AmazonHttpClient.java:1434)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1356)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
    ... 20 more

从堆栈跟踪来看,当您的记录作为简单字符串写入时,Flink kinesis 似乎需要 Cbor。

似乎 包含一些关于如何使您的本地设置与消费者端保持一致的提示。

在我的例子中,我添加了这个系统属性:

System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
  System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
  System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false")