索引超出范围 this.buf.utf8Slice

Out of range index this.buf.utf8Slice

我有一个使用 confluent.io kafka 包的带有 avro 消息的 kafka 流。这适用于 java 应用程序。但我现在正尝试在 javascript.

中阅读这些消息

我一直在尝试使用 kafka-node + avsc 包将消息从缓冲区数组解码为字符串,使用模式。我知道 confluent 将前 5 个字节作为模式 ID 的魔术字节 (0) + 4 个字节。

所以我对缓冲区进行切片以删除那些字节并尝试将其发送到 avsc 进行解码。但是我得到一个错误

return this.buf.utf8Slice(pos, pos + len);

RangeError: out of range index at RangeError (native) at Tap.readString (C:\git\workflowapps\workItemsApp\node_modules\avsc\lib\utils.js:452:19) at StringType._read (C:\git\workflowapps\workItemsApp\node_modules\avsc\lib\types.js:612:58)

同时尝试手动解码这会留下很多非 utf8 字符,这样我就会丢失数据。

示例代码:

  consumer.on('message', function(message) {
      var val = message.value.slice(4);
      sails.log.info('val buffer', val, val.length);
      sails.log.info('hex',val.toString('hex'));
      var type = avro.parse({"type":"record",
       "name":"StatusEvent",
        "fields":[{"name":"ApplicationUUID","type":"string"},
        {"name":"StatusUUID","type":"string"},
        {"name":"Name","type":"string"},
        {"name":"ChangedBy","type":"string"},
        {"name":"ChangedByUUID","type":"string"},
        {"name":"ChangedAt","type":"long"}]
      });

      var decodedValue = type.fromBuffer(val);
      sails.log.info('Decoded', decodedValue);
    });

您的 slice(4) 应该是 slice(5)(否则您只会跳过 5 个 header 字节中的 4 个)。您也许还能找到有用的信息 here.