为什么使用kryo序列化框架进入apache storm blot获取值时会overwrite data
Why use Kryo serialize framework into apache storm will over write data when blot get values
可能大多数开发人员都使用 AVRO 作为 Kafka 和 Apache Storm 方案中的序列化框架。但是我需要处理最复杂的数据然后我发现 Kryo 序列化框架也成功地将它集成到我们遵循 Kafka 和 Apache Storm 环境的项目中。但是想进一步操作的时候出现了一个奇怪的状态。
我已经向Kafka发送了5次消息,Storm作业也可以读取这5次消息并反序列化成功。但是接下来blot得到的数据值是错误的。打印出与上一条消息相同的值。然后我在完成反序列化代码后添加了打印输出。实际上它打印出 true 有不同的 5 条消息。为什么下一个印迹不能值?请参阅下面的代码:
KryoScheme.java
public abstract class KryoScheme<T> implements Scheme {
private static final long serialVersionUID = 6923985190833960706L;
private static final Logger logger = LoggerFactory.getLogger(KryoScheme.class);
private Class<T> clazz;
private Serializer<T> serializer;
public KryoScheme(Class<T> clazz, Serializer<T> serializer) {
this.clazz = clazz;
this.serializer = serializer;
}
@Override
public List<Object> deserialize(byte[] buffer) {
Kryo kryo = new Kryo();
kryo.register(clazz, serializer);
T scheme = null;
try {
scheme = kryo.readObject(new Input(new ByteArrayInputStream(buffer)), this.clazz);
logger.info("{}", scheme);
} catch (Exception e) {
String errMsg = String.format("Kryo Scheme failed to deserialize data from Kafka to %s. Raw: %s",
clazz.getName(),
new String(buffer));
logger.error(errMsg, e);
throw new FailedException(errMsg, e);
}
return new Values(scheme);
}}
PrintFunction.java
public class PrintFunction extends BaseFunction {
private static final Logger logger = LoggerFactory.getLogger(PrintFunction.class);
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
List<Object> data = tuple.getValues();
if (data != null) {
logger.info("Scheme data size: {}", data.size());
for (Object value : data) {
PrintOut out = (PrintOut) value;
logger.info("{}.{}--value: {}",
Thread.currentThread().getName(),
Thread.currentThread().getId(),
out.toString());
collector.emit(new Values(out));
}
}
}}
StormLocalTopology.java
public class StormLocalTopology {
public static void main(String[] args) {
........
BrokerHosts zk = new ZkHosts("xxxxxx");
Config stormConf = new Config();
stormConf.put(Config.TOPOLOGY_DEBUG, false);
stormConf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1000 * 5);
stormConf.put(Config.TOPOLOGY_WORKERS, 1);
stormConf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 5);
stormConf.put(Config.TOPOLOGY_TASKS, 1);
TridentKafkaConfig actSpoutConf = new TridentKafkaConfig(zk, topic);
actSpoutConf.fetchSizeBytes = 5 * 1024 * 1024 ;
actSpoutConf.bufferSizeBytes = 5 * 1024 * 1024 ;
actSpoutConf.scheme = new SchemeAsMultiScheme(scheme);
actSpoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
TridentTopology topology = new TridentTopology();
TransactionalTridentKafkaSpout actSpout = new TransactionalTridentKafkaSpout(actSpoutConf);
topology.newStream(topic, actSpout).parallelismHint(4).shuffle()
.each(new Fields("act"), new PrintFunction(), new Fields());
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topic+"Topology", stormConf, topology.build());
}}
还有其他问题,为什么kryo方案只能读取一个消息缓冲区。有没有其他方法获取多消息缓冲区然后可以批量发送数据到下一个印迹。
Also if I send 1 message the full flow seems success.
Then send 2 message is wrong. the print out message like below:
56157 [Thread-18-spout0] INFO s.s.a.s.s.c.KryoScheme - 2016-02- 05T17:20:48.122+0800,T6mdfEW@N5pEtNBW
56160 [Thread-20-b-0] INFO s.s.a.s.s.PrintFunction - Scheme data size: 1
56160 [Thread-18-spout0] INFO s.s.a.s.s.c.KryoScheme - 2016-02- 05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
56161 [Thread-20-b-0] INFO s.s.a.s.s.PrintFunction - Thread-20-b-0.99--value: 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
56162 [Thread-20-b-0] INFO s.s.a.s.s.PrintFunction - Scheme data size: 1
56162 [Thread-20-b-0] INFO s.s.a.s.s.PrintFunction - Thread-20-b-0.99--value: 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
对不起,这是我的错误。刚刚在 Kryo 反序列化 class 中发现了一个错误,存在一个局部作用域参数,因此它可以在多线程环境中被覆盖。不改变party范围内的参数,代码运行很好。
参考代码见下文:
public class KryoSerializer<T extends BasicEvent> extends Serializer<T> implements Serializable {
private static final long serialVersionUID = -4684340809824908270L;
// It's wrong set
//private T event;
public KryoSerializer(T event) {
this.event = event;
}
@Override
public void write(Kryo kryo, Output output, T event) {
event.write(output);
}
@Override
public T read(Kryo kryo, Input input, Class<T> type) {
T event = new T();
event.read(input);
return event;
}
}
可能大多数开发人员都使用 AVRO 作为 Kafka 和 Apache Storm 方案中的序列化框架。但是我需要处理最复杂的数据然后我发现 Kryo 序列化框架也成功地将它集成到我们遵循 Kafka 和 Apache Storm 环境的项目中。但是想进一步操作的时候出现了一个奇怪的状态。
我已经向Kafka发送了5次消息,Storm作业也可以读取这5次消息并反序列化成功。但是接下来blot得到的数据值是错误的。打印出与上一条消息相同的值。然后我在完成反序列化代码后添加了打印输出。实际上它打印出 true 有不同的 5 条消息。为什么下一个印迹不能值?请参阅下面的代码:
KryoScheme.java
public abstract class KryoScheme<T> implements Scheme {
private static final long serialVersionUID = 6923985190833960706L;
private static final Logger logger = LoggerFactory.getLogger(KryoScheme.class);
private Class<T> clazz;
private Serializer<T> serializer;
public KryoScheme(Class<T> clazz, Serializer<T> serializer) {
this.clazz = clazz;
this.serializer = serializer;
}
@Override
public List<Object> deserialize(byte[] buffer) {
Kryo kryo = new Kryo();
kryo.register(clazz, serializer);
T scheme = null;
try {
scheme = kryo.readObject(new Input(new ByteArrayInputStream(buffer)), this.clazz);
logger.info("{}", scheme);
} catch (Exception e) {
String errMsg = String.format("Kryo Scheme failed to deserialize data from Kafka to %s. Raw: %s",
clazz.getName(),
new String(buffer));
logger.error(errMsg, e);
throw new FailedException(errMsg, e);
}
return new Values(scheme);
}}
PrintFunction.java
public class PrintFunction extends BaseFunction {
private static final Logger logger = LoggerFactory.getLogger(PrintFunction.class);
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
List<Object> data = tuple.getValues();
if (data != null) {
logger.info("Scheme data size: {}", data.size());
for (Object value : data) {
PrintOut out = (PrintOut) value;
logger.info("{}.{}--value: {}",
Thread.currentThread().getName(),
Thread.currentThread().getId(),
out.toString());
collector.emit(new Values(out));
}
}
}}
StormLocalTopology.java
public class StormLocalTopology {
public static void main(String[] args) {
........
BrokerHosts zk = new ZkHosts("xxxxxx");
Config stormConf = new Config();
stormConf.put(Config.TOPOLOGY_DEBUG, false);
stormConf.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 1000 * 5);
stormConf.put(Config.TOPOLOGY_WORKERS, 1);
stormConf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 5);
stormConf.put(Config.TOPOLOGY_TASKS, 1);
TridentKafkaConfig actSpoutConf = new TridentKafkaConfig(zk, topic);
actSpoutConf.fetchSizeBytes = 5 * 1024 * 1024 ;
actSpoutConf.bufferSizeBytes = 5 * 1024 * 1024 ;
actSpoutConf.scheme = new SchemeAsMultiScheme(scheme);
actSpoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
TridentTopology topology = new TridentTopology();
TransactionalTridentKafkaSpout actSpout = new TransactionalTridentKafkaSpout(actSpoutConf);
topology.newStream(topic, actSpout).parallelismHint(4).shuffle()
.each(new Fields("act"), new PrintFunction(), new Fields());
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topic+"Topology", stormConf, topology.build());
}}
还有其他问题,为什么kryo方案只能读取一个消息缓冲区。有没有其他方法获取多消息缓冲区然后可以批量发送数据到下一个印迹。
Also if I send 1 message the full flow seems success.
Then send 2 message is wrong. the print out message like below:
56157 [Thread-18-spout0] INFO s.s.a.s.s.c.KryoScheme - 2016-02- 05T17:20:48.122+0800,T6mdfEW@N5pEtNBW
56160 [Thread-20-b-0] INFO s.s.a.s.s.PrintFunction - Scheme data size: 1
56160 [Thread-18-spout0] INFO s.s.a.s.s.c.KryoScheme - 2016-02- 05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
56161 [Thread-20-b-0] INFO s.s.a.s.s.PrintFunction - Thread-20-b-0.99--value: 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
56162 [Thread-20-b-0] INFO s.s.a.s.s.PrintFunction - Scheme data size: 1
56162 [Thread-20-b-0] INFO s.s.a.s.s.PrintFunction - Thread-20-b-0.99--value: 2016-02-05T17:20:48.282+0800,T(o2KnFxtGB0Tlp8
对不起,这是我的错误。刚刚在 Kryo 反序列化 class 中发现了一个错误,存在一个局部作用域参数,因此它可以在多线程环境中被覆盖。不改变party范围内的参数,代码运行很好。
参考代码见下文:
public class KryoSerializer<T extends BasicEvent> extends Serializer<T> implements Serializable {
private static final long serialVersionUID = -4684340809824908270L;
// It's wrong set
//private T event;
public KryoSerializer(T event) {
this.event = event;
}
@Override
public void write(Kryo kryo, Output output, T event) {
event.write(output);
}
@Override
public T read(Kryo kryo, Input input, Class<T> type) {
T event = new T();
event.read(input);
return event;
}
}