Hazelcast-jet:使用直接查找丰富流时出错

Hazelcast-jet: got error when enriching stream using direct lookup

我正在关注 Doc 以尝试如何通过直接从 IMap 查找来丰富无界流。我有两个地图:

  1. 产品:Map<String, Product>(ProductId 作为键)
  2. 卖家:Map<String, Seller>(SellerId 为键)

ProductSeller都很简单类:

public class Product implements DataSerializable {
    String productId;
    String sellerId;
    int price;
...
public class Seller implements DataSerializable {
    String sellerId;
    int revenue;
...

我有两个数据生成器不断将数据推送到两个地图。两个地图都启用了事件日志。我已验证事件日志工作正常。

我想用Seller地图丰富Product地图的流事件。这是我的代码片段:

IMap<String, Seller> sellerIMap = jetClient.getMap(SellerDataGenerator.SELLER_MAP);
StreamSource<Product> productStreamSource = Sources.mapJournal(ProductDataGenerator.PRODUCT_MAP, Util.mapPutEvents(), Util.mapEventNewValue(), START_FROM_CURRENT);
p.drawFrom(productStreamSource)
            .withoutTimestamps()
            .groupingKey(Product::getSellerId)
            .mapUsingIMap(sellerIMap, (product, seller) -> new EnrichedProduct(product, seller))
            .drainTo(getSink());
try {
        JobConfig jobConfig = new JobConfig();
        jobConfig.addClass(TaskSubmitter.class).addClass(Seller.class).addClass(Product.class).addClass(ExtendedProduct.class);
        jobConfig.setName(Constants.BASIC_TASK);
        Job job = jetClient.newJob(p, jobConfig);
    } finally {
        jetClient.shutdown();
    }

提交作业时,出现以下错误:

com.hazelcast.spi.impl.operationservice.impl.Invocation - [172.31.33.212]:80 [jet] [3.1] Failed asynchronous execution of execution callback: com.hazelcast.util.executor.DelegatingFuture$DelegatingExecutionCallback@77ac0407for call Invocation{op=com.hazelcast.map.impl.operation.GetOperation{serviceName='hz:impl:mapService', identityHash=1939050026, partitionId=70, replicaIndex=0, callId=-37944, invocationTime=1570410704479 (2019-10-07 01:11:44.479), waitTimeout=-1, callTimeout=60000, name=sellerMap}, tryCount=250, tryPauseMillis=500, invokeCount=1, callTimeoutMillis=60000, firstInvocationTimeMs=1570410704479, firstInvocationTime='2019-10-07 01:11:44.479', lastHeartbeatMillis=0, lastHeartbeatTime='1970-01-01 00:00:00.000', target=[172.31.33.212]:80, pendingResponse={VOID}, backupsAcksExpected=0, backupsAcksReceived=0, connection=null}

我试图将一个和两个实例放入我的集群中,但得到了相同的错误消息。我无法弄清楚根本原因是什么。

您的问题似乎是 ClassNotFoundException,即使您在作业中添加了适当的 类。您存储在 IMap 中的对象独立于您的 Jet 作业而存在,当事件日志源要求它们时,Jet 的 IMap 代码会尝试反序列化它们但失败了,因为 Jet 没有您的域模型 类在它的类路径上。

要继续,请添加一个包含您在 IMap 中使用的 类 的 JAR 到 Jet 的类路径。我们正在寻找可以消除此要求的解决方案。

您在日志输出中没有得到异常堆栈跟踪的原因是由于默认 java.util.logging 设置,当您没有显式添加更灵活的日志记录模块(例如 Log4j)时,您最终得到的设置.

下一个版本的Jet 打包将在这方面进行改进。在此之前,您可以按照以下步骤操作:

  1. 进入Jet的分发包lib目录,将Log4j下载进去:

    $ cd lib
    $ wget https://repo1.maven.org/maven2/log4j/log4j/1.2.17/log4j-1.2.17.jar
    
  2. 编辑 bin/common.sh 以将模块添加到类路径。在文件末尾有一行

    CLASSPATH="$JET_HOME/lib/hazelcast-jet-3.1.jar:$CLASSPATH"
    

    您可以复制此行并将 hazelcast-jet-3.1 替换为 log4j-1.2.17

  3. commons.sh的末尾有一个构造JAVA_OPTS变量的多行命令。将 "-Dhazelcast.logging.type=log4j""-Dlog4j.configuration=file:$JET_HOME/config/log4j.properties" 添加到列表中。

  4. config目录下创建文件log4j.properties

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %5p [%c{1}] [%t] - %m%n

# Change this level to debug to diagnose failed cluster formation:
log4j.logger.com.hazelcast.internal.cluster=info

log4j.logger.com.hazelcast.jet=info
log4j.rootLogger=info, stdout