Как Python может наблюдать изменения в оплоге Mongodb

У меня есть несколько скриптов Python, пишущих в Mongodb с использованием pyMongo. Как другой скрипт Python может наблюдать изменения в запросе Mongo и выполнять некоторую функцию, когда изменение происходит? mongodb настроен с включенным оплогом.

4 ответа

Я написал инструмент инкрементного резервного копирования для MongoDB некоторое время назад, на Python. Инструмент отслеживает изменения данных, отслеживая oplog, Вот соответствующая часть кода.

Обновленный ответ, pymongo 3

from time import sleep

from pymongo import MongoClient, ASCENDING
from pymongo.cursor import CursorType
from pymongo.errors import AutoReconnect

# Time to wait for data or connection.
_SLEEP = 1.0

if __name__ == '__main__':
    oplog = MongoClient().local.oplog.rs
    stamp = oplog.find().sort('$natural', ASCENDING).limit(-1).next()['ts']

    while True:
        kw = {}

        kw['filter'] = {'ts': {'$gt': stamp}}
        kw['cursor_type'] = CursorType.TAILABLE_AWAIT
        kw['oplog_replay'] = True

        cursor = oplog.find(**kw)

        try:
            while cursor.alive:
                for doc in cursor:
                    stamp = doc['ts']

                    print(doc)  # Do something with doc.

                sleep(_SLEEP)

        except AutoReconnect:
            sleep(_SLEEP)

Также см. http://api.mongodb.com/python/current/examples/tailable.html.

Оригинальный ответ, пимонго 2

from time import sleep

from pymongo import MongoClient
from pymongo.cursor import _QUERY_OPTIONS
from pymongo.errors import AutoReconnect
from bson.timestamp import Timestamp

# Tailable cursor options.
_TAIL_OPTS = {'tailable': True, 'await_data': True}

# Time to wait for data or connection.
_SLEEP = 10

if __name__ == '__main__':
    db = MongoClient().local

    while True:
        query = {'ts': {'$gt': Timestamp(some_timestamp, 0)}}  # Replace with your query.
        cursor = db.oplog.rs.find(query, **_TAIL_OPTS)

        cursor.add_option(_QUERY_OPTIONS['oplog_replay'])

        try:
            while cursor.alive:
                try:
                    doc = next(cursor)

                    # Do something with doc.

                except (AutoReconnect, StopIteration):
                    sleep(_SLEEP)

        finally:
            cursor.close()

Я столкнулся с этой проблемой сегодня и не нашел обновленного ответа нигде.

Класс Cursor изменился с версии 3.0 и больше не принимает tailable а также await_data аргументы. В этом примере будет завершен оплог и распечатана запись оплога, когда он найдет запись, более новую, чем последняя найденная.

# Adapted from the example here: https://jira.mongodb.org/browse/PYTHON-735
# to work with pymongo 3.0

import pymongo
from pymongo.cursor import CursorType

c = pymongo.MongoClient()

# Uncomment this for master/slave.
oplog = c.local.oplog['$main']
# Uncomment this for replica sets.
#oplog = c.local.oplog.rs

first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
ts = first['ts']

while True:
    cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
    while cursor.alive:
        for doc in cursor:
            ts = doc['ts']
            print doc
            # Work with doc here

Запросите оплог с помощью настраиваемого курсора.

Это на самом деле забавно, потому что oplog-monitor - это именно то, для чего изначально была добавлена ​​функция tailable-cursor. Я нахожу это чрезвычайно полезным и для других вещей (например, для реализации pubsub на основе mongodb, см. Этот пост, например), но это было первоначальной целью.

Я была такая же проблема. Я собрал этот rescommunes / oplog.py. Проверьте комментарии и посмотрите __main__ для примера того, как вы могли бы использовать его со своим сценарием.

Другие вопросы по тегам