Apache Ignite SQL Transaction Management

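I am looking into transaction management in Apache Ignite and wrote a simple script that does the following:

  1. Creates a table
  2. Gets the row count
  3. Begins a transaction
  4. Inserts a row
  5. Gets the row count
  6. Rolls back the transaction
  7. Gets the row count

Steps 1 to 5 work as expected, but step 6 does not roll back the row inserted in step 4, and the row count in step 7 is still 1.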

I know the cache/schema needs some configuration, and I wonder whether I got it wrong. The documentation suggests that I need to use "TRANSACTIONAL_SNAPSHOT" as the atomicity mode.

The "cluster" consists of a single node.

Apache Ignite configuration file

    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd">
        <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
            <property name="sqlSchemas">
                <list>
                    <value>SAMPLE</value>
                </list>
            </property>

            <!-- Enabling transactions for the "SAMPLE" cache/schema   -->
            <!-- See: https://apacheignite.readme.io/docs/transactions -->
            <property name="cacheConfiguration">
                <list>
                    <bean class="org.apache.ignite.configuration.CacheConfiguration">
                        <property name="name" value="SAMPLE"/>
                        <property name="atomicityMode" value="TRANSACTIONAL_SNAPSHOT"/>
                        <property name="backups" value="1"/>
                    </bean>
                </list>
            </property>

            <!-- Enabling Apache Ignite Persistent Store. -->
            <property name="dataStorageConfiguration">
                <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                    <property name="defaultDataRegionConfiguration">
                        <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                            <property name="persistenceEnabled" value="true"/>
                        </bean>
                    </property>
                </bean>
            </property>

            <!-- Explicitly configure TCP discovery SPI to provide a list of initial nodes. -->
            <property name="discoverySpi">
                <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                    <property name="ipFinder">
                        <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                        <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> -->
                            <property name="addresses">
                                <list>
                                    <!-- In distributed environment, replace with actual host IP address. -->
                                    <value>10.0.1.217:47500</value>
                                </list>
                            </property>
                        </bean>
                    </property>
                </bean>
            </property>
        </bean>
    </beans>

Test script

    from pyignite import Client


    def get_rowcount(client):
        result = client.sql('SELECT COUNT(*) FROM SAMPLE.T1')
        row_count = next(result)[0]
        return row_count


    def main():
        client = Client()
        client.connect('ignite-host', 10800)
        client.sql('CREATE TABLE IF NOT EXISTS SAMPLE.T1 (k int, v varchar, PRIMARY key(k))')
        client.sql('DELETE SAMPLE.T1')
        start_row_count = get_rowcount(client)        # Expected row count = 0

        client.sql("BEGIN TRANSACTION")
        client.sql("INSERT INTO SAMPLE.T1 (k, v) values (1, 'Line 1')")
        post_insert_row_count = get_rowcount(client)  # Expected row count = 1

        client.sql("ROLLBACK TRANSACTION")
        end_row_count = get_rowcount(client)          # Expected row count = 0, but is 1

        client.close()
        print('Start row count      : {}'.format(start_row_count))
        print('Post insert row count: {}'.format(post_insert_row_count))
        print('End row count        : {}'.format(end_row_count))


    if __name__ == '__main__':
        main()

Script output

    Start row count      : 0
    Post insert row count: 1
    End row count        : 1

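You declared your cache incorrectly, so the table still uses the default ATOMIC atomicity mode (you need to declare a cache template instead). CREATE TABLE creates a new cache for the table; it does not pick up the settings of the cache named SAMPLE.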
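As a rough sketch of what that could look like from the script's side: the atomicity mode can be requested in the WITH clause of CREATE TABLE, either directly through the ATOMICITY parameter or through a cache template named in the TEMPLATE parameter. The helper names below (build_create_table_sql, create_table) are made up for illustration, and the template name passed in is an assumption: it would have to match a template declared in the server configuration (a CacheConfiguration whose name ends with an asterisk).

```python
def build_create_table_sql(table="SAMPLE.T1",
                           atomicity="TRANSACTIONAL_SNAPSHOT",
                           template=None):
    """Build a CREATE TABLE statement that sets the atomicity mode explicitly.

    `template` is optional and hypothetical: it must name a cache template
    that is already declared on the server.
    """
    params = []
    if template is not None:
        params.append("TEMPLATE={}".format(template))
    params.append("ATOMICITY={}".format(atomicity))
    return (
        "CREATE TABLE IF NOT EXISTS {} (k int, v varchar, PRIMARY KEY (k)) "
        'WITH "{}"'.format(table, ",".join(params))
    )


def create_table(host="ignite-host", port=10800):
    """Run the statement through the thin client, as the question's script does."""
    # Imported here so that building the statement needs no third-party package.
    from pyignite import Client

    client = Client()
    client.connect(host, port)
    try:
        client.sql(build_create_table_sql())
    finally:
        client.close()
```

A table that already exists keeps the atomicity mode it was created with, so it would have to be dropped and recreated for the new setting to apply.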

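The main problem, however, is that the thin client does not currently support transactions: https://issues.apache.org/jira/browse/IGNITE-9410

I assume it simply auto-commits every statement for the time being. I recommend using Python JDBC bindings with the Ignite Thin JDBC driver.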
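A minimal sketch of the JDBC route, assuming the third-party JayDeBeApi package as the Python JDBC binding (the answer does not name one) and a table created with TRANSACTIONAL_SNAPSHOT atomicity. The jar path is a placeholder for wherever the ignite-core jar lives, and the helper names are invented for this example. The rollback check itself only relies on the standard DB-API connection interface.

```python
def count_rows(conn, table):
    """Return SELECT COUNT(*) for `table` on a DB-API connection."""
    cur = conn.cursor()
    try:
        cur.execute("SELECT COUNT(*) FROM {}".format(table))
        return cur.fetchone()[0]
    finally:
        cur.close()


def insert_then_rollback(conn, table="SAMPLE.T1"):
    """Insert one row, roll back, and report the counts before/during/after."""
    before = count_rows(conn, table)
    cur = conn.cursor()
    try:
        cur.execute("INSERT INTO {} (k, v) VALUES (1, 'Line 1')".format(table))
    finally:
        cur.close()
    during = count_rows(conn, table)
    conn.rollback()
    after = count_rows(conn, table)
    return before, during, after


def connect_ignite(jar_path, host="ignite-host", port=10800):
    """Open a JDBC connection through the Ignite thin JDBC driver.

    `jar_path` is a placeholder: point it at the ignite-core jar.
    """
    import jaydebeapi  # assumed binding, installed separately

    conn = jaydebeapi.connect(
        "org.apache.ignite.IgniteJdbcThinDriver",
        "jdbc:ignite:thin://{}:{}".format(host, port),
        jars=jar_path,
    )
    # Turn off auto-commit on the underlying java.sql.Connection so that
    # the insert stays inside a transaction until commit() or rollback().
    conn.jconn.setAutoCommit(False)
    return conn
```

With a working setup, insert_then_rollback(connect_ignite(jar_path)) should report the same count before and after the rollback, which is the behaviour the original script was looking for.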