Обещания Javascript /Q - Я делаю это правильно?
Я пытаюсь использовать Q в NodeJS для создания некоторых оберток amqplib. Обертки работают правильно (пока), но я чувствую, что мое использование Q... неправильно.
Во-первых, есть метод инициализации:
private static Startup(): void {
var sub_YoMsgHandler = (msg: any) => {
console.log(util.format('Received Yo: %s', msg.content.toString()));
var bmy: dto.interfaces.IBusManifestYo = JSON.parse(msg.content.toString());
}
var sub_TelemetryMsgHandler = (msg: any) => {
var bmy: dto.interfaces.IAsyncProcessorCommand = JSON.parse(msg.content.toString());
console.log(util.format('Received Telemetry: %s', msg.content.toString()));
}
Play.AMQ.Open.then((connection) => {
Play.AMQ.ConfirmChannel = connection.createConfirmChannel();
Play.AMQ.ConfirmChannel.then((confirmChannel) => {
confirmChannel.on('error', Play.handleChannelError);
Play.AMQ.CommandQueue = confirmChannel.assertQueue('AsyncProcessorCommandQueue', { durable: true, exclusive: false, autoDelete: false });
Play.AMQ.TelemetryQueue = confirmChannel.assertQueue('AsyncProcessorTelemetryQueue', { durable: true, exclusive: false, autoDelete: false });
Play.ReceiveTelemetry(sub_TelemetryMsgHandler);
Play.CreateConsumer('Node', dto.BusManifestYo.Type, sub_YoMsgHandler).then((consumerTag) => {
//track consumer tags in CreateConsumer?
Play.AMQ.Subscribers.push(consumerTag);
});
});
});
}
Я упоминал, что использую TypeScript? Этот метод соединяет, создает канал, создает две очереди отправки / получения, создает двух подписчиков, а затем сохраняет обещания соединения, канала и очереди в объекте. Тогда вот один способ создания подписчика (потребителя):
private static CreateConsumer(name: string, type: string, handler: (msg: any) => void): Q.Promise<string> {
var qid = type + '_' + name;
return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
return confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }).then((okQueueReply) => {
return confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
return confirmChannel.bindQueue(qid, type, '').then((okBindReply) => {
return confirmChannel.consume(qid, (msg) => {
handler(msg);
confirmChannel.ack(msg);
});
});
});
});
},
(failReason) => {
throw new Error('create consumer issue: ' + failReason);
});
}
Наконец, вот мой метод публикации:
private static Publish(obj: dto.interfaces.IPublishable): Q.Promise<boolean> {
var ackDeferred = Q.defer<boolean>();
var handleChannelConfirm = (err, ok): void => {
if (err !== null) {
console.warn('Message nacked!');
ackDeferred.resolve(false);
}
else {
console.log('Message acked');
ackDeferred.resolve(true);
}
}
// '#' instead of ''?
Play.AMQ.ConfirmChannel.then((confirmChannel) => {
confirmChannel.assertExchange(obj.GetType(), 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
confirmChannel.publish(obj.GetType(), '', Play.ToBuffer(obj), { type: obj.GetType() }, handleChannelConfirm);
});
},
(failReason) => {
throw new Error('create consumer issue: ' + failReason);
});
return ackDeferred.promise;
}
Как я уже сказал, все работают, но мне кажется, что я не использую обещания правильным или рекомендованным способом.
Здесь есть какие-то вопиющие ошибки - или я делаю это правильно? В частности, я думаю, мне любопытно мое цепочка и обработка ошибок (я думаю, что обработка ошибок, скорее всего, будет неправильной). Бонусные баллы за то, что я показал правильный путь в этом методе публикации для получения обработчика стиля обратного вызова и Promise-ifying...
2 ответа
Если Q следует спецификации обещания, он должен работать так
return Play.AMQ.ConfirmChannel
.then(confirmChannel => confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }))
.then(okQueueReply => confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }))
.then(okExchangeReply => confirmChannel.bindQueue(qid, type, ''))
.then(okBindReply => confirmChannel.consume(qid))
.then(msg => {
handler(msg);
confirmChannel.ack(msg);
});
И тогда имеет смысл возвращать что-то из последнего.
Обратный звонок в then
Метод возвращает новое обещание, поэтому вы можете связать их без вложенных обратных вызовов.
Подводя итог вашей идеи,
/*
Open() // connecting...
.then confirmChannel() //create a channel
.then function () {
commandQueue(); //create send queue?
TelemetryQueue(); // create recv queue?
}
.then createConsumer(); //create two subsribers?
.catch(function (err) {
The most outter error handler.
})
.end() terminate the promise chain
*/
Я пойду через функции по одной
Play.AMQ.Open.then((connection) => {
Play.AMQ.ConfirmChannel = connection.createConfirmChannel();
return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
//since ConfirmChannel() is a promise, it means channel is working I assume if you can run this callback
//confirmChannel.on('error', Play.handleChannelError);
//No idea following codes are promise or not...
//Assuming that following 3 lines won't be fail
Play.AMQ.CommandQueue = confirmChannel.assertQueue('AsyncProcessorCommandQueue', { durable: true, exclusive: false, autoDelete: false });
Play.AMQ.TelemetryQueue = confirmChannel.assertQueue('AsyncProcessorTelemetryQueue', { durable: true, exclusive: false, autoDelete: false });
Play.ReceiveTelemetry(sub_TelemetryMsgHandler);
//Chainning promise....
//Btw... I am curious why you call a `prviate` method directly
return Play.CreateConsumer('Node', dto.BusManifestYo.Type, sub_YoMsgHandler).then((consumerTag) => {
//track consumer tags in CreateConsumer?
Play.AMQ.Subscribers.push(consumerTag);
});
});
})
.end(); //Catch any un-handled errors and terminate the Q chain (EG error from confirmChannel())
private static CreateConsumer(name: string, type: string, handler: (msg: any) => void): Q.Promise<string> {
var qid = type + '_' + name;
//I suggest you put `confirmChannel` as one of the input arguments since this method will only be called after open() and confirmChannel()
return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
return confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }).then((okQueueReply) => {
return confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
return confirmChannel.bindQueue(qid, type, '').then((okBindReply) => {
return confirmChannel.consume(qid, (msg) => {
handler(msg);
confirmChannel.ack(msg);
});
});
});
});
}
//I am not familiarize syntax on TypeScript, I think below error handler is an input argument of Play.AMQ.ConfirmChannel.then()
//If so, this error handler can handler error from ConfirmChannel() only.
,
(failReason) => {
throw new Error('create consumer issue: ' + failReason);
});
}
/*
I am not sure the intent (or command flow?) of your below function.
1. If ConfirmChannel() fail, I think your program will crash since no one handling below error
throw new Error('create consumer issue: ' + failReason);
2. No idea to figure out the relationship between 2 promises ackDeferred and Play.AMQ.ConfirmChannel
**/
private static Publish(obj: dto.interfaces.IPublishable): Q.Promise<boolean> {
var ackDeferred = Q.defer<boolean>();
var handleChannelConfirm = (err, ok): void => {
if (err !== null) {
console.warn('Message nacked!');
ackDeferred.resolve(false);
}
else {
console.log('Message acked');
ackDeferred.resolve(true);
}
}
// '#' instead of ''?
Play.AMQ.ConfirmChannel.then((confirmChannel) => {
confirmChannel.assertExchange(obj.GetType(), 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
confirmChannel.publish(obj.GetType(), '', Play.ToBuffer(obj), { type: obj.GetType() }, handleChannelConfirm);
});
},
(failReason) => {
throw new Error('create consumer issue: ' + failReason);
});
return ackDeferred.promise;
}
Надеюсь, поможет.
Кстати.... Как включить подсветку синтаксиса:D?