风暴螺栓比其父级执行更多
Storm bolt executes more than its parent
我有一个包含 KafkaSpout 和 2 个螺栓的拓扑。
BoltParseJsonInput 及其执行方法:
public void execute(Tuple input) {
// TODO Auto-generated method stub
String data = input.getString(4);
js = new JSONObject(data);
String userId = js.getString("userId");
String timestamp = js.getString("timestamp");
counter++;
System.out.println(counter);
collector.emit(input, new Values(userId, timestamp));
collector.ack(input);
}
BoltInsertRedis 及其执行方法
public void execute(Tuple input) {
// TODO Auto-generated method stub
String userId = input.getStringByField("userId");
int timestamp = 0;
try {
timestamp = convertTimestampToEpoch(input.getStringByField("timestamp"));
} catch (ParseException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
String timestep = this.prefix + timestamp/10;
String curTimestamp = jedis.hget(timestep, userId);
if(curTimestamp == null || Integer.parseInt(curTimestamp) < timestamp) {
jedis.hset(timestep, userId, Integer.toString(timestamp));
}
collector.ack(input);
}
BoltInsertRedis 从 BoltParseJsonInput 获取输入
builder.setBolt("ParseJsonInput-Bolt", new BoltParseJsonInput()).shuffleGrouping("Kafka-Spout");
builder.setBolt("BoltRedisUserLastActive-Bolt", new BoltRedisUserLastActive()).shuffleGrouping("ParseJsonInput-Bolt");
但是当我将这个拓扑提交到Storm时,BoltInsertRedis执行的比BoltParseJsonInput多
你能给我解释一下这里的问题是什么吗?
我发现我的 ParseJsonBolt 在消息 25700 处发生了异常,并且它一直在重放执行。当我尝试捕获时,它运行良好
我有一个包含 KafkaSpout 和 2 个螺栓的拓扑。
BoltParseJsonInput 及其执行方法:
public void execute(Tuple input) {
// TODO Auto-generated method stub
String data = input.getString(4);
js = new JSONObject(data);
String userId = js.getString("userId");
String timestamp = js.getString("timestamp");
counter++;
System.out.println(counter);
collector.emit(input, new Values(userId, timestamp));
collector.ack(input);
}
BoltInsertRedis 及其执行方法
public void execute(Tuple input) {
// TODO Auto-generated method stub
String userId = input.getStringByField("userId");
int timestamp = 0;
try {
timestamp = convertTimestampToEpoch(input.getStringByField("timestamp"));
} catch (ParseException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
String timestep = this.prefix + timestamp/10;
String curTimestamp = jedis.hget(timestep, userId);
if(curTimestamp == null || Integer.parseInt(curTimestamp) < timestamp) {
jedis.hset(timestep, userId, Integer.toString(timestamp));
}
collector.ack(input);
}
BoltInsertRedis 从 BoltParseJsonInput 获取输入
builder.setBolt("ParseJsonInput-Bolt", new BoltParseJsonInput()).shuffleGrouping("Kafka-Spout");
builder.setBolt("BoltRedisUserLastActive-Bolt", new BoltRedisUserLastActive()).shuffleGrouping("ParseJsonInput-Bolt");
但是当我将这个拓扑提交到Storm时,BoltInsertRedis执行的比BoltParseJsonInput多
你能给我解释一下这里的问题是什么吗?
我发现我的 ParseJsonBolt 在消息 25700 处发生了异常,并且它一直在重放执行。当我尝试捕获时,它运行良好