Wso2 流处理器:处理 eventByteBufferQueue 时出错
Wso2 Stream Processor : Error occurred while processing eventByteBufferQueue
我有两个 wso2-am 分析服务器 (2.6.0) 节点,它们是 Wso2 流处理器。我在集群的被动节点上看到以下错误。活动节点很好,我没有看到任何错误。分析结果对正在 API Publisher 或 Store 上查看数据的用户没有影响。但是被动节点有错误。
请告知导致以下问题的原因..
2019-02-26 17:06:09,513] 错误 {org.wso2.carbon.stream.processor.core.ha.tcp.EventSyncServer} - 处理 eventByteBufferQueue null 时发生错误 java.nio.BufferUnderflowException
刚遇到同样的问题,这是我的问题和解决方案。
1)使用WSO2 SP HA部署。
2)当Event进入active节点,根据streaming的source mapping,有些字段为NULL
3)主动节点想把这个事件同步到被动节点
4) 被动节点从'eventByteBufferQueue'中获取事件数据以满足备用接管机制
5) 被动节点无法解析主动节点的数据,报错异常
根本原因是SP默认只支持NULL String,当NULL with LONG, INTEGER..时出现错误。但对我来说,长字段有 NULL 是正常情况,您可以将数据类型更改为字符串。
这是我的解决方案:
org.wso2.carbon.stream.processor.core_2.0.478.jar
添加逻辑以支持 NULL
BinaryMessageConverterUtil.java 用于从活动节点发送事件数据
public final class BinaryMessageConverterUtil {
public static int getSize(Object data) {
if (data instanceof String) {
return 4 + ((String) data).length();
} else if (data instanceof Integer) {
return 4;
} else if (data instanceof Long) {
return 8;
} else if (data instanceof Float) {
return 4;
} else if (data instanceof Double) {
return 8;
} else if (data instanceof Boolean) {
return 1;
} else if (data == null) {
return 0;
}else {
//TODO
return 4;
}
}
public static EventDataMetaInfo getEventMetaInfo(Object data) {
int eventSize;
Attribute.Type attributeType;
if (data instanceof String) {
attributeType = Attribute.Type.STRING;
eventSize = 4 + ((String) data).length();
} else if (data instanceof Integer) {
attributeType = Attribute.Type.INT;
eventSize = 4;
} else if (data instanceof Long) {
attributeType = Attribute.Type.LONG;
eventSize = 8;
} else if (data instanceof Float) {
attributeType = Attribute.Type.FLOAT;
eventSize = 4;
} else if (data instanceof Double) {
attributeType = Attribute.Type.DOUBLE;
eventSize = 8;
} else if (data instanceof Boolean) {
attributeType = Attribute.Type.BOOL;
eventSize = 1;
} else if (data == null){
attributeType = Attribute.Type.OBJECT;
eventSize = 0; //'no content between the HA nodes for NULL fields'
} else {
//TODO
attributeType = Attribute.Type.OBJECT;
eventSize = 1;
}
return new EventDataMetaInfo(eventSize, attributeType);
}
public static void assignData(Object data, ByteBuffer eventDataBuffer) throws IOException {
if (data instanceof String) {
eventDataBuffer.putInt(((String) data).length());
eventDataBuffer.put((((String) data).getBytes(Charset.defaultCharset())));
} else if (data instanceof Integer) {
eventDataBuffer.putInt((Integer) data);
} else if (data instanceof Long) {
eventDataBuffer.putLong((Long) data);
} else if (data instanceof Float) {
eventDataBuffer.putFloat((Float) data);
} else if (data instanceof Double) {
eventDataBuffer.putDouble((Double) data);
} else if (data instanceof Boolean) {
eventDataBuffer.put((byte) (((Boolean) data) ? 1 : 0));
} else if (data == null){
//put nothing into he Buffer
} else {
eventDataBuffer.putInt(0);
}
}
public static String getString(ByteBuf byteBuf, int size) throws UnsupportedEncodingException {
byte[] bytes = new byte[size];
byteBuf.readBytes(bytes);
return new String(bytes, Charset.defaultCharset());
}
public static String getString(ByteBuffer byteBuf, int size) throws UnsupportedEncodingException {
byte[] bytes = new byte[size];
byteBuf.get(bytes);
return new String(bytes, Charset.defaultCharset());
}
}
SiddhiEventConverter.java 用于在被动节点处理事件数据
static Object[] toObjectArray(ByteBuffer byteBuffer,
String[] attributeTypeOrder) throws UnsupportedEncodingException {
if (attributeTypeOrder != null) {
Object[] objects = new Object[attributeTypeOrder.length];
for (int i = 0; i < attributeTypeOrder.length; i++) {
switch (attributeTypeOrder[i]) {
case "INT":
objects[i] = byteBuffer.getInt();
break;
case "LONG":
objects[i] = byteBuffer.getLong();
break;
case "STRING":
int stringSize = byteBuffer.getInt();
if (stringSize == 0) {
objects[i] = null;
} else {
objects[i] = BinaryMessageConverterUtil.getString(byteBuffer, stringSize);
}
break;
case "DOUBLE":
objects[i] = byteBuffer.getDouble();
break;
case "FLOAT":
objects[i] = byteBuffer.getFloat();
break;
case "BOOL":
objects[i] = byteBuffer.get() == 1;
break;
case "OBJECT":
//for NULL fields
objects[i] = null;
break;
default:
// will not occur
}
}
return objects;
} else {
return null;
}
}
我有两个 wso2-am 分析服务器 (2.6.0) 节点,它们是 Wso2 流处理器。我在集群的被动节点上看到以下错误。活动节点很好,我没有看到任何错误。分析结果对正在 API Publisher 或 Store 上查看数据的用户没有影响。但是被动节点有错误。
请告知导致以下问题的原因..
2019-02-26 17:06:09,513] 错误 {org.wso2.carbon.stream.processor.core.ha.tcp.EventSyncServer} - 处理 eventByteBufferQueue null 时发生错误 java.nio.BufferUnderflowException
刚遇到同样的问题,这是我的问题和解决方案。 1)使用WSO2 SP HA部署。 2)当Event进入active节点,根据streaming的source mapping,有些字段为NULL 3)主动节点想把这个事件同步到被动节点 4) 被动节点从'eventByteBufferQueue'中获取事件数据以满足备用接管机制 5) 被动节点无法解析主动节点的数据,报错异常
根本原因是SP默认只支持NULL String,当NULL with LONG, INTEGER..时出现错误。但对我来说,长字段有 NULL 是正常情况,您可以将数据类型更改为字符串。
这是我的解决方案: org.wso2.carbon.stream.processor.core_2.0.478.jar 添加逻辑以支持 NULL BinaryMessageConverterUtil.java 用于从活动节点发送事件数据
public final class BinaryMessageConverterUtil {
public static int getSize(Object data) {
if (data instanceof String) {
return 4 + ((String) data).length();
} else if (data instanceof Integer) {
return 4;
} else if (data instanceof Long) {
return 8;
} else if (data instanceof Float) {
return 4;
} else if (data instanceof Double) {
return 8;
} else if (data instanceof Boolean) {
return 1;
} else if (data == null) {
return 0;
}else {
//TODO
return 4;
}
}
public static EventDataMetaInfo getEventMetaInfo(Object data) {
int eventSize;
Attribute.Type attributeType;
if (data instanceof String) {
attributeType = Attribute.Type.STRING;
eventSize = 4 + ((String) data).length();
} else if (data instanceof Integer) {
attributeType = Attribute.Type.INT;
eventSize = 4;
} else if (data instanceof Long) {
attributeType = Attribute.Type.LONG;
eventSize = 8;
} else if (data instanceof Float) {
attributeType = Attribute.Type.FLOAT;
eventSize = 4;
} else if (data instanceof Double) {
attributeType = Attribute.Type.DOUBLE;
eventSize = 8;
} else if (data instanceof Boolean) {
attributeType = Attribute.Type.BOOL;
eventSize = 1;
} else if (data == null){
attributeType = Attribute.Type.OBJECT;
eventSize = 0; //'no content between the HA nodes for NULL fields'
} else {
//TODO
attributeType = Attribute.Type.OBJECT;
eventSize = 1;
}
return new EventDataMetaInfo(eventSize, attributeType);
}
public static void assignData(Object data, ByteBuffer eventDataBuffer) throws IOException {
if (data instanceof String) {
eventDataBuffer.putInt(((String) data).length());
eventDataBuffer.put((((String) data).getBytes(Charset.defaultCharset())));
} else if (data instanceof Integer) {
eventDataBuffer.putInt((Integer) data);
} else if (data instanceof Long) {
eventDataBuffer.putLong((Long) data);
} else if (data instanceof Float) {
eventDataBuffer.putFloat((Float) data);
} else if (data instanceof Double) {
eventDataBuffer.putDouble((Double) data);
} else if (data instanceof Boolean) {
eventDataBuffer.put((byte) (((Boolean) data) ? 1 : 0));
} else if (data == null){
//put nothing into he Buffer
} else {
eventDataBuffer.putInt(0);
}
}
public static String getString(ByteBuf byteBuf, int size) throws UnsupportedEncodingException {
byte[] bytes = new byte[size];
byteBuf.readBytes(bytes);
return new String(bytes, Charset.defaultCharset());
}
public static String getString(ByteBuffer byteBuf, int size) throws UnsupportedEncodingException {
byte[] bytes = new byte[size];
byteBuf.get(bytes);
return new String(bytes, Charset.defaultCharset());
}
}
SiddhiEventConverter.java 用于在被动节点处理事件数据
static Object[] toObjectArray(ByteBuffer byteBuffer,
String[] attributeTypeOrder) throws UnsupportedEncodingException {
if (attributeTypeOrder != null) {
Object[] objects = new Object[attributeTypeOrder.length];
for (int i = 0; i < attributeTypeOrder.length; i++) {
switch (attributeTypeOrder[i]) {
case "INT":
objects[i] = byteBuffer.getInt();
break;
case "LONG":
objects[i] = byteBuffer.getLong();
break;
case "STRING":
int stringSize = byteBuffer.getInt();
if (stringSize == 0) {
objects[i] = null;
} else {
objects[i] = BinaryMessageConverterUtil.getString(byteBuffer, stringSize);
}
break;
case "DOUBLE":
objects[i] = byteBuffer.getDouble();
break;
case "FLOAT":
objects[i] = byteBuffer.getFloat();
break;
case "BOOL":
objects[i] = byteBuffer.get() == 1;
break;
case "OBJECT":
//for NULL fields
objects[i] = null;
break;
default:
// will not occur
}
}
return objects;
} else {
return null;
}
}