Регистрация всех запросов с помощью cassandra-python-driver

Я пытаюсь найти способ протоколировать все запросы, сделанные на Cassandra из кода Python. В частности, ведение журнала, как они закончили выполнять с использованием BatchStatement

Есть ли какие-либо хуки или обратные вызовы, которые я могу использовать, чтобы войти?

3 ответа

Решение

2 варианта:

  1. Придерживаться session.add_request_init_listener

    Из исходного кода:

    а) BoundStatement

    https://github.com/datastax/python-driver/blob/3.11.0/cassandra/query.py

    Переданные значения сохраняются в raw_values, вы можете попробовать извлечь его

    б) BatchStatement

    https://github.com/datastax/python-driver/blob/3.11.0/cassandra/query.py

    Он хранит все операторы и параметры, используемые для создания этого объекта в _statements_and_parameters, Кажется, что это может быть получено, хотя это не общественная собственность

    c) Только этот хук называется, мне не удалось найти другие хуки https://github.com/datastax/python-driver/blob/master/cassandra/cluster.py

    Но это не имеет ничего общего с фактическим выполнением запросов - это просто способ проверить, какие типы запросов были построены, и, возможно, добавить дополнительные обратные вызовы / ошибки

  2. Подойдите к нему под другим углом и используйте следы

    https://datastax.github.io/python-driver/faq.html https://datastax.github.io/python-driver/api/cassandra/cluster.html

    Трассировка запроса может быть включена для любого запроса путем установки trace=True в Session.execute_async(). Просмотрите результаты, ожидая в будущем, затем ResponseFuture.get_query_trace()

Вот пример BatchStatement трассировка с использованием варианта 2:

bs = BatchStatement()                                                        
bs.add_all(['insert into test.test(test_type, test_desc) values (%s, %s)',   
            'insert into test.test(test_type, test_desc) values (%s, %s)',   
            'delete from test.test where test_type=%s',
            'update test.test set test_desc=%s where test_type=%s'],
           [['hello1', 'hello1'],                                            
            ['hello2', 'hello2'],                                            
            ['hello2'],
            ['hello100', 'hello1']])     
res = session.execute(bs, trace=True)                                        
trace = res.get_query_trace()                                                
for event in trace.events:                                                   
    if event.description.startswith('Parsing'):                              
        print event.description 

Он производит следующий вывод:

Parsing insert into test.test(test_type, test_desc) values ('hello1', 'hello1')
Parsing insert into test.test(test_type, test_desc) values ('hello2', 'hello2')
Parsing delete from test.test where test_type='hello2'
Parsing update test.test set test_desc='hello100' where test_type='hello1'

add_request_init_listener(fn, *args, **kwargs)

Добавляет обратный вызов с аргументами для вызова при создании любого запроса.

Он будет вызываться как fn(response_future, *args, **kwargs) после создания каждого клиентского запроса и перед отправкой запроса *

Используя обратный вызов, вы можете легко регистрировать все запросы, сделанные этим сеансом.

Пример:

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider


class RequestHandler:

    def on_request(self, rf):
        # This callback is invoked each time a request is created, on the thread creating the request.
        # We can use this to count events, or add callbacks
        print(rf.query)


auth_provider = PlainTextAuthProvider(
    username='cassandra',
    password='cassandra'
)

cluster = Cluster(['192.168.65.199'],auth_provider=auth_provider)
session = cluster.connect('test')

handler = RequestHandler()
# each instance will be registered with a session, and receive a callback for each request generated
session.add_request_init_listener(handler.on_request)

from time import sleep

for count in range(1, 10):
    print(count)
    for row in session.execute("select * from kv WHERE key = %s", ["ed1e49e0-266f-11e7-9d76-fd55504093c1"]):
        print row
    sleep(1)

Рассматривали ли вы создание декоратора для вашего execute или эквивалент (например, execute_concurrent) что регистрирует CQL-запрос, используемый для вашего оператора или подготовленного оператора?

Вы можете написать это так, чтобы запрос CQL регистрировался только в том случае, если запрос был выполнен успешно.

Другие вопросы по тегам