尝试从 python 写入 cassandra 时 CQL 查询中的语法错误
Syntax error in CQL query when trying to write to cassandra from python
所以,我正在 python 中构建一个应用程序,它从 Twitter 获取数据,然后将其保存到 Cassandra。我目前的问题在于一个脚本,该脚本从kafka读取数据并尝试将其写入cassandra,如下所示:
import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster
from kafka import KafkaConsumer, KafkaProducer
class Consumer(multiprocessing.Process):
def __init__(self):
multiprocessing.Process.__init__(self)
self.stop_event = multiprocessing.Event()
def stop(self):
self.stop_event.set()
def run(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
consumer_timeout_ms=1000)
consumer.subscribe(['twitter'])
while not self.stop_event.is_set():
for message in consumer:
# session.execute(
# """
# INSERT INTO mensaje_73 (tweet)
# VALUES (message)
# """
# )
print(message)
cluster = Cluster()
session = cluster.connect('twitter')
session.execute(
"""
INSERT INTO mensaje_73 (tweet)
VALUES (message)
"""
)
# if self.stop_event.is_set():
# break
consumer.close()
def main():
tasks = [
Consumer()
]
for t in tasks:
t.start()
time.sleep(10)
for task in tasks:
task.stop()
if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%
(levelname)s:%(process)d:%(message)s',
level=logging.INFO
)
main()
我已尝试将测试消息插入 table 推特。mensaje_73 并且效果很好,如下所示:
import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster
from kafka import KafkaConsumer, KafkaProducer
cluster = Cluster()
session = cluster.connect('twitter')
session.execute(
"""
INSERT INTO mensaje_73 (tweet)
VALUES ('helooo')
"""
)
任何帮助将不胜感激:)
所以这里的问题是,您的 message
变量在 CQL 中被视为文字,如果没有单引号,它将无法工作。因此,错误。
要解决这个问题,我会使用准备好的语句,然后将 message
绑定到它:
session = cluster.connect('twitter')
preparedTweetInsert = session.prepare(
"""
INSERT INTO mensaje_73 (tweet)
VALUES (?)
"""
)
session.execute(preparedTweetInsert,[message])
试一试,看看是否有帮助。
此外,这似乎是一个简单的数据模型。但是要问自己一件事,您将如何查询这些数据?除非 tweet
是您唯一的 PRIMARY KEY,否则这是行不通的。这也意味着您可以查询单个推文的唯一方法是通过消息的确切文本。需要考虑的事情,但按天对其进行分区可能是更好的选择,因为它会很好地分布并提供更好的查询模型。
所以,我正在 python 中构建一个应用程序,它从 Twitter 获取数据,然后将其保存到 Cassandra。我目前的问题在于一个脚本,该脚本从kafka读取数据并尝试将其写入cassandra,如下所示:
import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster
from kafka import KafkaConsumer, KafkaProducer
class Consumer(multiprocessing.Process):
def __init__(self):
multiprocessing.Process.__init__(self)
self.stop_event = multiprocessing.Event()
def stop(self):
self.stop_event.set()
def run(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
consumer_timeout_ms=1000)
consumer.subscribe(['twitter'])
while not self.stop_event.is_set():
for message in consumer:
# session.execute(
# """
# INSERT INTO mensaje_73 (tweet)
# VALUES (message)
# """
# )
print(message)
cluster = Cluster()
session = cluster.connect('twitter')
session.execute(
"""
INSERT INTO mensaje_73 (tweet)
VALUES (message)
"""
)
# if self.stop_event.is_set():
# break
consumer.close()
def main():
tasks = [
Consumer()
]
for t in tasks:
t.start()
time.sleep(10)
for task in tasks:
task.stop()
if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%
(levelname)s:%(process)d:%(message)s',
level=logging.INFO
)
main()
我已尝试将测试消息插入 table 推特。mensaje_73 并且效果很好,如下所示:
import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster
from kafka import KafkaConsumer, KafkaProducer
cluster = Cluster()
session = cluster.connect('twitter')
session.execute(
"""
INSERT INTO mensaje_73 (tweet)
VALUES ('helooo')
"""
)
任何帮助将不胜感激:)
所以这里的问题是,您的 message
变量在 CQL 中被视为文字,如果没有单引号,它将无法工作。因此,错误。
要解决这个问题,我会使用准备好的语句,然后将 message
绑定到它:
session = cluster.connect('twitter')
preparedTweetInsert = session.prepare(
"""
INSERT INTO mensaje_73 (tweet)
VALUES (?)
"""
)
session.execute(preparedTweetInsert,[message])
试一试,看看是否有帮助。
此外,这似乎是一个简单的数据模型。但是要问自己一件事,您将如何查询这些数据?除非 tweet
是您唯一的 PRIMARY KEY,否则这是行不通的。这也意味着您可以查询单个推文的唯一方法是通过消息的确切文本。需要考虑的事情,但按天对其进行分区可能是更好的选择,因为它会很好地分布并提供更好的查询模型。