kafka consumer .net 'Protocol message end-group tag did not match expected tag.'
As you can see, I am trying to read data from Kafka:
var config = new ConsumerConfig
{
    BootstrapServers = "*******",
    GroupId = Guid.NewGuid().ToString(),
    AutoOffsetReset = AutoOffsetReset.Earliest
};
MessageParser<AdminIpoChange> parser = new(() => new AdminIpoChange());
using (var consumer = new ConsumerBuilder<Ignore, byte[]>(config).Build())
{
    consumer.Subscribe("AdminIpoChange");
    while (true)
    {
        AdminIpoChange item = new AdminIpoChange();
        var cr = consumer.Consume();
        item = parser.ParseFrom(new ReadOnlySpan<byte>(cr.Message.Value).ToArray());
    }
    consumer.Close();
}
I am using Google Protobuf to send and receive the data. This code throws the following error at the parser line:
KafkaConsumer.ConsumeAsync: Protocol message end-group tag did not match expected tag.
Google.Protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
at Google.Protobuf.ParsingPrimitivesMessages.CheckLastTagWas(ParserInternalState& state, UInt32 expectedTag)
at Google.Protobuf.ParsingPrimitivesMessages.ReadGroup(ParseContext& ctx, Int32 fieldNumber, UnknownFieldSet set)
at Google.Protobuf.UnknownFieldSet.MergeFieldFrom(ParseContext& ctx)
at Google.Protobuf.UnknownFieldSet.MergeFieldFrom(UnknownFieldSet unknownFields, ParseContext& ctx)
at AdminIpoChange.pb::Google.Protobuf.IBufferMessage.InternalMergeFrom(ParseContext& input) in D:\MofidProject\domain\obj\Debug\net6.0\Protos\Rlc\AdminIpoChange.cs:line 213
at Google.Protobuf.ParsingPrimitivesMessages.ReadRawMessage(ParseContext& ctx, IMessage message)
at Google.Protobuf.CodedInputStream.ReadRawMessage(IMessage message)
at AdminIpoChange.MergeFrom(CodedInputStream input) in D:\MofidProject\domain\obj\Debug\net6.0\Protos\Rlc\AdminIpoChange.cs:line 188
at Google.Protobuf.MessageExtensions.MergeFrom(IMessage message, Byte[] data, Boolean discardUnknownFields, ExtensionRegistry registry)
at Google.Protobuf.MessageParser`1.ParseFrom(Byte[] data)
at infrastructure.Queue.Kafka.KafkaConsumer.ConsumeCarefully[T](Func`2 consumeFunc, String topic, String group) in D:\MofidProject\infrastructure\Queue\Kafka\KafkaConsumer.cs:line 168
D:\MofidProject\mts.consumer.plus\bin\Debug\net6.0\mts.consumer.plus.exe (process 15516) exited with code -1001.
To automatically close the console when debugging stops, enable Tools->Options->Debugging->Automatically close the console when debugging stops.
Updated:
Sample data from Kafka:
- {"SymbolName":"\u0641\u062F\u0631","SymbolIsin":"IRo3pzAZ0002","Date":"1400/12/15","Time":"08:00-12:00","MinPrice":17726,"MaxPrice":21666,"Share":1000,"Show":false,"Operation":0,"Id":"100d8e0b54154e9d902054bff193e875","CreateDateTime":"2022-02-26T09:47:20.0134757+03:30"}
My RLC model:
syntax = "proto3";
message AdminIpoChange
{
    string Id = 1;
    string SymbolName = 2;
    string SymbolIsin = 3;
    string Date = 4;
    string Time = 5;
    double MinPrice = 6;
    double MaxPrice = 7;
    int32 Share = 8;
    bool Show = 9;
    int32 Operation = 10;
    string CreateDateTime = 11;

    enum AdminIpoOperation
    {
        Add = 0;
        Edit = 1;
        Delete = 2;
    }
}
My byte data:
7B 22 53 79 6D 62 6F 6C 4E 61 6D 65 22 3A 22 5C 75 30 36 34 31 5C 75 30 36 32 46 5C 75 30
36 33 31 22 2C 22 53 79 6D 62 6F 6C 49 73 69 6E 22 3A 22 49 52 6F 33 70 7A 41 5A 30 30 30
32 22 2C 22 44 61 74 65 22 3A 22 31 34 30 30 2F 31 32 2F 31 35 22 2C 22 54 69 6D 65 22 3A
22 30 38 3A 30 30 2D 31 32 3A 30 30 22 2C 22 4D 69 6E 50 72 69 63 65 22 3A 31 37 37 32 36
2C 22 4D 61 78 50 72 69 63 65 22 3A 32 31 36 36 36 2C 22 53 68 61 72 65 22 3A 31 30 30 30
2C 22 53 68 6F 77 22 3A 66 61 6C 73 65 2C 22 4F 70 65 72 61 74 69 6F 6E 22 3A 30 2C 22 49
64 22 3A 22 31 30 30 64 38 65 30 62 35 34 31 35 34 65 39 64 39 30 32 30 35 34 62 66 66 31
39 33 65 38 37 35 22 2C 22 43 72 65 61 74 65 44 61 74 65 54 69 6D 65 22 3A 22 32 30 32 32
2D 30 32 2D 32 36 54 30 39 3A 34 37 3A 32 30 2E 30 31 33 34 37 35 37 2B 30 33 3A 33 30 22
7D
The data is definitely not protobuf binary; byte 0 starts a group with field number 15 (a tag-decoding sketch follows this walkthrough); inside that group we have:
- field 4, string
- field 13, fixed32
- field 6, varint
- field 12, fixed32
- field 6, varint
After this (at byte 151), an end-group token is encountered with field number 6.
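To see concretely where that "field 15, start-group" reading comes from, here is a minimal sketch (mine, not part of the original answer) of how a protobuf tag byte is decoded, applied to the first byte of the payload above, 0x7B:

// Sketch: decode a protobuf tag byte by hand.
// 0x7B is the first byte of the payload above (it is also ASCII '{').
byte first = 0x7B;
int fieldNumber = first >> 3;   // 0x7B >> 3 == 15
int wireType = first & 0x7;     // 0x7B & 7  == 3, and wire type 3 is "start-group"
Console.WriteLine($"field {fieldNumber}, wire type {wireType} (3 = start-group)");

In other words, the '{' that opens a JSON document happens to decode as a start-group tag for field 15.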
There are a lot of surprising things here:
- your schema doesn't use groups (in fact, the mere existence of groups is hard to find in the docs these days), so ... none of this looks right
- the end-group token always needs to match the last start-group field number, and it doesn't
- fields in a single level are usually (although as a "should", not a "must") written in numerical order
- you don't declare fields 12 or 13
- your field 6 has the wrong type - we'd expect fixed64 here (for a double), but got varint
So, without doubt: the data is ... not what you expect. It is certainly not valid protobuf binary. Without knowing how that data is being stored, all we can do is guess, but on a hunch: let's try decoding it as UTF-8 and see what it looks like (a quick check is sketched below):
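A minimal sketch of that check, reusing `cr` from the consumer loop in the question:

// Sketch: reinterpret the raw Kafka payload as UTF-8 text instead of
// feeding it to the binary protobuf parser.
var text = System.Text.Encoding.UTF8.GetString(cr.Message.Value);
Console.WriteLine(text);

Running that against the byte dump above yields: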
{"SymbolName":"\u0641\u062F\u0631","SymbolIsin":"IRo3pzAZ0002","Date":"1400/12/15","Time":"08:00-12:00","MinPrice":17726,"MaxPrice":21666,"Share":1000,"Show":false,"Operation":0,"Id":"100d8e0b54154e9d902054bff193e875","CreateDateTime":"2022-02-26T09:47:20.0134757+03:30"}
or (formatted):
{
"SymbolName":"\u0641\u062F\u0631",
"SymbolIsin":"IRo3pzAZ0002",
"Date":"1400/12/15",
"Time":"08:00-12:00",
"MinPrice":17726,
"MaxPrice":21666,
"Share":1000,
"Show":false,
"Operation":0,
"Id":"100d8e0b54154e9d902054bff193e875",
"CreateDateTime":"2022-02-26T09:47:20.0134757+03:30"
}
Oops! You've written the data as JSON, and you're trying to decode it as binary protobuf. Decode it as JSON instead and you should be fine. If it was written with the protobuf JSON API: decode it with the protobuf JSON API.
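If the payload really was produced with the protobuf JSON API, a minimal sketch of the fix (assuming the same consumer loop as in the question; `JsonParser` is in the Google.Protobuf package):

using System.Text;
using Google.Protobuf;

// Sketch: decode the payload as text, then parse it with the protobuf
// JSON parser instead of the binary MessageParser.
var cr = consumer.Consume();
string json = Encoding.UTF8.GetString(cr.Message.Value);
AdminIpoChange item = JsonParser.Default.Parse<AdminIpoChange>(json);

Note that the protobuf JSON parser accepts both the lowerCamelCase JSON names and the original .proto field names, so the PascalCase keys in the sample payload should bind to the fields declared above.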