Следите за потоками изменений MongoDB

Мы хотим, чтобы наше приложение Go прослушивало изменения данных в коллекции. Таким образом, прибегая к поиску решения, мы натолкнулись на потоки изменений MongoDB. Эта ссылка также демонстрирует некоторые фрагменты реализации для нескольких языков, таких как Python, Java, Nodejs и т. Д. Тем не менее, для Go нет фрагмента кода.

Мы используем Mgo в качестве драйвера, но не смогли найти явные заявления о потоках изменений.

Кто-нибудь имеет какие-либо идеи о том, как смотреть на Change Streams с помощью этого Mgo или любого другого драйвера Mongo для Go?

1 ответ

Решение

Популярный mgo Водитель (github.com/go-mgo/mgo) разработанная Густаво Нимейером, потемнела (не поддерживается). И он не поддерживает потоки изменений.

Сообщество поддерживает форк github.com/globalsign/mgo находится в гораздо лучшей форме, и уже добавлена ​​поддержка потоков изменений ( подробности см. здесь).

Чтобы посмотреть изменения в коллекции, просто используйте Collection.Watch() метод, который возвращает вам значение mgo.ChangeStream, Вот простой пример его использования:

coll := ... // Obtain collection

pipeline := []bson.M{}

changeStream := coll.Watch(pipeline, mgo.ChangeStreamOptions{})
var changeDoc bson.M
for changeStream.Next(&changeDoc) {
    fmt.Printf("Change: %v\n", changeDoc)
}

if err := changeStream.Close(); err != nil {
    return err
}

Также обратите внимание, что в стадии разработки находится официальный драйвер MongoDB Go, о котором было объявлено здесь: Учитывая влияние сообщества на введение официального драйвера MongoDB Go

В настоящее время он находится в альфа (!!) фазе, поэтому примите это во внимание. Это доступно здесь: github.com/mongodb/mongo-go-driver, Он также уже поддерживает потоки изменений, аналогично через Collection.Watch() метод (это другой mongo.Collection типа, это не имеет ничего общего с mgo.Collection). Возвращает mongo.Cursor который вы можете использовать так:

var coll mongo.Collection = ... // Obtain collection

ctx := context.Background()

var pipeline interface{} // set up pipeline

cur, err := coll.Watch(ctx, pipeline)
if err != nil {
    // Handle err
    return
}
defer cur.Close(ctx)

for cur.Next(ctx) {
    elem := bson.NewDocument()
    if err := cur.Decode(elem); err != nil {
        log.Fatal(err)
    }

    // do something with elem....
}

if err := cur.Err(); err != nil {
    log.Fatal(err)
}

В этом примере используется поддерживаемый MongoDB драйвер для Go с потоковым конвейером (фильтрация только документов, имеющих field1=1 и field2=false):

    ctx := context.TODO()
    clientOptions := options.Client().ApplyURI(mongoURI)
    client, err := mongo.Connect(ctx, clientOptions)
    if err != nil {
        log.Fatal(err)
    }
    err = client.Ping(ctx, nil)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println("Connected!")

    collection := client.Database("test").Collection("test")

    pipeline := mongo.Pipeline{bson.D{
        {"$match",
            bson.D{
                {"fullDocument.field1", 1},
                {"fullDocument.field2", false},
            },
        },
    }}
    streamOptions := options.ChangeStream().SetFullDocument(options.UpdateLookup)

    stream, err := collection.Watch(ctx, pipeline, streamOptions)
    if err != nil {
        log.Fatal(err)
    }
    log.Print("waiting for changes")
    var changeDoc map[string]interface{}
    for stream.Next(ctx) {
        if e := stream.Decode(&changeDoc); e != nil {
            log.Printf("error decoding: %s", e)
        }
        log.Printf("change: %+v", changeDoc)
    }
Другие вопросы по тегам