Hazelcast-jet:使用直接查找丰富流时出错
Hazelcast-jet: got error when enriching stream using direct lookup
我正在关注 Doc 以尝试如何通过直接从 IMap 查找来丰富无界流。我有两个地图:
- 产品:
Map<String, Product>
(ProductId 作为键)
- 卖家:
Map<String, Seller>
(SellerId 为键)
Product
和Seller
都很简单类:
public class Product implements DataSerializable {
String productId;
String sellerId;
int price;
...
public class Seller implements DataSerializable {
String sellerId;
int revenue;
...
我有两个数据生成器不断将数据推送到两个地图。两个地图都启用了事件日志。我已验证事件日志工作正常。
我想用Seller
地图丰富Product
地图的流事件。这是我的代码片段:
IMap<String, Seller> sellerIMap = jetClient.getMap(SellerDataGenerator.SELLER_MAP);
StreamSource<Product> productStreamSource = Sources.mapJournal(ProductDataGenerator.PRODUCT_MAP, Util.mapPutEvents(), Util.mapEventNewValue(), START_FROM_CURRENT);
p.drawFrom(productStreamSource)
.withoutTimestamps()
.groupingKey(Product::getSellerId)
.mapUsingIMap(sellerIMap, (product, seller) -> new EnrichedProduct(product, seller))
.drainTo(getSink());
try {
JobConfig jobConfig = new JobConfig();
jobConfig.addClass(TaskSubmitter.class).addClass(Seller.class).addClass(Product.class).addClass(ExtendedProduct.class);
jobConfig.setName(Constants.BASIC_TASK);
Job job = jetClient.newJob(p, jobConfig);
} finally {
jetClient.shutdown();
}
提交作业时,出现以下错误:
com.hazelcast.spi.impl.operationservice.impl.Invocation - [172.31.33.212]:80 [jet] [3.1] Failed asynchronous execution of execution callback: com.hazelcast.util.executor.DelegatingFuture$DelegatingExecutionCallback@77ac0407for call Invocation{op=com.hazelcast.map.impl.operation.GetOperation{serviceName='hz:impl:mapService', identityHash=1939050026, partitionId=70, replicaIndex=0, callId=-37944, invocationTime=1570410704479 (2019-10-07 01:11:44.479), waitTimeout=-1, callTimeout=60000, name=sellerMap}, tryCount=250, tryPauseMillis=500, invokeCount=1, callTimeoutMillis=60000, firstInvocationTimeMs=1570410704479, firstInvocationTime='2019-10-07 01:11:44.479', lastHeartbeatMillis=0, lastHeartbeatTime='1970-01-01 00:00:00.000', target=[172.31.33.212]:80, pendingResponse={VOID}, backupsAcksExpected=0, backupsAcksReceived=0, connection=null}
我试图将一个和两个实例放入我的集群中,但得到了相同的错误消息。我无法弄清楚根本原因是什么。
您的问题似乎是 ClassNotFoundException
,即使您在作业中添加了适当的 类。您存储在 IMap
中的对象独立于您的 Jet 作业而存在,当事件日志源要求它们时,Jet 的 IMap 代码会尝试反序列化它们但失败了,因为 Jet 没有您的域模型 类在它的类路径上。
要继续,请添加一个包含您在 IMap 中使用的 类 的 JAR 到 Jet 的类路径。我们正在寻找可以消除此要求的解决方案。
您在日志输出中没有得到异常堆栈跟踪的原因是由于默认 java.util.logging
设置,当您没有显式添加更灵活的日志记录模块(例如 Log4j)时,您最终得到的设置.
下一个版本的Jet 打包将在这方面进行改进。在此之前,您可以按照以下步骤操作:
进入Jet的分发包lib
目录,将Log4j下载进去:
$ cd lib
$ wget https://repo1.maven.org/maven2/log4j/log4j/1.2.17/log4j-1.2.17.jar
编辑 bin/common.sh
以将模块添加到类路径。在文件末尾有一行
CLASSPATH="$JET_HOME/lib/hazelcast-jet-3.1.jar:$CLASSPATH"
您可以复制此行并将 hazelcast-jet-3.1
替换为 log4j-1.2.17
。
在commons.sh
的末尾有一个构造JAVA_OPTS
变量的多行命令。将 "-Dhazelcast.logging.type=log4j"
和 "-Dlog4j.configuration=file:$JET_HOME/config/log4j.properties"
添加到列表中。
在config
目录下创建文件log4j.properties
:
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %5p [%c{1}] [%t] - %m%n
# Change this level to debug to diagnose failed cluster formation:
log4j.logger.com.hazelcast.internal.cluster=info
log4j.logger.com.hazelcast.jet=info
log4j.rootLogger=info, stdout
我正在关注 Doc 以尝试如何通过直接从 IMap 查找来丰富无界流。我有两个地图:
- 产品:
Map<String, Product>
(ProductId 作为键) - 卖家:
Map<String, Seller>
(SellerId 为键)
Product
和Seller
都很简单类:
public class Product implements DataSerializable {
String productId;
String sellerId;
int price;
...
public class Seller implements DataSerializable {
String sellerId;
int revenue;
...
我有两个数据生成器不断将数据推送到两个地图。两个地图都启用了事件日志。我已验证事件日志工作正常。
我想用Seller
地图丰富Product
地图的流事件。这是我的代码片段:
IMap<String, Seller> sellerIMap = jetClient.getMap(SellerDataGenerator.SELLER_MAP);
StreamSource<Product> productStreamSource = Sources.mapJournal(ProductDataGenerator.PRODUCT_MAP, Util.mapPutEvents(), Util.mapEventNewValue(), START_FROM_CURRENT);
p.drawFrom(productStreamSource)
.withoutTimestamps()
.groupingKey(Product::getSellerId)
.mapUsingIMap(sellerIMap, (product, seller) -> new EnrichedProduct(product, seller))
.drainTo(getSink());
try {
JobConfig jobConfig = new JobConfig();
jobConfig.addClass(TaskSubmitter.class).addClass(Seller.class).addClass(Product.class).addClass(ExtendedProduct.class);
jobConfig.setName(Constants.BASIC_TASK);
Job job = jetClient.newJob(p, jobConfig);
} finally {
jetClient.shutdown();
}
提交作业时,出现以下错误:
com.hazelcast.spi.impl.operationservice.impl.Invocation - [172.31.33.212]:80 [jet] [3.1] Failed asynchronous execution of execution callback: com.hazelcast.util.executor.DelegatingFuture$DelegatingExecutionCallback@77ac0407for call Invocation{op=com.hazelcast.map.impl.operation.GetOperation{serviceName='hz:impl:mapService', identityHash=1939050026, partitionId=70, replicaIndex=0, callId=-37944, invocationTime=1570410704479 (2019-10-07 01:11:44.479), waitTimeout=-1, callTimeout=60000, name=sellerMap}, tryCount=250, tryPauseMillis=500, invokeCount=1, callTimeoutMillis=60000, firstInvocationTimeMs=1570410704479, firstInvocationTime='2019-10-07 01:11:44.479', lastHeartbeatMillis=0, lastHeartbeatTime='1970-01-01 00:00:00.000', target=[172.31.33.212]:80, pendingResponse={VOID}, backupsAcksExpected=0, backupsAcksReceived=0, connection=null}
我试图将一个和两个实例放入我的集群中,但得到了相同的错误消息。我无法弄清楚根本原因是什么。
您的问题似乎是 ClassNotFoundException
,即使您在作业中添加了适当的 类。您存储在 IMap
中的对象独立于您的 Jet 作业而存在,当事件日志源要求它们时,Jet 的 IMap 代码会尝试反序列化它们但失败了,因为 Jet 没有您的域模型 类在它的类路径上。
要继续,请添加一个包含您在 IMap 中使用的 类 的 JAR 到 Jet 的类路径。我们正在寻找可以消除此要求的解决方案。
您在日志输出中没有得到异常堆栈跟踪的原因是由于默认 java.util.logging
设置,当您没有显式添加更灵活的日志记录模块(例如 Log4j)时,您最终得到的设置.
下一个版本的Jet 打包将在这方面进行改进。在此之前,您可以按照以下步骤操作:
进入Jet的分发包
lib
目录,将Log4j下载进去:$ cd lib $ wget https://repo1.maven.org/maven2/log4j/log4j/1.2.17/log4j-1.2.17.jar
编辑
bin/common.sh
以将模块添加到类路径。在文件末尾有一行CLASSPATH="$JET_HOME/lib/hazelcast-jet-3.1.jar:$CLASSPATH"
您可以复制此行并将
hazelcast-jet-3.1
替换为log4j-1.2.17
。在
commons.sh
的末尾有一个构造JAVA_OPTS
变量的多行命令。将"-Dhazelcast.logging.type=log4j"
和"-Dlog4j.configuration=file:$JET_HOME/config/log4j.properties"
添加到列表中。在
config
目录下创建文件log4j.properties
:
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %5p [%c{1}] [%t] - %m%n
# Change this level to debug to diagnose failed cluster formation:
log4j.logger.com.hazelcast.internal.cluster=info
log4j.logger.com.hazelcast.jet=info
log4j.rootLogger=info, stdout