Синтаксическая ошибка в CQL-запросе при попытке записи на cassandra из python
Итак, я создаю приложение на python, которое берет данные из твиттера, а затем сохраняет их на Кассандре. Мои текущие проблемы заключаются в скрипте, который читает данные из kafka и пытается записать их в cassandra следующим образом:
import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster
from kafka import KafkaConsumer, KafkaProducer
class Consumer(multiprocessing.Process):
def __init__(self):
multiprocessing.Process.__init__(self)
self.stop_event = multiprocessing.Event()
def stop(self):
self.stop_event.set()
def run(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
consumer_timeout_ms=1000)
consumer.subscribe(['twitter'])
while not self.stop_event.is_set():
for message in consumer:
# session.execute(
# """
# INSERT INTO mensaje_73 (tweet)
# VALUES (message)
# """
# )
print(message)
cluster = Cluster()
session = cluster.connect('twitter')
session.execute(
"""
INSERT INTO mensaje_73 (tweet)
VALUES (message)
"""
)
# if self.stop_event.is_set():
# break
consumer.close()
def main():
tasks = [
Consumer()
]
for t in tasks:
t.start()
time.sleep(10)
for task in tasks:
task.stop()
if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%
(levelname)s:%(process)d:%(message)s',
level=logging.INFO
)
main()
Я попытался вставить тестовые сообщения в таблицу twitter.mensaje_73, и это сработало отлично, как здесь:
import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster
from kafka import KafkaConsumer, KafkaProducer
cluster = Cluster()
session = cluster.connect('twitter')
session.execute(
"""
INSERT INTO mensaje_73 (tweet)
VALUES ('helooo')
"""
)
Любая помощь будет высоко ценится:)
1 ответ
Так что проблема здесь в том, что ваш message
переменная рассматривается как литерал в CQL, который не будет работать без одинарных кавычек. Отсюда и ошибка.
Чтобы это исправить, я бы пошел по пути использования подготовленного утверждения, а затем связал message
к нему:
session = cluster.connect('twitter')
preparedTweetInsert = session.prepare(
"""
INSERT INTO mensaje_73 (tweet)
VALUES (?)
"""
)
session.execute(preparedTweetInsert,[message])
Попробуйте и посмотрите, поможет ли это.
Кроме того, это похоже на простую модель данных. Но вы должны спросить себя: как вы собираетесь запрашивать эти данные? Это не будет работать, если tweet
был твой единственный ПЕРВИЧНЫЙ КЛЮЧ. Это также означает, что вы можете запросить отдельный твит только путем точного текста сообщения. Есть над чем подумать, но лучше разбить его по дням, так как он будет хорошо распределяться и обеспечит гораздо лучшую модель запросов.