
StreamSets gives this error when trying to parse valid JSON

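I am setting up StreamSets for a project, with a Kafka Consumer as the origin. It works for smaller messages, but when a message is larger it throws this error: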

com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name at [Source: com.streamsets.pipeline.api.ext.io.OverrunReader@39c37ffd; line: 1, column: 4191]

I have already set Max Object Length (chars) to 1000000 and the parser.limit property to 10335040, but I can't figure out what the problem is.
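As a quick sanity check on the numbers above, here is a minimal sketch that compares one message value against the Max Object Length (chars) setting. `size_report` and `MAX_OBJECT_LENGTH_CHARS` are hypothetical names; only the character limit is checked because that setting is explicitly counted in characters, while the units of `parser.limit` are not assumed here.

```python
# Hypothetical sanity check: compare one Kafka message value against the
# "Max Object Length (chars)" value quoted in the question (1000000).
# The limit is counted in characters, so the UTF-8 byte count is reported
# separately; it can be larger than the character count for non-ASCII text.
MAX_OBJECT_LENGTH_CHARS = 1_000_000


def size_report(raw: bytes) -> dict:
    """Return character/byte counts for one message value and whether it fits."""
    text = raw.decode("utf-8")
    return {
        "chars": len(text),
        "bytes": len(raw),
        "fits_max_object_length": len(text) <= MAX_OBJECT_LENGTH_CHARS,
    }


# A message of about 4191 characters (the column in the error) is far below the limit.
print(size_report(b"x" * 4191))
```

If every message reports well under the limit, the object-length setting is unlikely to be what rejects it, which suggests looking at whether the parser is receiving the whole message.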


The full stack trace is:

KAFKA_37 - Cannot parse record from message 'rms-search-data::0::61950': com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name at [Source: com.streamsets.pipeline.api.ext.io.OverrunReader@39c37ffd; line: 1, column: 4191]
com.streamsets.pipeline.api.base.OnRecordErrorException: KAFKA_37 - Cannot parse record from message 'rms-search-data::0::61950': com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name
 at [Source: com.streamsets.pipeline.api.ext.io.OverrunReader@39c37ffd; line: 1, column: 4191]
    at com.streamsets.pipeline.stage.origin.kafka.BaseKafkaSource.processKafkaMessageDefault(BaseKafkaSource.java:265)
    at com.streamsets.pipeline.stage.origin.kafka.BaseKafkaSource.processKafkaMessageDefault(BaseKafkaSource.java:224)
    at com.streamsets.pipeline.stage.origin.kafka.StandaloneKafkaSource.produce(StandaloneKafkaSource.java:86)
    at com.streamsets.pipeline.api.base.configurablestage.DSource.produce(DSource.java:38)
    at com.streamsets.datacollector.runner.StageRuntime.lambda$execute(StageRuntime.java:283)
    at com.streamsets.pipeline.api.impl.CreateByRef.call(CreateByRef.java:40)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:235)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:298)
    at com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:219)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.processPipe(ProductionPipelineRunner.java:810)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPollSource(ProductionPipelineRunner.java:554)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:383)
    at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:527)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:109)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:75)
    at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:703)
    at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start(AsyncRunner.java:151)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:100)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name
 at [Source: com.streamsets.pipeline.api.ext.io.OverrunReader@39c37ffd; line: 1, column: 4191]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:483)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._parseName2(ReaderBasedJsonParser.java:1716)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._parseName(ReaderBasedJsonParser.java:1700)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:921)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:493)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:362)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:27)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:126)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:122)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:223)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:517)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:362)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:27)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:126)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:122)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:223)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:517)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:362)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:27)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:126)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:122)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:223)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:517)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:362)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:27)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:126)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl$MapDeserializer.deserialize(OverrunJsonObjectReaderImpl.java:122)
    at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:223)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3786)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2115)
    at com.fasterxml.jackson.core.JsonParser.readValueAs(JsonParser.java:1627)
    at com.streamsets.datacollector.json.JsonObjectReaderImpl.readObjectFromStream(JsonObjectReaderImpl.java:199)
    at com.streamsets.datacollector.json.OverrunJsonObjectReaderImpl.readObjectFromStream(OverrunJsonObjectReaderImpl.java:196)
    at com.streamsets.datacollector.json.JsonObjectReaderImpl.read(JsonObjectReaderImpl.java:111)
    at com.streamsets.pipeline.lib.parser.json.JsonCharDataParser.parse(JsonCharDataParser.java:70)
    at com.streamsets.pipeline.lib.parser.WrapperDataParserFactory$WrapperDataParser.lambda$parse$0(WrapperDataParserFactory.java:105)
    at com.streamsets.pipeline.api.impl.CreateByRef.call(CreateByRef.java:40)
    at com.streamsets.pipeline.lib.parser.WrapperDataParserFactory$WrapperDataParser.parse(WrapperDataParserFactory.java:105)
    at com.streamsets.pipeline.stage.origin.kafka.BaseKafkaSource.processKafkaMessageDefault(BaseKafkaSource.java:244)
    ... 29 more

This JSON fails:

{"payload":{"data":{"aIndex":"application0502","aType":"application","pIndex":"profile000","pType":"profile","da":{"clientId":"168613","clientType":"1","statusDataList":{"68348":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68348","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68349":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68349","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68351":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68351","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68365":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68365","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68366":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68366","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68367":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68369":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68370":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68371":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68372":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"}},"recruiterId":"76866550","isActivity":false},"ignoreParamsForIndexing":{"statusDetailsForAsyncActions":{"clientId":"168613","statusId":"1949","subStatusId":null,"assessmentTestId":"","feedbackFormIds":[],"hiring managers":[],"isBillingEnabled":null,"isOfferGenerationEnabled":null,"statusDataJson":{"assessment":{"action":1,"sendToNew":false,"resendToAll":false,"statusId":"1949","subStatusId":null},"CURR_STATUS_DATE":"2019-05-21 17:18:59"}},"projectDetailsForAsyncActions":{"projectId":"15463"}},"optn":{"_routing":"168613"},"action":22,"activityField":"STATUS_CHANGED"},"dataArray":null,"retryCount":3,"additionalHeaders":{},"routingKey":"168613","topic":"rms-search-data"},"headers":{"AppId":123,"SystemId":"1234","X-TRANSACTION-ID":"27108593751"}}

This JSON succeeds:

{"payload":{"data":{"aIndex":"application0502","aType":"application","pIndex":"profile000","pType":"profile","da":{"clientId":"168613","clientType":"1","statusDataList":{"68348":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68348","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68349":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68349","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68351":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68351","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68365":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68365","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:18:59","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68366":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68366","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68367":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68369":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68370":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"},"68371":{"PAYMENT_STATUS":1,"UNIQUE_KEY":"168613_68367","CURR_STATUS":"1949","CURR_SUB_STATUS":null,"STATUS_VALUE":1949,"SUB_STATUS_VALUE":null,"STATUS_STATE":0,"OWNERS_BY_CURR_STATUS":[],"ADDITIONAL_OWNERS":[],"CURR_STATUS_UPDATEDBY":"76866550","CURR_STATUS_DATE":"2019-05-21 17:19:00","REQ_EMPLOYERID":"4103","REQ_POSTED_BY":"76866550"}},"recruiterId":"76866550","isActivity":false},"ignoreParamsForIndexing":{"statusDetailsForAsyncActions":{"clientId":"168613","statusId":"1949","subStatusId":null,"assessmentTestId":"","feedbackFormIds":[],"hiring managers":[],"isBillingEnabled":null,"isOfferGenerationEnabled":null,"statusDataJson":{"assessment":{"action":1,"sendToNew":false,"resendToAll":false,"statusId":"1949","subStatusId":null},"CURR_STATUS_DATE":"2019-05-21 17:18:59"}},"projectDetailsForAsyncActions":{"projectId":"15463"}},"optn":{"_routing":"168613"},"action":22,"activityField":"STATUS_CHANGED"},"dataArray":null,"retryCount":3,"additionalHeaders":{},"routingKey":"168613","topic":"rms-search-data"},"headers":{"AppId":123,"SystemId":"1234","X-TRANSACTION-ID":"27108593751"}}

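I wrote a quick pipeline to try to replicate this, but it worked as I expected. I had to set Max Object Length (chars) in the Kafka Consumer's data format configuration, just as you did, and it read and parsed the data fine.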
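To replicate the problem this way, it helps to produce test messages of the same shape at several sizes. The sketch below builds compact JSON with a chosen number of `statusDataList` entries; `make_status_entry` and `make_message` are hypothetical helpers, the envelope is simplified from the payload in the question, and the entry keys are generated sequentially (unlike the real ones). Publishing the strings to the topic is left to whatever producer you already use.

```python
import json


def make_status_entry(status_key: str) -> dict:
    """One statusDataList entry, shaped like the entries in the question's payload."""
    return {
        "PAYMENT_STATUS": 1,
        "UNIQUE_KEY": f"168613_{status_key}",
        "CURR_STATUS": "1949",
        "CURR_SUB_STATUS": None,  # serialized as JSON null
        "STATUS_VALUE": 1949,
        "SUB_STATUS_VALUE": None,
        "STATUS_STATE": 0,
        "OWNERS_BY_CURR_STATUS": [],
        "ADDITIONAL_OWNERS": [],
        "CURR_STATUS_UPDATEDBY": "76866550",
        "CURR_STATUS_DATE": "2019-05-21 17:19:00",
        "REQ_EMPLOYERID": "4103",
        "REQ_POSTED_BY": "76866550",
    }


def make_message(n_entries: int) -> str:
    """Compact single-line JSON with n_entries entries (simplified, hypothetical envelope)."""
    keys = [str(68348 + i) for i in range(n_entries)]  # sequential keys, for illustration only
    doc = {
        "payload": {
            "data": {"da": {"clientId": "168613",
                            "statusDataList": {k: make_status_entry(k) for k in keys}}},
            "topic": "rms-search-data",
        },
        "headers": {"AppId": 123, "SystemId": "1234"},
    }
    return json.dumps(doc, separators=(",", ":"))  # no extra whitespace


# Print the size of a few candidate test messages before sending them.
for n in (9, 10, 50):
    print(n, len(make_message(n)))
```

Sending messages that straddle a suspected size boundary (for example, a few hundred characters either side of 4 KB) makes it easier to see exactly where parsing starts to fail.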

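Check whether the data is being retrieved from Kafka in full: duplicate the pipeline, change the Kafka Consumer's data format to Text, and send the output to a file. You should then be able to see whether all of the data is being read from the Kafka topic. It may be that Kafka has a maximum message size of 4k configured, which truncates the message.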
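Once the Text-format copy of the pipeline has written messages to a file, a small script can report the length of each record and whether it is complete JSON. This is a minimal sketch assuming the file holds one message per line; `check_message` and `check_file` are hypothetical names. Python's `json` module words its errors differently from Jackson, but cutting a document off inside a field name fails for the same reason: the input ends before the object is closed.

```python
import json
from pathlib import Path
from typing import List, Tuple


def check_message(text: str) -> Tuple[int, bool, str]:
    """Return (length in chars, parsed OK?, error detail) for one message."""
    try:
        json.loads(text)
        return len(text), True, ""
    except json.JSONDecodeError as err:
        return len(text), False, f"{err.msg} (char {err.pos})"


def check_file(path: str) -> List[Tuple[int, bool, str]]:
    """Check every non-blank line of a file assumed to hold one message per line."""
    lines = Path(path).read_text(encoding="utf-8").splitlines()
    return [check_message(line) for line in lines if line.strip()]


# Demonstration: cutting a valid document short inside a field name
# ("Syste" instead of "SystemId") makes the parser hit the end of input.
complete = '{"headers":{"AppId":123,"SystemId":"1234"}}'
truncated = complete[:30]
print(check_message(complete))
print(check_message(truncated))
```

If the failing records in the output file all stop at roughly the same length, that points to truncation before the parser rather than to invalid JSON.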

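Another thing to check is whether you are using the correct stage library. In fact, as explained in the comments, that turned out to be the fix: Deep was using the CDH 2.x consumer, and when he changed it to Kafka 0.11.0.0, it started working correctly.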