索引超出范围 this.buf.utf8Slice
Out of range index this.buf.utf8Slice
我有一个使用 confluent.io kafka 包的带有 avro 消息的 kafka 流。这适用于 java 应用程序。但我现在正尝试在 javascript.
中阅读这些消息
我一直在尝试使用 kafka-node + avsc 包将消息从缓冲区数组解码为字符串,使用模式。我知道 confluent 将前 5 个字节作为模式 ID 的魔术字节 (0) + 4 个字节。
所以我对缓冲区进行切片以删除那些字节并尝试将其发送到 avsc 进行解码。但是我得到一个错误
return this.buf.utf8Slice(pos, pos + len);
RangeError: out of range index
at RangeError (native)
at Tap.readString (C:\git\workflowapps\workItemsApp\node_modules\avsc\lib\utils.js:452:19)
at StringType._read (C:\git\workflowapps\workItemsApp\node_modules\avsc\lib\types.js:612:58)
同时尝试手动解码这会留下很多非 utf8 字符,这样我就会丢失数据。
示例代码:
consumer.on('message', function(message) {
var val = message.value.slice(4);
sails.log.info('val buffer', val, val.length);
sails.log.info('hex',val.toString('hex'));
var type = avro.parse({"type":"record",
"name":"StatusEvent",
"fields":[{"name":"ApplicationUUID","type":"string"},
{"name":"StatusUUID","type":"string"},
{"name":"Name","type":"string"},
{"name":"ChangedBy","type":"string"},
{"name":"ChangedByUUID","type":"string"},
{"name":"ChangedAt","type":"long"}]
});
var decodedValue = type.fromBuffer(val);
sails.log.info('Decoded', decodedValue);
});
您的 slice(4)
应该是 slice(5)
(否则您只会跳过 5 个 header 字节中的 4 个)。您也许还能找到有用的信息 here.
我有一个使用 confluent.io kafka 包的带有 avro 消息的 kafka 流。这适用于 java 应用程序。但我现在正尝试在 javascript.
中阅读这些消息我一直在尝试使用 kafka-node + avsc 包将消息从缓冲区数组解码为字符串,使用模式。我知道 confluent 将前 5 个字节作为模式 ID 的魔术字节 (0) + 4 个字节。
所以我对缓冲区进行切片以删除那些字节并尝试将其发送到 avsc 进行解码。但是我得到一个错误
return this.buf.utf8Slice(pos, pos + len);
RangeError: out of range index at RangeError (native) at Tap.readString (C:\git\workflowapps\workItemsApp\node_modules\avsc\lib\utils.js:452:19) at StringType._read (C:\git\workflowapps\workItemsApp\node_modules\avsc\lib\types.js:612:58)
同时尝试手动解码这会留下很多非 utf8 字符,这样我就会丢失数据。
示例代码:
consumer.on('message', function(message) {
var val = message.value.slice(4);
sails.log.info('val buffer', val, val.length);
sails.log.info('hex',val.toString('hex'));
var type = avro.parse({"type":"record",
"name":"StatusEvent",
"fields":[{"name":"ApplicationUUID","type":"string"},
{"name":"StatusUUID","type":"string"},
{"name":"Name","type":"string"},
{"name":"ChangedBy","type":"string"},
{"name":"ChangedByUUID","type":"string"},
{"name":"ChangedAt","type":"long"}]
});
var decodedValue = type.fromBuffer(val);
sails.log.info('Decoded', decodedValue);
});
您的 slice(4)
应该是 slice(5)
(否则您只会跳过 5 个 header 字节中的 4 个)。您也许还能找到有用的信息 here.