Infinispan 9 - 尝试使用 Hotrod 客户端和 Protobuf 将对象放入缓存时出现服务器异常

Infinispan 9 - Server exception when trying to put an object in cache using Hotrod client and Protobuf

我在将 Infinispan 9.4.0/9.4.1 与 Hot Rod Client 和 Protobuf 协议结合使用时遇到问题,无法使用查询。

这是我的缓存配置:

<cache-container name="clustered" default-cache="RequestIndexed" statistics="false">
    <transport channel="cluster" lock-timeout="1000"/>
    <global-state/>
    <modules>
        <module name="deployment.infinispan-module.jar"/>
    </modules>
    <replicated-cache name="RequestIndexedIndexLockingCache" remote-timeout="3000" statistics-available="false">
        <indexing index="NONE"/>
    </replicated-cache>
    <replicated-cache name="RequestIndexedIndexDataCache" remote-timeout="3000" statistics-available="false">
        <indexing index="NONE"/>
    </replicated-cache>
    <replicated-cache name="RequestIndexedIndexMetadataCache" remote-timeout="3000" statistics-available="false">
        <indexing index="NONE"/>
    </replicated-cache>
    <distributed-cache name="RequestIndexed" remote-timeout="3000" statistics-available="false">
        <memory>
            <object size="20" strategy="LRU"/>
        </memory>
        <compatibility enabled="true"/>
        <file-store path="system-store" passivation="true"/>
        <indexing index="PRIMARY_OWNER">
            <property name="default.indexmanager">
                org.infinispan.query.indexmanager.InfinispanIndexManager
            </property>
            <property name="default.locking_cachename">
                RequestIndexedIndexLockingCache
            </property>
            <property name="default.data_cachename">
                RequestIndexedIndexDataCache
            </property>
            <property name="default.metadata_cachename">
                RequestIndexedIndexMetadataCache
            </property>
        </indexing>
        <state-transfer timeout="60000" chunk-size="1024"/>
    </distributed-cache>
</cache-container>

我的实体

public class EntityDemo implements Serializable {
    /** Class serial version UID. */
    private static final long serialVersionUID = 1L;

    private long id;

    private String name;

    private String value;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public String getKey() {
        return String.valueOf(id);
    }
}

原型文件

package test.infinispan.entity;

option indexed_by_default = false;

message EntityDemo {
    /*@Field*/
    required int64 id = 1;
    /*@Field*/
    required string name = 2;
    optional string value = 3;
}

编组器

public class EntityDemoMarshaller implements Serializable, MessageMarshaller<EntityDemo> {
    /** Class serial version UID. */
    private static final long serialVersionUID = 1L;

    @Override
    public Class<? extends EntityDemo> getJavaClass() {
        return EntityDemo.class;
    }

    @Override
    public String getTypeName() {
        return "test.infinispan.entity.EntityDemo";
    }

    @Override
    public EntityDemo readFrom(ProtoStreamReader reader) throws IOException {
        final EntityDemo ed = new EntityDemo();
        ed.setId(reader.readLong("id"));
        ed.setName(reader.readString("name"));
        ed.setValue(reader.readString("value"));
        return null;
    }

    @Override
    public void writeTo(ProtoStreamWriter writer, EntityDemo ed) throws IOException {
        writer.writeLong("id", ed.getId());
        writer.writeString("name", ed.getName());
        writer.writeString("value", ed.getValue());
    }
}

缓存配置

<distributed-cache name="core.request" remote-timeout="3000" statistics-available="false">
    <memory>
        <object size="20" strategy="LRU"/>
    </memory>
    <file-store path="cache-store" passivation="true"/>
    <indexing index="NONE"/>
    <state-transfer timeout="60000" chunk-size="1024"/>
    <encoding>
        <key media-type="application/x-jboss-marshalling"/>
        <value media-type="application/x-jboss-marshalling"/>
    </encoding>
</distributed-cache>

我与 Infinispan 的联系

ConfigurationBuilder cbi = new ConfigurationBuilder();
cbi.addServers("localhost:11222");
cbi.marshaller(new ProtoStreamMarshaller());
RemoteCacheManager rcmi = new RemoteCacheManager(cbi.build());

SerializationContext sc = ProtoStreamMarshaller.getSerializationContext(rcmi);
FileDescriptorSource fds = new FileDescriptorSource();
fds.addProtoFiles("/proto/EntityDemo.proto");
sc.registerProtoFiles(fds);
sc.registerMarshaller(new EntityDemoMarshaller());

RemoteCache<String, String> metadataCache = rcmi.getCache(ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME);
metadataCache.put("/proto/EntityDemo.proto", readProtoFile("/proto/EntityDemo.proto"));

RemoteCache<String, EntityDemo> rci = rcmi.getCache("RequestIndexed");
rci.clear();
EntityDemo ei = new EntityDemo();
ei.setId(1);
ei.setName("DemoIndexed");
ei.setValue("DemoIndexed");
rci.put(ei.getKey(), ei);

QueryFactory qf = Search.getQueryFactory(rci);
Query q = qf.from(EntityDemo.class)
        .having("id").gte(1)
        .build();
q.list().stream().forEach(v -> System.out.println(v));

我将我的实体放在一个 jar 文件中 (infinispan-module.jar),然后我将该 jar 部署在 Infinispan 中。

当我尝试将对象放入缓存时,服务器出现以下异常

[Server:instance-one] 13:48:17,477 ERROR [stderr] (HotRod-ServerHandler-4-10) Exception in thread "HotRod-ServerHandler-4-10" java.lang.IllegalArgumentException: No marshaller registered for test.infinispan.entity.EntityDemo
[Server:instance-one] 13:48:17,478 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.protostream.impl.SerializationContextImpl.getMarshallerDelegate(SerializationContextImpl.java:276)
[Server:instance-one] 13:48:17,479 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.protostream.WrappedMessage.readMessage(WrappedMessage.java:379)
[Server:instance-one] 13:48:17,479 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.protostream.ProtobufUtil.fromWrappedByteArray(ProtobufUtil.java:165)
[Server:instance-one] 13:48:17,480 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.protostream.ProtobufUtil.fromWrappedByteArray(ProtobufUtil.java:160)
[Server:instance-one] 13:48:17,480 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.query.remote.impl.dataconversion.ProtostreamObjectTranscoder.transcode(ProtostreamObjectTranscoder.java:42)
[Server:instance-one] 13:48:17,481 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.encoding.DataConversion.toStorage(DataConversion.java:214)
[Server:instance-one] 13:48:17,481 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.cache.impl.EncoderCache.valueToStorage(EncoderCache.java:111)
[Server:instance-one] 13:48:17,482 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.cache.impl.EncoderCache.putAsync(EncoderCache.java:460)
[Server:instance-one] 13:48:17,482 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.cache.impl.AbstractDelegatingAdvancedCache.putAsync(AbstractDelegatingAdvancedCache.java:386)
[Server:instance-one] 13:48:17,482 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.server.hotrod.CacheRequestProcessor.putInternal(CacheRequestProcessor.java:194)
[Server:instance-one] 13:48:17,483 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.server.hotrod.CacheRequestProcessor.lambda$put(CacheRequestProcessor.java:187)
[Server:instance-one] 13:48:17,484 ERROR [stderr] (HotRod-ServerHandler-4-10)   at org.infinispan.server.hotrod.CacheRequestProcessor$$Lambda5/1582212530.run(Unknown Source)
[Server:instance-one] 13:48:17,484 ERROR [stderr] (HotRod-ServerHandler-4-10)   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[Server:instance-one] 13:48:17,484 ERROR [stderr] (HotRod-ServerHandler-4-10)   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[Server:instance-one] 13:48:17,484 ERROR [stderr] (HotRod-ServerHandler-4-10)   at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[Server:instance-one] 13:48:17,485 ERROR [stderr] (HotRod-ServerHandler-4-10)   at java.lang.Thread.run(Thread.java:745)

谁能帮我解决这个问题?我一直在寻找解决方案,但找不到任何东西。

更新 2018-11-07

我有一个新问题。

我需要使用带有过滤器和转换器的侦听器。当我只使用过滤器时,一切正常,但是当我添加转换器时,我在服务器端出现异常。我只想接收在 CREATE 自定义事件中放入缓存的对象。

过滤器class

public class RequestEventFilter implements Serializable, CacheEventFilter<String, EntityDemo> {
    /** Class serial version UID. */
    private static final long serialVersionUID = 1L;

    private final long filter;

    public RequestEventFilter(Object[] params) {
        this.filter = (Long) params[0];
    }

    @Override
    public boolean accept(String key, EntityDemo oldValue, Metadata oldMetadata, EntityDemo newValue, Metadata newMetadata, EventType eventType) {
        if (eventType.isCreate()) {
            if (newValue.getId() % filter == 0)
                return true;
        }
        return false;
    }
}

过滤器工厂class

@NamedFactory(name="request-event-filter-factory")
public class RequestEventFilterFactory implements CacheEventFilterFactory {

    public RequestEventFilterFactory() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public CacheEventFilter<String, EntityDemo> getFilter(Object[] params) {
        return new RequestEventFilter(params);
    }
}

转换器class

public class RequestEventConverter implements Serializable, CacheEventConverter<String, EntityDemo, EntityDemo> {
    /** Class serial version UID. */
    private static final long serialVersionUID = 1L;

    public RequestEventConverter() {
    }

    @Override
    public EntityDemo convert(String key, EntityDemo oldValue, Metadata oldMetadata, EntityDemo newValue, Metadata newMetadata, EventType eventType) {
        if (newValue != null)
            return newValue;
        else
            return oldValue;
    }

}

转换器工厂

@NamedFactory(name="request-event-converter-factory")
public class RequestEventConverterFactory implements CacheEventConverterFactory {

    public RequestEventConverterFactory() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public CacheEventConverter<String, EntityDemo, EntityDemo> getConverter(Object[] params) {
        return new RequestEventConverter();
    }
}

我的听众

@ClientListener(filterFactoryName="request-event-filter-factory", converterFactoryName="request-event-converter-factory", includeCurrentState = true, useRawData=false)
private class Listener {

    public Listener() {
    }

    @ClientCacheEntryCreated
    public void entryCreated(ClientCacheEntryCustomEvent<EntityDemo> event) {
        System.out.println("Entry created!");
        System.out.println(event.getEventData());
    }
}

我用这段代码添加监听器

this.cache.addClientListener(new Listener(), new Object[] {2L}), new Object[]{});

当我将新对象放入 infinispan 缓存时,出现以下异常:

[Server:instance-one] 19:08:56,053 ERROR [org.infinispan.interceptors.impl.InvocationContextInterceptor] (HotRod-ServerHandler-4-103) ISPN000136: Error executing command PutKeyValueCommand, writing keys [WrappedByteArray{bytes=[B0x033E023134, hashCode=33250249}]: org.infinispan.commons.CacheListenerException: ISPN000280: Caught exception [java.lang.ClassCastException] while invoking method [public void org.infinispan.server.hotrod.ClientListenerRegistry$BaseClientEventSender.onCacheEvent(org.infinispan.notifications.cachelistener.event.CacheEntryEvent)] on listener instance: org.infinispan.server.hotrod.ClientListenerRegistry$StatefulClientEventSender@72f19221
[Server:instance-one]   at org.infinispan.notifications.impl.AbstractListenerImpl$ListenerInvocationImpl.lambda$invoke(AbstractListenerImpl.java:387)
[Server:instance-one]   at org.infinispan.notifications.impl.AbstractListenerImpl$ListenerInvocationImpl$$Lambda0/2000982649.run(Unknown Source)
[Server:instance-one]   at org.infinispan.util.concurrent.WithinThreadExecutor.execute(WithinThreadExecutor.java:20)
[Server:instance-one]   at org.infinispan.notifications.impl.AbstractListenerImpl$ListenerInvocationImpl.invoke(AbstractListenerImpl.java:404)
[Server:instance-one]   at org.infinispan.notifications.cachelistener.CacheNotifierImpl$BaseCacheEntryListenerInvocation.doRealInvocation(CacheNotifierImpl.java:1689)
[Server:instance-one]   at org.infinispan.notifications.cachelistener.CacheNotifierImpl$ClusteredListenerInvocation.doRealInvocation(CacheNotifierImpl.java:1586)
[Server:instance-one]   at org.infinispan.notifications.cachelistener.CacheNotifierImpl$BaseCacheEntryListenerInvocation.invokeNoChecks(CacheNotifierImpl.java:1680)
[Server:instance-one]   at org.infinispan.notifications.cachelistener.CacheNotifierImpl$BaseCacheEntryListenerInvocation.invoke(CacheNotifierImpl.java:1654)
[Server:instance-one]   at org.infinispan.notifications.cachelistener.CacheNotifierImpl.notifyCacheEntryCreated(CacheNotifierImpl.java:395)
[Server:instance-one]   at org.infinispan.notifications.cachelistener.NotifyHelper.entryCommitted(NotifyHelper.java:46)
[Server:instance-one]   at org.infinispan.interceptors.locking.ClusteringDependentLogic$DistributionLogic.commitSingleEntry(ClusteringDependentLogic.java:576)
[Server:instance-one]   at org.infinispan.interceptors.locking.ClusteringDependentLogic$AbstractClusteringDependentLogic.commitEntry(ClusteringDependentLogic.java:190)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.commitContextEntry(EntryWrappingInterceptor.java:584)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.commitEntryIfNeeded(EntryWrappingInterceptor.java:813)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.commitContextEntries(EntryWrappingInterceptor.java:566)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.applyChanges(EntryWrappingInterceptor.java:617)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.applyAndFixVersion(EntryWrappingInterceptor.java:678)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor$$Lambda2/1400288933.accept(Unknown Source)
[Server:instance-one]   at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextThenAccept(BaseAsyncInterceptor.java:105)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.setSkipRemoteGetsAndInvokeNextForDataCommand(EntryWrappingInterceptor.java:672)
[Server:instance-one]   at org.infinispan.interceptors.impl.EntryWrappingInterceptor.visitPutKeyValueCommand(EntryWrappingInterceptor.java:302)
[Server:instance-one]   at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:68)
[Server:instance-one]   at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextAndFinally(BaseAsyncInterceptor.java:150)
[Server:instance-one]   at org.infinispan.interceptors.locking.AbstractLockingInterceptor.lambda$nonTxLockAndInvokeNext(AbstractLockingInterceptor.java:299)
[Server:instance-one]   at org.infinispan.interceptors.locking.AbstractLockingInterceptor$$Lambda3/1667920547.apply(Unknown Source)
[Server:instance-one]   at org.infinispan.interceptors.SyncInvocationStage.addCallback(SyncInvocationStage.java:42)
[Server:instance-one]   at org.infinispan.interceptors.InvocationStage.andHandle(InvocationStage.java:65)
[Server:instance-one]   at org.infinispan.interceptors.locking.AbstractLockingInterceptor.nonTxLockAndInvokeNext(AbstractLockingInterceptor.java:294)
[Server:instance-one]   at org.infinispan.interceptors.locking.AbstractLockingInterceptor.visitNonTxDataWriteCommand(AbstractLockingInterceptor.java:126)
[Server:instance-one]   at org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor.visitDataWriteCommand(NonTransactionalLockingInterceptor.java:40)
[Server:instance-one]   at org.infinispan.interceptors.locking.AbstractLockingInterceptor.visitPutKeyValueCommand(AbstractLockingInterceptor.java:82)
[Server:instance-one]   at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:68)
[Server:instance-one]   at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextAndHandle(BaseAsyncInterceptor.java:183)
[Server:instance-one]   at org.infinispan.statetransfer.StateTransferInterceptor.handleNonTxWriteCommand(StateTransferInterceptor.java:309)
[Server:instance-one]   at org.infinispan.statetransfer.StateTransferInterceptor.handleWriteCommand(StateTransferInterceptor.java:252)
[Server:instance-one]   at org.infinispan.statetransfer.StateTransferInterceptor.visitPutKeyValueCommand(StateTransferInterceptor.java:96)
[Server:instance-one]   at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:68)
[Server:instance-one]   at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNext(BaseAsyncInterceptor.java:54)
[Server:instance-one]   at org.infinispan.interceptors.DDAsyncInterceptor.handleDefault(DDAsyncInterceptor.java:54)
[Server:instance-one]   at org.infinispan.interceptors.DDAsyncInterceptor.visitPutKeyValueCommand(DDAsyncInterceptor.java:60)
[Server:instance-one]   at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:68)
[Server:instance-one]   at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextAndExceptionally(BaseAsyncInterceptor.java:123)
[Server:instance-one]   at org.infinispan.interceptors.impl.InvocationContextInterceptor.visitCommand(InvocationContextInterceptor.java:90)
[Server:instance-one]   at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNext(BaseAsyncInterceptor.java:56)
[Server:instance-one]   at org.infinispan.interceptors.DDAsyncInterceptor.handleDefault(DDAsyncInterceptor.java:54)
[Server:instance-one]   at org.infinispan.interceptors.DDAsyncInterceptor.visitPutKeyValueCommand(DDAsyncInterceptor.java:60)
[Server:instance-one]   at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:68)
[Server:instance-one]   at org.infinispan.interceptors.DDAsyncInterceptor.visitCommand(DDAsyncInterceptor.java:50)
[Server:instance-one]   at org.infinispan.interceptors.impl.AsyncInterceptorChainImpl.invokeAsync(AsyncInterceptorChainImpl.java:234)
[Server:instance-one]   at org.infinispan.cache.impl.CacheImpl.executeCommandAndCommitIfNeededAsync(CacheImpl.java:1930)
[Server:instance-one]   at org.infinispan.cache.impl.CacheImpl.putAsync(CacheImpl.java:1571)
[Server:instance-one]   at org.infinispan.cache.impl.DecoratedCache.putAsync(DecoratedCache.java:690)
[Server:instance-one]   at org.infinispan.cache.impl.AbstractDelegatingAdvancedCache.putAsync(AbstractDelegatingAdvancedCache.java:386)
[Server:instance-one]   at org.infinispan.cache.impl.EncoderCache.putAsync(EncoderCache.java:460)
[Server:instance-one]   at org.infinispan.cache.impl.AbstractDelegatingAdvancedCache.putAsync(AbstractDelegatingAdvancedCache.java:386)
[Server:instance-one]   at org.infinispan.server.hotrod.CacheRequestProcessor.putInternal(CacheRequestProcessor.java:194)
[Server:instance-one]   at org.infinispan.server.hotrod.CacheRequestProcessor.lambda$put(CacheRequestProcessor.java:187)
[Server:instance-one]   at org.infinispan.server.hotrod.CacheRequestProcessor$$Lambda4/240795618.run(Unknown Source)
[Server:instance-one]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[Server:instance-one]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[Server:instance-one]   at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[Server:instance-one]   at java.lang.Thread.run(Thread.java:745)
[Server:instance-one] Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
[Server:instance-one]   at org.infinispan.server.hotrod.ClientListenerRegistry$BaseClientEventSender.onCacheEvent(ClientListenerRegistry.java:360)
[Server:instance-one]   at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
[Server:instance-one]   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[Server:instance-one]   at java.lang.reflect.Method.invoke(Method.java:497)
[Server:instance-one]   at org.infinispan.notifications.impl.AbstractListenerImpl$ListenerInvocationImpl.lambda$invoke(AbstractListenerImpl.java:382)
[Server:instance-one]   ... 61 more

我搜索了 infinispan 文档,但找不到任何内容。

为了能够通过 Hot Rod 使用查询,您不需要兼容模式,我可以看到您的缓存有 <compatibility enabled="true"/>。查询也不需要在服务器中部署实体。

关于兼容模式,它在 Infinispan 9.4.0 中被弃用,替代方案是 http://infinispan.org/docs/stable/user_guide/user_guide.html#endpoint_interop

最后,如果你真的需要在缓存中存储未编组的对象,你需要在服务器中部署实体和编组器。要部署额外的 protobuf 编组器,请参阅:

http://infinispan.org/docs/stable/user_guide/user_guide.html#protostream_deployment

要部署实体,请参阅:

http://infinispan.org/docs/stable/user_guide/user_guide.html#entities_deploy

您不需要使用兼容模式来进行查询,正如另一个答案已经提到的那样。所以我会先尝试没有兼容模式。在没有兼容模式的情况下,您也不再需要在服务器中部署实体和编组器,因此一切都得到了简化,并且出现错误的可能性也较小。我已经看到您在 EntityDemoMarshaller.readFrom 中遇到了一个小问题。它只是 returns null 而不是返回未编组的实体,因此无论如何都有可能毁掉整个事情。如果问题仍然失败,请修复并使用新的堆栈跟踪更新此问题;如果不再是问题,请关闭它。

如果您的用例出于任何原因绝对需要兼容模式,那么请阅读用户指南中的转码并使用它,因为兼容模式已被弃用并被转码所取代。