apache ignite datastreamer 如何将数据设置到 ignitefuture 中?
apache ignite datastreamer how to set data into ignitefuture?
我正在 apache ignite 中创建批处理数据流,需要控制接收数据后发生的事情。
我的批次有一个结构:
public class Batch implements Binarylizable, Serializable {
private String eventKey;
private byte[] bytes;
etc..
然后我尝试流式传输我的数据:
try (IgniteDataStreamer<Integer, Batch> streamer = serviceGrid.getIgnite().dataStreamer(cacheName);
StreamBatcher batcher = StreamBatcherFactory.create(event) ){
streamer.receiver(StreamTransformer.from(new BatchDataProcessor(event)));
streamer.autoFlushFrequency(1000);
streamer.allowOverwrite(true);
statusService.updateStatus(event.getKey(), StatusType.EXECUTING);
int counter = 0;
Batch batch = null;
IgniteFuture<?> future = null;
while ((batch = batcher.batch()) != null) {
future = streamer.addData(counter++, batch);
}
Object getted = future.get();
仅供测试使用,让我们只获取最后一个未来,并尝试分析此对象。在上面的代码中,我使用的是 BatchDataProcessor,它看起来像这样:
public class BatchDataProcessor implements CacheEntryProcessor<Integer, Batch, Object> {
private final Event event;
private final String eventKey;
public BatchDataProcessor(Event event) {
this.event = event;
this.eventKey = event.getKey();
}
@Override
public Object process(MutableEntry<Integer, Batch> mutableEntry, Object... objects) throws EntryProcessorException {
Node node = NodeIgniter.node(Ignition.localIgnite().cluster().localNode().id());
ServiceGridContainer container = (ServiceGridContainer) node.getEnvironmentContainer().getContainerObject(ServiceGridContainer.class);
ProcessMarshaller marshaller = (ProcessMarshaller) container.getService(ProcessMarshaller.class);
LocalProcess localProcess = marshaller.intoProccessing(event.getLambdaExecutionKey());
try {
localProcess.addBatch(mutableEntry);
} catch (IOException e) {
e.printStackTrace();
} finally {
return new String("111");
}
}
}
所以在 localProcess.addBatch(mutableEntry) 之后我想发回关于这个特定批次的状态的信息,所以我认为我应该在 IgniteFuture 对象中这样做,但我没有找到任何信息如何控制在 addData 函数中接收到的未来对象。
任何人都可以帮助理解,我在哪里可以控制在 addData 函数中接收的未来或以其他方式实现对流批处理的回调?
当您执行 StreamTransformer.from()
时,您将放弃 BatchDataProcessor
的结果,因为
for (Map.Entry<K, V> entry : entries)
cache.invoke(entry.getKey(), this, entry.getValue());
// ^ result of cache.invoke() is discarded here
DataStreamer
用于 one-directional 数据流。据我所知,它不应该 return 值。
如果你依赖cache.invoke()
的结果,我建议直接调用它而不是依赖DataStreamer
。
顺便说一句,小心fut.get()
。你应该先做 dataStreamer.flush()
,否则 DataStreamer
的期货将无限期等待。
我正在 apache ignite 中创建批处理数据流,需要控制接收数据后发生的事情。 我的批次有一个结构:
public class Batch implements Binarylizable, Serializable {
private String eventKey;
private byte[] bytes;
etc..
然后我尝试流式传输我的数据:
try (IgniteDataStreamer<Integer, Batch> streamer = serviceGrid.getIgnite().dataStreamer(cacheName);
StreamBatcher batcher = StreamBatcherFactory.create(event) ){
streamer.receiver(StreamTransformer.from(new BatchDataProcessor(event)));
streamer.autoFlushFrequency(1000);
streamer.allowOverwrite(true);
statusService.updateStatus(event.getKey(), StatusType.EXECUTING);
int counter = 0;
Batch batch = null;
IgniteFuture<?> future = null;
while ((batch = batcher.batch()) != null) {
future = streamer.addData(counter++, batch);
}
Object getted = future.get();
仅供测试使用,让我们只获取最后一个未来,并尝试分析此对象。在上面的代码中,我使用的是 BatchDataProcessor,它看起来像这样:
public class BatchDataProcessor implements CacheEntryProcessor<Integer, Batch, Object> {
private final Event event;
private final String eventKey;
public BatchDataProcessor(Event event) {
this.event = event;
this.eventKey = event.getKey();
}
@Override
public Object process(MutableEntry<Integer, Batch> mutableEntry, Object... objects) throws EntryProcessorException {
Node node = NodeIgniter.node(Ignition.localIgnite().cluster().localNode().id());
ServiceGridContainer container = (ServiceGridContainer) node.getEnvironmentContainer().getContainerObject(ServiceGridContainer.class);
ProcessMarshaller marshaller = (ProcessMarshaller) container.getService(ProcessMarshaller.class);
LocalProcess localProcess = marshaller.intoProccessing(event.getLambdaExecutionKey());
try {
localProcess.addBatch(mutableEntry);
} catch (IOException e) {
e.printStackTrace();
} finally {
return new String("111");
}
}
}
所以在 localProcess.addBatch(mutableEntry) 之后我想发回关于这个特定批次的状态的信息,所以我认为我应该在 IgniteFuture 对象中这样做,但我没有找到任何信息如何控制在 addData 函数中接收到的未来对象。
任何人都可以帮助理解,我在哪里可以控制在 addData 函数中接收的未来或以其他方式实现对流批处理的回调?
当您执行 StreamTransformer.from()
时,您将放弃 BatchDataProcessor
的结果,因为
for (Map.Entry<K, V> entry : entries)
cache.invoke(entry.getKey(), this, entry.getValue());
// ^ result of cache.invoke() is discarded here
DataStreamer
用于 one-directional 数据流。据我所知,它不应该 return 值。
如果你依赖cache.invoke()
的结果,我建议直接调用它而不是依赖DataStreamer
。
顺便说一句,小心fut.get()
。你应该先做 dataStreamer.flush()
,否则 DataStreamer
的期货将无限期等待。