Как выслушать изменения в коллекции MongoDB?

Я создаю своего рода фоновую систему очередей заданий с MongoDB в качестве хранилища данных. Как я могу "прослушать" вставки в коллекцию MongoDB, прежде чем создавать рабочих для обработки работы? Нужно ли опрашивать каждые несколько секунд, чтобы увидеть, есть ли какие-либо изменения с прошлого раза, или есть способ, которым мой сценарий может ожидать вставки? Это PHP-проект, над которым я работаю, но не стесняйтесь отвечать на Ruby или не зависимо от языка.

12 ответов

Решение

MongoDB имеет то, что называется capped collections а также tailable cursors это позволяет MongoDB передавать данные слушателям.

capped collection по сути это коллекция, которая имеет фиксированный размер и допускает только вставки. Вот как это будет выглядеть:

db.createCollection("messages", { capped: true, size: 100000000 })

MongoDB Tailable курсоры ( оригинальный пост Джонатана Х. Вейджа)

Рубин

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Питон ( Роберт Стюарт)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl ( Макс)

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

Дополнительные ресурсы:

Ruby / Node.js Tutorial, который проведет вас через создание приложения, которое прослушивает вставки в ограниченную коллекцию MongoDB.

Статья, рассказывающая о настраиваемых курсорах более подробно.

Примеры использования настраиваемых курсоров на PHP, Ruby, Python и Perl.

То, о чем вы думаете, очень похоже на триггеры. MongoDB не имеет никакой поддержки триггеров, однако некоторые люди "свернули свои собственные", используя некоторые приемы. Ключевым моментом здесь является оплог.

Когда вы запускаете MongoDB в наборе реплик, все действия MongoDB записываются в журнал операций (известный как oplog). Оплог - это в основном просто список изменений, внесенных в данные. Реплики Устанавливает функцию, прослушивая изменения в этом журнале операций и затем применяя изменения локально.

Это звучит знакомо?

Я не могу описать весь процесс здесь, это несколько страниц документации, но инструменты, которые вам нужны, доступны.

Сначала несколько рецензий на оплог - Краткое описание - Макет local коллекция (которая содержит оплог)

Вы также хотите использовать настраиваемые курсоры. Это даст вам возможность прослушивать изменения, а не опрашивать их. Обратите внимание, что для репликации используются настраиваемые курсоры, поэтому это поддерживаемая функция.

Проверьте это: изменение потоков

10 января 2018 г. - выпуск 3.6

* РЕДАКТИРОВАТЬ: я написал статью о том, как сделать это https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


Это новое в mongodb 3.6 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2

Для использования changeStreams база данных должна быть набором репликации

Подробнее о наборах репликации: https://docs.mongodb.com/manual/replication/

Ваша база данных будет "Автономной" по умолчанию.

Как преобразовать автономный набор реплик: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


Следующий пример - практическое применение того, как вы можете это использовать.
* Специально для узла.

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */

Полезные ссылки:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams

Начиная с MongoDB 3.6 появится новый API уведомлений под названием Change Streams, который вы можете использовать для этого. Смотрите этот пост в блоге для примера. Пример из этого:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])

MongoDB версии 3.6 теперь включает потоки изменений, которые, по сути, представляют собой API поверх OpLog, что позволяет использовать сценарии, подобные триггерам / уведомлениям.

Вот ссылка на пример Java: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

Пример NodeJS может выглядеть примерно так:

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client){
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change){
            console.log(JSON.stringify(change));
          });
      });

В качестве альтернативы вы можете использовать стандартный метод Mongo FindAndUpdate и в обратном вызове вызывать событие EventEmitter (в узле) при выполнении обратного вызова.

Любые другие части приложения или архитектуры, прослушивающие это событие, будут уведомлены об обновлении, и любые соответствующие данные также будут отправлены туда. Это действительно простой способ получения уведомлений от Mongo.

Многие из этих ответов дадут вам только новые записи, а не обновления и / или крайне неэффективны

Единственный надежный и эффективный способ сделать это - создать настраиваемый курсор в локальной коллекции db: oplog.rs, чтобы получить ВСЕ изменения в MongoDB и делать с ними все, что вы захотите. (MongoDB даже делает это внутренне более или менее для поддержки репликации!)

Объяснение содержания оплога: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/

Пример библиотеки Node.js, которая предоставляет API вокруг того, что можно сделать с помощью оплога: https://github.com/cayasso/mongo-oplog

Существует потрясающий набор доступных сервисов, который называется MongoDB Stitch. Посмотрите на функции / триггеры стежка. Обратите внимание, что это облачный платный сервис (AWS). В вашем случае, при вставке, вы можете вызвать пользовательскую функцию, написанную на javascript.

Существует рабочий пример Java, который можно найти здесь.

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> {
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) {
            DBObject obj = cur.next();
            System.out.println( obj );

        }
    };
    new Thread(task).start();

Ключ - ВАРИАНТЫ ОПРОСА, приведенные здесь.

Также вы можете изменить запрос поиска, если вам не нужно каждый раз загружать все данные.

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

На самом деле, вместо просмотра вывода, почему вы не получаете уведомление, когда вставляется что-то новое, используя промежуточное ПО, предоставленное схемой mongoose

Вы можете поймать событие вставки нового документа и сделать что-то после того, как эта вставка сделана

После версии 3.6 разрешено использовать базу данных следующих типов триггеров базы данных:

  • триггеры, управляемые событиями - полезны для автоматического обновления связанных документов, уведомления последующих служб, распространения данных для поддержки смешанных рабочих нагрузок, целостности данных и аудита
  • запланированные триггеры - полезны для запланированных рабочих нагрузок извлечения, распространения, архивирования и аналитики данных

Войдите в свою учетную запись Atlas и выберите Triggers интерфейс и добавьте новый триггер:

Разверните каждый раздел для получения дополнительных настроек или подробностей.

Для тех, кто ищет пример потоков изменений на C# , вот пример, который также демонстрирует токен возобновления. Также очень важно отметить, что это работает только с MongoDB Atlas и НЕ будет работать с контейнером Docker, на котором работает MongoDB. См. документацию по потокам изменений , в которой говорится, что потоки изменений требуют:

  1. База данных должна находиться в наборе реплик или сегментированном кластере.
  2. База данных должна использовать механизм хранения WiredTiger.
  3. Набор реплик или сегментированный кластер должен использовать протокол набора реплик версии 1.

Поместите это в консольное приложение и добавьте пакет NuGet MongoDB.Driver .

      // Requires MongoDB.Driver NuGet package.
using ChangeMonitoring;
using MongoDB.Driver;
using MongoDB.Bson;
using Microsoft.Extensions.Configuration;

Console.WriteLine("Monitor has started....");

string connectionString = "--your MongoDB Atlas connection string here---";

if (connectionString.StartsWith("--"))
    throw new ArgumentException("Please update the MongoDB atlas connection string!");

string databaseName = "simple_db"; // TODO: Update with your db name
string collectionName = "people";  // TODO: Update with your collection name

var client = new MongoClient(connectionString);

var tokenSource = new CancellationTokenSource();
string? resumeToken = null; // We spit out a resume token below to the console.
await MonitorOneDatabasesAsync(client, databaseName, resumeToken, tokenSource.Token);

Console.WriteLine("Monitor has exited!!");

static async Task MonitorOneDatabasesAsync(IMongoClient client, string monitorDatabaseName, string? resumeToken,
    CancellationToken cancellationToken = default)
{
    var options = new ChangeStreamOptions
    {
        FullDocument = ChangeStreamFullDocumentOption.Default, 
        ResumeAfter = string.IsNullOrWhiteSpace(resumeToken) ? null : new BsonDocument().Add("_data", resumeToken)
    };
   

    IChangeStreamCursor<ChangeStreamDocument<BsonDocument>> streamCursor = await client
        .GetDatabase(monitorDatabaseName)
        .WatchAsync(options, cancellationToken);
    
    foreach (ChangeStreamDocument<BsonDocument> changeItem in streamCursor.ToEnumerable())
    {
        Console.WriteLine($"Key that changed: {changeItem.DocumentKey}  Operation Type: {changeItem.OperationType}");
        Console.WriteLine($"Resume Token: {changeItem.ResumeToken["_data"]}");

        // Delete doesn't send the full document!
        if (changeItem.FullDocument != null)
        {
            // Show all the fields on the document.
            foreach (string name in changeItem.FullDocument.Names)
            {
                Console.WriteLine($"  {name}: {changeItem.FullDocument[name]}");
            }
        }

        if (cancellationToken.IsCancellationRequested)
            break;
    }
}
Другие вопросы по тегам