Следите за потоками изменений MongoDB
Мы хотим, чтобы наше приложение Go прослушивало изменения данных в коллекции. Таким образом, прибегая к поиску решения, мы натолкнулись на потоки изменений MongoDB. Эта ссылка также демонстрирует некоторые фрагменты реализации для нескольких языков, таких как Python, Java, Nodejs и т. Д. Тем не менее, для Go нет фрагмента кода.
Мы используем Mgo в качестве драйвера, но не смогли найти явные заявления о потоках изменений.
Кто-нибудь имеет какие-либо идеи о том, как смотреть на Change Streams с помощью этого Mgo или любого другого драйвера Mongo для Go?
1 ответ
Популярный mgo
Водитель (github.com/go-mgo/mgo
) разработанная Густаво Нимейером, потемнела (не поддерживается). И он не поддерживает потоки изменений.
Сообщество поддерживает форк github.com/globalsign/mgo
находится в гораздо лучшей форме, и уже добавлена поддержка потоков изменений ( подробности см. здесь).
Чтобы посмотреть изменения в коллекции, просто используйте Collection.Watch()
метод, который возвращает вам значение mgo.ChangeStream
, Вот простой пример его использования:
coll := ... // Obtain collection
pipeline := []bson.M{}
changeStream := coll.Watch(pipeline, mgo.ChangeStreamOptions{})
var changeDoc bson.M
for changeStream.Next(&changeDoc) {
fmt.Printf("Change: %v\n", changeDoc)
}
if err := changeStream.Close(); err != nil {
return err
}
Также обратите внимание, что в стадии разработки находится официальный драйвер MongoDB Go, о котором было объявлено здесь: Учитывая влияние сообщества на введение официального драйвера MongoDB Go
В настоящее время он находится в альфа (!!) фазе, поэтому примите это во внимание. Это доступно здесь: github.com/mongodb/mongo-go-driver
, Он также уже поддерживает потоки изменений, аналогично через Collection.Watch()
метод (это другой mongo.Collection
типа, это не имеет ничего общего с mgo.Collection
). Возвращает mongo.Cursor
который вы можете использовать так:
var coll mongo.Collection = ... // Obtain collection
ctx := context.Background()
var pipeline interface{} // set up pipeline
cur, err := coll.Watch(ctx, pipeline)
if err != nil {
// Handle err
return
}
defer cur.Close(ctx)
for cur.Next(ctx) {
elem := bson.NewDocument()
if err := cur.Decode(elem); err != nil {
log.Fatal(err)
}
// do something with elem....
}
if err := cur.Err(); err != nil {
log.Fatal(err)
}
В этом примере используется поддерживаемый MongoDB драйвер для Go с потоковым конвейером (фильтрация только документов, имеющих field1=1 и field2=false):
ctx := context.TODO()
clientOptions := options.Client().ApplyURI(mongoURI)
client, err := mongo.Connect(ctx, clientOptions)
if err != nil {
log.Fatal(err)
}
err = client.Ping(ctx, nil)
if err != nil {
log.Fatal(err)
}
fmt.Println("Connected!")
collection := client.Database("test").Collection("test")
pipeline := mongo.Pipeline{bson.D{
{"$match",
bson.D{
{"fullDocument.field1", 1},
{"fullDocument.field2", false},
},
},
}}
streamOptions := options.ChangeStream().SetFullDocument(options.UpdateLookup)
stream, err := collection.Watch(ctx, pipeline, streamOptions)
if err != nil {
log.Fatal(err)
}
log.Print("waiting for changes")
var changeDoc map[string]interface{}
for stream.Next(ctx) {
if e := stream.Decode(&changeDoc); e != nil {
log.Printf("error decoding: %s", e)
}
log.Printf("change: %+v", changeDoc)
}