Как Python может наблюдать изменения в оплоге Mongodb
У меня есть несколько скриптов Python, пишущих в Mongodb с использованием pyMongo. Как другой скрипт Python может наблюдать изменения в запросе Mongo и выполнять некоторую функцию, когда изменение происходит? mongodb настроен с включенным оплогом.
4 ответа
Я написал инструмент инкрементного резервного копирования для MongoDB некоторое время назад, на Python. Инструмент отслеживает изменения данных, отслеживая oplog
, Вот соответствующая часть кода.
Обновленный ответ, pymongo 3
from time import sleep
from pymongo import MongoClient, ASCENDING
from pymongo.cursor import CursorType
from pymongo.errors import AutoReconnect
# Time to wait for data or connection.
_SLEEP = 1.0
if __name__ == '__main__':
oplog = MongoClient().local.oplog.rs
stamp = oplog.find().sort('$natural', ASCENDING).limit(-1).next()['ts']
while True:
kw = {}
kw['filter'] = {'ts': {'$gt': stamp}}
kw['cursor_type'] = CursorType.TAILABLE_AWAIT
kw['oplog_replay'] = True
cursor = oplog.find(**kw)
try:
while cursor.alive:
for doc in cursor:
stamp = doc['ts']
print(doc) # Do something with doc.
sleep(_SLEEP)
except AutoReconnect:
sleep(_SLEEP)
Также см. http://api.mongodb.com/python/current/examples/tailable.html.
Оригинальный ответ, пимонго 2
from time import sleep
from pymongo import MongoClient
from pymongo.cursor import _QUERY_OPTIONS
from pymongo.errors import AutoReconnect
from bson.timestamp import Timestamp
# Tailable cursor options.
_TAIL_OPTS = {'tailable': True, 'await_data': True}
# Time to wait for data or connection.
_SLEEP = 10
if __name__ == '__main__':
db = MongoClient().local
while True:
query = {'ts': {'$gt': Timestamp(some_timestamp, 0)}} # Replace with your query.
cursor = db.oplog.rs.find(query, **_TAIL_OPTS)
cursor.add_option(_QUERY_OPTIONS['oplog_replay'])
try:
while cursor.alive:
try:
doc = next(cursor)
# Do something with doc.
except (AutoReconnect, StopIteration):
sleep(_SLEEP)
finally:
cursor.close()
Я столкнулся с этой проблемой сегодня и не нашел обновленного ответа нигде.
Класс Cursor изменился с версии 3.0 и больше не принимает tailable
а также await_data
аргументы. В этом примере будет завершен оплог и распечатана запись оплога, когда он найдет запись, более новую, чем последняя найденная.
# Adapted from the example here: https://jira.mongodb.org/browse/PYTHON-735
# to work with pymongo 3.0
import pymongo
from pymongo.cursor import CursorType
c = pymongo.MongoClient()
# Uncomment this for master/slave.
oplog = c.local.oplog['$main']
# Uncomment this for replica sets.
#oplog = c.local.oplog.rs
first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
ts = first['ts']
while True:
cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
while cursor.alive:
for doc in cursor:
ts = doc['ts']
print doc
# Work with doc here
Запросите оплог с помощью настраиваемого курсора.
Это на самом деле забавно, потому что oplog-monitor - это именно то, для чего изначально была добавлена функция tailable-cursor. Я нахожу это чрезвычайно полезным и для других вещей (например, для реализации pubsub на основе mongodb, см. Этот пост, например), но это было первоначальной целью.
Я была такая же проблема. Я собрал этот rescommunes / oplog.py. Проверьте комментарии и посмотрите __main__
для примера того, как вы могли бы использовать его со своим сценарием.