Cassandra session data corruption from multithreaded read
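I'm having trouble with the Cassandra API and a SELECT query. In a single-threaded context it works fine. However, when multiple threads use the same object and call this function, even with only two threads, Cassandra returns futures containing bad data. Sometimes the data seems to have been overwritten by the results of other SELECT queries, and other times it is just garbage. I have a singleton object that handles my calls to Cassandra, and a byte-buffer struct that holds the returned data. I've confirmed that on a single thread everything works as expected. When I add more threads that use the same object and call the same function, incorrect data appears in what Cassandra returns.
Values are about 1 KB and keys are 32 bytes.
You can see the two lines surrounded by the commented-out mutex. With the mutex uncommented, the bad-data problem goes away, but it also cancels out any performance gain from adding threads.
About 33% of the queries come back corrupted.
The Cassandra driver is supposed to handle multiple threads and session connections without any problem (according to http://datastax.github.io/cpp-driver/topics/), so why am I getting bad data back?
I'm using CentOS 7, C++14, the Cassandra C/C++ driver 2.15 and Cassandra 3.11.4.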
#include <cassert>
#include "DataObj.h" // contains the rest of my includes
// byteBuf has two members: a const uint8_t pointer, and a uint32_t variable for declaring the size
DataObj::byteBuf DataObj::threadGet(DataObj::byteBuf key)
{
    CassError rc = CASS_OK;
    CassStatement* statement = NULL;
    CassFuture* future = NULL;
    string temp = "SELECT value FROM " + keyspace + "." + tablename + " WHERE id = ?"; // variables defined elsewhere
    const char* query = (const char*)temp.c_str();
    statement = cass_statement_new(query, 1);
    // I am also having issues with prepared statements not working properly,
    // but I believe it is not directly related to this question. This setup was working fine on 1 thread
    cass_statement_bind_bytes(statement, 0, key.data, key.size);
    //rw_mut.lock();
    future = cass_session_execute(m_session, statement); // m_session is the cassandra session of the object
    cass_future_wait(future);
    //rw_mut.unlock();
    // The two statements above, when gated by the mutex, do not produce errors when multithreading,
    // but also do not gain any performance.
    // When not gated, works fine on a single thread, but corrupts the return data on 2 or more threads
    const uint8_t* st = nullptr;
    size_t len = 0;
    rc = cass_future_error_code(future);
    if (rc != CASS_OK)
    {
        // error handling...
    }
    else
    {
        const CassResult* result = cass_future_get_result(future);
        CassIterator* iterator = cass_iterator_from_result(result);
        if (cass_iterator_next(iterator))
        {
            const CassRow* row = cass_iterator_get_row(iterator);
            cass_value_get_bytes(cass_row_get_column_by_name(row, "value"), &st, &len);
        }
        cass_result_free(result);
        cass_iterator_free(iterator);
    }
    DataObj::byteBuf res((uint8_t*)st, len);
    // was able to use gdb here and confirm that the data is corrupt
    cass_future_free(future);
    return res;
}
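It looks like you are freeing the result before you copy the value pointer into your byteBuf. cass_value_get_bytes does not copy anything: it gives you a pointer into memory owned by the driver's result, and your byteBuf only stores that pointer. Once the result and future are freed, st (and the byteBuf you return) refers to memory the driver has released. In the single-threaded version you were probably just lucky that the memory was still intact when you dereferenced it. With multiple threads, another in-flight query is very likely to reuse and overwrite that memory, which would explain seeing both data from other SELECT queries and plain garbage.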
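Here is a minimal sketch of the fix, assuming you are free to make byteBuf (or the return type) own its bytes. The real driver can't be pulled into a self-contained example, so `FakeResult` and `fake_get_bytes` are hypothetical stand-ins for `CassResult` and `cass_value_get_bytes`. The only point illustrated is the ordering: copy the bytes into storage the caller owns, and free the result only after that.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a driver-owned CassResult: the value bytes live
// inside this object and are released when it is destroyed.
struct FakeResult {
    std::vector<uint8_t> storage;
};

// Hypothetical stand-in for cass_value_get_bytes(): it hands back a pointer
// into the result's own memory instead of making a copy.
void fake_get_bytes(const FakeResult& r, const uint8_t** out, size_t* len) {
    *out = r.storage.data();
    *len = r.storage.size();
}

// Copies the value into caller-owned storage *before* releasing the result.
std::vector<uint8_t> read_value(std::unique_ptr<FakeResult> result) {
    const uint8_t* ptr = nullptr;
    size_t len = 0;
    fake_get_bytes(*result, &ptr, &len);
    assert(ptr != nullptr || len == 0);

    std::vector<uint8_t> owned(ptr, ptr + len); // deep copy while ptr is still valid
    result.reset();                             // plays the role of cass_result_free(); ptr now dangles
    return owned;                               // safe: owns its own copy of the bytes
}
```

In threadGet the equivalent would be to build an owning buffer from st and len (for example `std::vector<uint8_t>(st, st + len)`) right after cass_value_get_bytes, and only then call cass_iterator_free, cass_result_free and cass_future_free. If byteBuf has to stay a pointer-and-size pair, the caller has to allocate and later free that copy itself. Separately, the original function never calls cass_statement_free(statement), so each call leaks one statement.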