Flink Streaming RichSource 提前停止
Flink Streaming RichSource early stops
它 运行s 处理时间和使用广播状态。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
BroadcastStream<List<TableOperations>> broadcastOperationsState = env
.addSource(new LoadCassandraOperations(10000L, cassandraHost, cassandraPort)).broadcast(descriptor);
SingleOutputStreamOperator<InternalVariableValue> stream =
env.addSource(new SourceMillisInternalVariableValue(5000L));
SingleOutputStreamOperator<InternalVariableOperation> streamProcessed =
stream
.keyBy(InternalVariableValue::getUuid)
.connect(broadcastOperationsState)
.process(new AddOperationInfo())
;
streamProcessed.print();
SourceMillisIntervalVariableValues 每 5 秒创建一个事件。事件存储在静态集合中。 运行 方法看起来像:
public class SourceMillisInternalVariableValue extends RichSourceFunction<InternalVariableValue>{
private boolean running;
long millis;
public SourceMillisInternalVariableValue(long millis) {
super();
this.millis = millis;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
running = true;
}
@Override
public void cancel() {
running = false;
}
@Override
public void run(SourceContext<InternalVariableValue> ctx) throws Exception {
//Espera inicial
Thread.sleep(1500);
PojoVariableValues[] pojoData =
new PojoVariableValues[]{
new PojoVariableValues("id1", "1"),
new PojoVariableValues("id2", "2"),
....
....
new PojoVariableValues("id21", "21")
};
int cont = 0;
while (cont<pojoData.length) {
System.out.println("Iteration "+cont+" "+pojoData.length);
ctx.collect(generateVar(pojoData[0+cont].getUUID(), pojoData[0+cont].getValue()));
ctx.collect(generateVar(pojoData[1+cont].getUUID(), pojoData[1+cont].getValue()));
ctx.collect(generateVar(pojoData[2+cont].getUUID(), pojoData[2+cont].getValue()));
cont = cont +3;
Thread.sleep(millis);
}
}
private InternalVariableValue generateVar(String uuid, String value)
{
return InternalVariableValueMessage.InternalVariableValue.newBuilder()
.setUuid(uuid)
.setTimestamp(new Date().getTime()).setValue(value).setKeyspace("nest").build();
}
class PojoVariableValues {
private String UUID;
private String Value;
public PojoVariableValues(String uUID, String value) {
super();
UUID = uUID;
Value = value;
}
public String getUUID() {
return UUID;
}
public void setUUID(String uUID) {
UUID = uUID;
}
public String getValue() {
return Value;
}
public void setValue(String value) {
Value = value;
}
}
}
LoadCassandraOperations 每 10 秒发出一次事件。它工作正常。
当我 运行 此代码时,SourceMillisIntervalVariableValues 在第一次迭代中停止,仅发出三个事件。如果我评论 process 函数,两个来源都正常工作,但如果我 运行 process ,来源被取消...
我认为源发出了所有事件(恰好 21 个),并且所有事件都在聚合函数中处理。如果我 运行 这段代码,源代码中的 while 循环只完成一次迭代。
有什么想法吗?
谢谢你。干杯
编辑:
重要。此代码用于探索处理时间和广播功能。我知道我没有使用资源中的最佳实践。谢谢
编辑 2:
当我尝试 运行 处理函数时,问题就开始了。
已解决!!
问题是我尝试使用 TestContainer 运行 它,但我看不到任何日志。
我运行它有一个简单的主要方法,我可以看到一些代码错误(就像评论中的评论一样。Tnks !!!)。
它 运行s 处理时间和使用广播状态。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
BroadcastStream<List<TableOperations>> broadcastOperationsState = env
.addSource(new LoadCassandraOperations(10000L, cassandraHost, cassandraPort)).broadcast(descriptor);
SingleOutputStreamOperator<InternalVariableValue> stream =
env.addSource(new SourceMillisInternalVariableValue(5000L));
SingleOutputStreamOperator<InternalVariableOperation> streamProcessed =
stream
.keyBy(InternalVariableValue::getUuid)
.connect(broadcastOperationsState)
.process(new AddOperationInfo())
;
streamProcessed.print();
SourceMillisIntervalVariableValues 每 5 秒创建一个事件。事件存储在静态集合中。 运行 方法看起来像:
public class SourceMillisInternalVariableValue extends RichSourceFunction<InternalVariableValue>{
private boolean running;
long millis;
public SourceMillisInternalVariableValue(long millis) {
super();
this.millis = millis;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
running = true;
}
@Override
public void cancel() {
running = false;
}
@Override
public void run(SourceContext<InternalVariableValue> ctx) throws Exception {
//Espera inicial
Thread.sleep(1500);
PojoVariableValues[] pojoData =
new PojoVariableValues[]{
new PojoVariableValues("id1", "1"),
new PojoVariableValues("id2", "2"),
....
....
new PojoVariableValues("id21", "21")
};
int cont = 0;
while (cont<pojoData.length) {
System.out.println("Iteration "+cont+" "+pojoData.length);
ctx.collect(generateVar(pojoData[0+cont].getUUID(), pojoData[0+cont].getValue()));
ctx.collect(generateVar(pojoData[1+cont].getUUID(), pojoData[1+cont].getValue()));
ctx.collect(generateVar(pojoData[2+cont].getUUID(), pojoData[2+cont].getValue()));
cont = cont +3;
Thread.sleep(millis);
}
}
private InternalVariableValue generateVar(String uuid, String value)
{
return InternalVariableValueMessage.InternalVariableValue.newBuilder()
.setUuid(uuid)
.setTimestamp(new Date().getTime()).setValue(value).setKeyspace("nest").build();
}
class PojoVariableValues {
private String UUID;
private String Value;
public PojoVariableValues(String uUID, String value) {
super();
UUID = uUID;
Value = value;
}
public String getUUID() {
return UUID;
}
public void setUUID(String uUID) {
UUID = uUID;
}
public String getValue() {
return Value;
}
public void setValue(String value) {
Value = value;
}
}
}
LoadCassandraOperations 每 10 秒发出一次事件。它工作正常。
当我 运行 此代码时,SourceMillisIntervalVariableValues 在第一次迭代中停止,仅发出三个事件。如果我评论 process 函数,两个来源都正常工作,但如果我 运行 process ,来源被取消...
我认为源发出了所有事件(恰好 21 个),并且所有事件都在聚合函数中处理。如果我 运行 这段代码,源代码中的 while 循环只完成一次迭代。
有什么想法吗?
谢谢你。干杯
编辑:
重要。此代码用于探索处理时间和广播功能。我知道我没有使用资源中的最佳实践。谢谢
编辑 2: 当我尝试 运行 处理函数时,问题就开始了。
已解决!!
问题是我尝试使用 TestContainer 运行 它,但我看不到任何日志。
我运行它有一个简单的主要方法,我可以看到一些代码错误(就像评论中的评论一样。Tnks !!!)。