Синтаксическая ошибка в CQL-запросе при попытке записи на cassandra из python

Итак, я создаю приложение на python, которое берет данные из твиттера, а затем сохраняет их на Кассандре. Мои текущие проблемы заключаются в скрипте, который читает данные из kafka и пытается записать их в cassandra следующим образом:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer




class Consumer(multiprocessing.Process):
   def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_event = multiprocessing.Event()

    def stop(self):
         self.stop_event.set()

    def run(self):
       consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000)
        consumer.subscribe(['twitter'])



    while not self.stop_event.is_set():
        for message in consumer:
            # session.execute(
            #     """
            #     INSERT INTO mensaje_73 (tweet)
            #     VALUES (message)
            #     """
            # )
            print(message)
            cluster = Cluster()
            session = cluster.connect('twitter')
            session.execute(
                    """
                    INSERT INTO mensaje_73 (tweet)
                    VALUES (message)
                    """
                )

            # if self.stop_event.is_set():
            #     break

    consumer.close()


   def main():

    tasks = [
        Consumer()
    ]

    for t in tasks:
        t.start()

    time.sleep(10)

    for task in tasks:
        task.stop()



if __name__ == "__main__":
     logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:% 
   (levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()

Я попытался вставить тестовые сообщения в таблицу twitter.mensaje_73, и это сработало отлично, как здесь:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer


cluster = Cluster()
session = cluster.connect('twitter')
session.execute(
    """
    INSERT INTO mensaje_73 (tweet)
    VALUES ('helooo')
    """
)

Любая помощь будет высоко ценится:)

1 ответ

Решение

Так что проблема здесь в том, что ваш message переменная рассматривается как литерал в CQL, который не будет работать без одинарных кавычек. Отсюда и ошибка.

Чтобы это исправить, я бы пошел по пути использования подготовленного утверждения, а затем связал message к нему:

session = cluster.connect('twitter')
preparedTweetInsert = session.prepare(
        """
        INSERT INTO mensaje_73 (tweet)
        VALUES (?)
        """
    )
session.execute(preparedTweetInsert,[message])

Попробуйте и посмотрите, поможет ли это.

Кроме того, это похоже на простую модель данных. Но вы должны спросить себя: как вы собираетесь запрашивать эти данные? Это не будет работать, если tweet был твой единственный ПЕРВИЧНЫЙ КЛЮЧ. Это также означает, что вы можете запросить отдельный твит только путем точного текста сообщения. Есть над чем подумать, но лучше разбить его по дням, так как он будет хорошо распределяться и обеспечит гораздо лучшую модель запросов.

Другие вопросы по тегам