Apache Ignite SQL 事务管理
Apache Ignite SQL Transaction Management
我正在研究 Apache Ignite 中的事务管理,我创建了一个简单的脚本:
- 创建 table
- 获取行数
- 开始交易
- 插入一行
- 获取行数
- 回滚事务
- 获取行数
步骤 1 到 5 按预期工作,但步骤 6 无法回滚在步骤 4 中插入的行,并且在步骤 7 中的行数仍然为 1。
我知道需要对 cache/schema 进行一些配置,我想知道我是否做错了。 documentation 建议我需要使用 "TRANSACTIONAL_SNAPTSHOT" 作为原子性模式。
"cluster"只有一个节点。
Apache Ignite 配置文件
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="sqlSchemas">
<list>
<value>SAMPLE</value>
</list>
</property>
<!-- Enabling transactions for the "SAMPLE" cache/schema -->
<!-- See: https://apacheignite.readme.io/docs/transactions -->
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="SAMPLE"/>
<property name="atomicityMode" value="TRANSACTIONAL_SNAPSHOT"/>
<property name="backups" value="1"/>
</bean>
</list>
</property>
<!-- Enabling Apache Ignite Persistent Store. -->
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="persistenceEnabled" value="true"/>
</bean>
</property>
</bean>
</property>
<!-- Explicitly configure TCP discovery SPI to provide a list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> -->
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>10.0.1.217:47500</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
测试脚本
from pyignite import Client
def get_rowcount(client):
result = client.sql('SELECT COUNT(*) FROM SAMPLE.T1')
row_count = next(result)[0]
return row_count
def main():
client = Client()
client.connect('ignite-host', 10800)
client.sql('CREATE TABLE IF NOT EXISTS SAMPLE.T1 (k int, v varchar, PRIMARY key(k))')
client.sql('DELETE SAMPLE.T1')
start_row_count = get_rowcount(client) # Expected row count = 0
client.sql("BEGIN TRANSACTION")
client.sql("INSERT INTO SAMPLE.T1 (k, v) values (1, 'Line 1')")
post_insert_row_count = get_rowcount(client) # Expected row count = 1
client.sql("ROLLBACK TRANSACTION")
end_row_count = get_rowcount(client) # Expected row count = 0, but is 1
client.close()
print('Start row count : {}'.format(start_row_count))
print('Post insert row count: {}'.format(post_insert_row_count))
print('End row count : {}'.format(end_row_count))
if __name__ == '__main__':
main()
脚本输出
Start row count : 0
Post insert row count: 1
End row count : 1
您错误地声明了您的缓存,因此它仍然具有 PARTITIONED 原子性模式(您需要声明一个缓存模板)。
但主要问题是瘦客户端目前不支持事务:
https://issues.apache.org/jira/browse/IGNITE-9410
我假设它仅在持续时间内自动提交。我建议使用 JDBC Python 与 Ignite Thin JDBC 驱动程序绑定。
我正在研究 Apache Ignite 中的事务管理,我创建了一个简单的脚本:
- 创建 table
- 获取行数
- 开始交易
- 插入一行
- 获取行数
- 回滚事务
- 获取行数
步骤 1 到 5 按预期工作,但步骤 6 无法回滚在步骤 4 中插入的行,并且在步骤 7 中的行数仍然为 1。
我知道需要对 cache/schema 进行一些配置,我想知道我是否做错了。 documentation 建议我需要使用 "TRANSACTIONAL_SNAPTSHOT" 作为原子性模式。
"cluster"只有一个节点。
Apache Ignite 配置文件
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="sqlSchemas">
<list>
<value>SAMPLE</value>
</list>
</property>
<!-- Enabling transactions for the "SAMPLE" cache/schema -->
<!-- See: https://apacheignite.readme.io/docs/transactions -->
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="SAMPLE"/>
<property name="atomicityMode" value="TRANSACTIONAL_SNAPSHOT"/>
<property name="backups" value="1"/>
</bean>
</list>
</property>
<!-- Enabling Apache Ignite Persistent Store. -->
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="persistenceEnabled" value="true"/>
</bean>
</property>
</bean>
</property>
<!-- Explicitly configure TCP discovery SPI to provide a list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> -->
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>10.0.1.217:47500</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
测试脚本
from pyignite import Client
def get_rowcount(client):
result = client.sql('SELECT COUNT(*) FROM SAMPLE.T1')
row_count = next(result)[0]
return row_count
def main():
client = Client()
client.connect('ignite-host', 10800)
client.sql('CREATE TABLE IF NOT EXISTS SAMPLE.T1 (k int, v varchar, PRIMARY key(k))')
client.sql('DELETE SAMPLE.T1')
start_row_count = get_rowcount(client) # Expected row count = 0
client.sql("BEGIN TRANSACTION")
client.sql("INSERT INTO SAMPLE.T1 (k, v) values (1, 'Line 1')")
post_insert_row_count = get_rowcount(client) # Expected row count = 1
client.sql("ROLLBACK TRANSACTION")
end_row_count = get_rowcount(client) # Expected row count = 0, but is 1
client.close()
print('Start row count : {}'.format(start_row_count))
print('Post insert row count: {}'.format(post_insert_row_count))
print('End row count : {}'.format(end_row_count))
if __name__ == '__main__':
main()
脚本输出
Start row count : 0
Post insert row count: 1
End row count : 1
您错误地声明了您的缓存,因此它仍然具有 PARTITIONED 原子性模式(您需要声明一个缓存模板)。
但主要问题是瘦客户端目前不支持事务: https://issues.apache.org/jira/browse/IGNITE-9410
我假设它仅在持续时间内自动提交。我建议使用 JDBC Python 与 Ignite Thin JDBC 驱动程序绑定。