Обещания Javascript /Q - Я делаю это правильно?

Я пытаюсь использовать Q в NodeJS для создания некоторых оберток amqplib. Обертки работают правильно (пока), но я чувствую, что мое использование Q... неправильно.

Во-первых, есть метод инициализации:

private static Startup(): void {
    var sub_YoMsgHandler = (msg: any) => {
        console.log(util.format('Received Yo: %s', msg.content.toString()));
        var bmy: dto.interfaces.IBusManifestYo = JSON.parse(msg.content.toString());
    }

    var sub_TelemetryMsgHandler = (msg: any) => {
        var bmy: dto.interfaces.IAsyncProcessorCommand = JSON.parse(msg.content.toString());
        console.log(util.format('Received Telemetry: %s', msg.content.toString()));
    }

    Play.AMQ.Open.then((connection) => {
        Play.AMQ.ConfirmChannel = connection.createConfirmChannel();
        Play.AMQ.ConfirmChannel.then((confirmChannel) => {
            confirmChannel.on('error', Play.handleChannelError);

            Play.AMQ.CommandQueue = confirmChannel.assertQueue('AsyncProcessorCommandQueue', { durable: true, exclusive: false, autoDelete: false });
            Play.AMQ.TelemetryQueue = confirmChannel.assertQueue('AsyncProcessorTelemetryQueue', { durable: true, exclusive: false, autoDelete: false });
            Play.ReceiveTelemetry(sub_TelemetryMsgHandler);

            Play.CreateConsumer('Node', dto.BusManifestYo.Type, sub_YoMsgHandler).then((consumerTag) => {
                //track consumer tags in CreateConsumer?
                Play.AMQ.Subscribers.push(consumerTag);
            });
        });
    });
}

Я упоминал, что использую TypeScript? Этот метод соединяет, создает канал, создает две очереди отправки / получения, создает двух подписчиков, а затем сохраняет обещания соединения, канала и очереди в объекте. Тогда вот один способ создания подписчика (потребителя):

private static CreateConsumer(name: string, type: string, handler: (msg: any) => void): Q.Promise<string> {
    var qid = type + '_' + name;

    return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
        return confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }).then((okQueueReply) => {
            return confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
                return confirmChannel.bindQueue(qid, type, '').then((okBindReply) => {
                    return confirmChannel.consume(qid, (msg) => {
                        handler(msg);
                        confirmChannel.ack(msg);
                    });
                });
            });
        });
    },
    (failReason) => {
        throw new Error('create consumer issue: ' + failReason);
    });
}

Наконец, вот мой метод публикации:

 private static Publish(obj: dto.interfaces.IPublishable): Q.Promise<boolean> {
    var ackDeferred = Q.defer<boolean>();

    var handleChannelConfirm = (err, ok): void => {
        if (err !== null) {
            console.warn('Message nacked!');
            ackDeferred.resolve(false);
        }
        else {
            console.log('Message acked');
            ackDeferred.resolve(true);
        }
    }

    // '#' instead of ''?
    Play.AMQ.ConfirmChannel.then((confirmChannel) => {
        confirmChannel.assertExchange(obj.GetType(), 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
            confirmChannel.publish(obj.GetType(), '', Play.ToBuffer(obj), { type: obj.GetType() }, handleChannelConfirm);
        });
    },
    (failReason) => {
        throw new Error('create consumer issue: ' + failReason);
    });

    return ackDeferred.promise;
}

Как я уже сказал, все работают, но мне кажется, что я не использую обещания правильным или рекомендованным способом.

Здесь есть какие-то вопиющие ошибки - или я делаю это правильно? В частности, я думаю, мне любопытно мое цепочка и обработка ошибок (я думаю, что обработка ошибок, скорее всего, будет неправильной). Бонусные баллы за то, что я показал правильный путь в этом методе публикации для получения обработчика стиля обратного вызова и Promise-ifying...

2 ответа

Решение

Если Q следует спецификации обещания, он должен работать так

return Play.AMQ.ConfirmChannel
.then(confirmChannel => confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }))
.then(okQueueReply => confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }))
.then(okExchangeReply => confirmChannel.bindQueue(qid, type, ''))
.then(okBindReply => confirmChannel.consume(qid))
.then(msg => {
    handler(msg);
    confirmChannel.ack(msg);
});

И тогда имеет смысл возвращать что-то из последнего.

Обратный звонок в then Метод возвращает новое обещание, поэтому вы можете связать их без вложенных обратных вызовов.

Подводя итог вашей идеи,

/*
Open() // connecting...
    .then confirmChannel()  //create a channel
        .then function () {
            commandQueue(); //create send queue?
            TelemetryQueue(); // create recv queue?
        }
            .then createConsumer(); //create two subsribers?


.catch(function (err) {
    The most outter error handler.
})
.end() terminate the promise chain
*/

Я пойду через функции по одной

Play.AMQ.Open.then((connection) => {

        Play.AMQ.ConfirmChannel = connection.createConfirmChannel();
        return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
        //since ConfirmChannel() is a promise, it means channel is working I assume if you can run this callback
        //confirmChannel.on('error', Play.handleChannelError); 

            //No idea following codes are promise or not...
        //Assuming that following 3 lines won't be fail
                Play.AMQ.CommandQueue = confirmChannel.assertQueue('AsyncProcessorCommandQueue', { durable: true, exclusive: false, autoDelete: false });
                Play.AMQ.TelemetryQueue = confirmChannel.assertQueue('AsyncProcessorTelemetryQueue', { durable: true, exclusive: false, autoDelete: false });
                Play.ReceiveTelemetry(sub_TelemetryMsgHandler);

        //Chainning promise....
        //Btw... I am curious why you call a `prviate` method directly
                return Play.CreateConsumer('Node', dto.BusManifestYo.Type, sub_YoMsgHandler).then((consumerTag) => {
                    //track consumer tags in CreateConsumer?
                    Play.AMQ.Subscribers.push(consumerTag);
                });
        });

})
.end(); //Catch any un-handled errors and terminate the Q chain (EG error from confirmChannel())



private static CreateConsumer(name: string, type: string, handler: (msg: any) => void): Q.Promise<string> {
    var qid = type + '_' + name;

    //I suggest you put `confirmChannel` as one of the input arguments since this method will only be called after open() and confirmChannel()
    return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
        return confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }).then((okQueueReply) => {
            return confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
                return confirmChannel.bindQueue(qid, type, '').then((okBindReply) => {
                    return confirmChannel.consume(qid, (msg) => {
                        handler(msg);
                        confirmChannel.ack(msg);
                    });
                });
            });
        });
    }
    //I am not familiarize syntax on TypeScript, I think below error handler is an input argument of Play.AMQ.ConfirmChannel.then()
    //If so, this error handler can handler error from ConfirmChannel() only.
    ,
    (failReason) => {
    throw new Error('create consumer issue: ' + failReason);
    });

}



/*
I am not sure the intent (or command flow?) of your below function.

1. If ConfirmChannel() fail, I think your program will crash since no one handling below error
    throw new Error('create consumer issue: ' + failReason);

2. No idea to figure out the relationship between 2 promises ackDeferred and Play.AMQ.ConfirmChannel
**/
private static Publish(obj: dto.interfaces.IPublishable): Q.Promise<boolean> {
    var ackDeferred = Q.defer<boolean>();

    var handleChannelConfirm = (err, ok): void => {
    if (err !== null) {
        console.warn('Message nacked!');
        ackDeferred.resolve(false);
    }
    else {
        console.log('Message acked');
        ackDeferred.resolve(true);
    }
    }

    // '#' instead of ''?
    Play.AMQ.ConfirmChannel.then((confirmChannel) => {
        confirmChannel.assertExchange(obj.GetType(), 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
        confirmChannel.publish(obj.GetType(), '', Play.ToBuffer(obj), { type: obj.GetType() }, handleChannelConfirm);
    });
    },
    (failReason) => {
    throw new Error('create consumer issue: ' + failReason);
    });

    return ackDeferred.promise;
}

Надеюсь, поможет.

Кстати.... Как включить подсветку синтаксиса:D?

Другие вопросы по тегам