.NET Core 和 Java 之间的 Apache Ignite 二进制序列化不起作用

Apache Ignite Binary Serialization between .NET Core and Java is not working

大家好,我有 运行 问题,我在 .NET 核心中使用一组 Apache Ignite (2.8.1) 服务器节点来创建数据网格并 运行 查询网格通过 Apache ignite java 客户端。我完全没有问题以二进制模式将数据写入网格并通过提供的思考层询问查询。我使用 DBeaver 进行 运行 查询,一切都如预期的那样正常。当我尝试从 java 客户端查询数据时,问题出现了,该客户端抱怨缓存中存在冲突“:缓存配置合并期间的冲突 MY_CAHE”。查找以下错误消息:

Caused by: class org.apache.ignite.spi.IgniteSpiException: Conflicts during configuration merge for cache 'DOTNET_BINARY_CACHE' : 
TRADE conflict: 
keyType is different: local=Apache.Ignite.Core.Cache.Affinity.AffinityKey, received=org.apache.ignite.cache.affinity.AffinityKey
valType is different: local=Servicing.Agent4.Service.Implementation.Misc.Ignite.Trade, received=Servicing.Agent4.Core.Java.Models.Trade

在下面找到我在 .NET 和 Java 中的实现:

  public static class IgniteUtils
    {
        const string CACHE_NAME = "DOTNET_BINARY_CACHE";

        public static IgniteConfiguration DefaultIgniteConfig()
        {
            return new IgniteConfiguration
            {
                BinaryConfiguration = new BinaryConfiguration
                {
                    NameMapper = new BinaryBasicNameMapper { IsSimpleName = true },
                    CompactFooter = true,
                    TypeConfigurations = new[] {
                        new BinaryTypeConfiguration(typeof(Trade)) {
                            Serializer = new IgniteTradeSerializer()
                        }
                    }
                },
                // omit jvm and network options
                IncludedEventTypes = EventType.All,
                Logger = new IgniteNLogLogger(),
                CacheConfiguration = new[]{

                    new CacheConfiguration{
                            Name = CACHE_NAME,
                            CacheMode = CacheMode.Partitioned,
                            Backups = 0,
                            QueryEntities = new[] { new QueryEntity(typeof(AffinityKey), typeof(Trade))}
                }
            }
            };
        }
    }

Apache Ignite 的设置发生在 class:

 public class IgniteService
    {
        public void Start()
        {
           IIgnite _ignite = Ignition.Start(IgniteUtils.DefaultIgniteConfig());        
        
            // Create new cache and configure queries for Trade binary types.
            // Note that there are no such classes defined.
            var cache0 = _ignite.GetOrCreateCache<AffinityKey, Trade>("DOTNET_BINARY_CACHE");

            // Switch to binary mode to work with data in serialized form.
             var cache = cache0.WithKeepBinary<AffinityKey, IBinaryObject>();

            // Clean up caches on all nodes before run.
            cache.Clear();

            // Populate cache with sample data entries.
               IBinary binary = cache.Ignite.GetBinary();

            cache[new AffinityKey(1, 1)] = binary.GetBuilder("TRADE")
                .SetField("Symbol", "James Wilson")
                .SetField("Id", 1)
                .SetField("Login", 123)
                .SetField("SourceId", 1)
                .Build();     
        }

以下域 class:

    public class Trade
    {
        [QuerySqlField(IsIndexed = true)]
        public int Id { set; get; }
        [QueryTextField]
        public string Symbol { set; get; }
        [QuerySqlField]
        public int Login { set; get; }
        [QuerySqlField(IsIndexed = true)]
        public int SourceId { get; set; }

        //omit constructor    
    }

Java客户端代码

public class IgniteScheduler {

    final String CACHE_NAME = "DOTNET_BINARY_CACHE";

    @PostConstruct
    public void start() {
        IgniteConfiguration cfg = new IgniteConfiguration();
        // Enable client mode.
        cfg.setClientMode(true);
        CacheConfiguration<AffinityKey<Integer>, Trade> cacheCfg = new CacheConfiguration<>();
        cacheCfg.setName(CACHE_NAME);
        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
        cacheCfg.setBackups(0);
        cacheCfg.setQueryEntities(Arrays.asList(new QueryEntity(AffinityKey.class, Trade.class)));
        // Setting up an IP Finder to ensure the client can locate the servers.
        TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
        ipFinder.setAddresses(Collections.singletonList("127.0.0.1:47500..47509"));
        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
        cfg.setCacheConfiguration(cacheCfg);
        // Configure Ignite to connect with .NET nodes
        cfg.setBinaryConfiguration(new BinaryConfiguration()
                .setNameMapper(new BinaryBasicNameMapper(true))
                 .setCompactFooter(true)
               BinaryTypeConfiguration(Trade.class.getSimpleName())))             
        );

        // Start Ignite in client mode.
        Ignite ignite = Ignition.start(cfg);
        
      // omit functional code
}

以下域 class:

@Data
public class Trade implements Serializable {

    @QuerySqlField(index = true)
    public int Id;
    @QueryTextField
    public String Symbol;
    @QuerySqlField
    public int Login;
    //@AffinityKeyMapped does not work as well
    @QuerySqlField(index = true)
    public int SourceId;

    // omit constructor
}

调试信息

看起来简单名称编组器不适用于您的情况。您可能使用其他一些配置而不是上面指定的配置来启动节点,因为它似乎定义了简单的名称编组器。

另外,也许只是您不需要定义缓存配置两次。选择一个地方来定义它(我会选择 Java 一侧),无论如何都会为集群的所有节点定义缓存。

  1. 您不需要在所有节点上都提供缓存配置。缓存配置应该在启动缓存的节点上提供一次。

删除 Java 端的 setCacheConfiguration 并在那里简单地调用 ignite.cache(CACHE_NAME)

  1. 异常是由于NameMapper不适用于查询实体,KeyTypeValueType总是使用完整的类型名称(带命名空间)。

通常这不是问题,您可以在查询中省略名称空间:select name from Trade 在您的示例中有效(假设 Trade 的名称为 属性)。

工作示例:https://gist.github.com/ptupitsyn/a2c13f47e19ccfc9c0b548cf4d4fa629