Сервер Mosca не показывает полезную нагрузку с CLI MQTT
Я пытаюсь использовать Mosca, чтобы показать полезную нагрузку в сообщениях сервера, но сервер не показывает полезную нагрузку в опубликованном событии.
Код выглядит следующим образом:
server.js:
'use strict'
const debug = require('debug')('project:mqtt')
const mosca = require('mosca')
const redis = require('redis')
const chalk = require('chalk')
const db = require('iot-db')
const { parsePayload } = require('./utils')
const backend = {
type: 'redis',
redis,
return_buffers: true
}
const settings = {
port: 1883,
backend
}
const config = {
database: process.env.DB_NAME || 'iot',
username: process.env.DB_USER || 'user',
password: process.env.DB_PASS || 'admind',
host: process.env.DB_HOST || 'localhost',
dialect: 'postgres',
logging: s => debug(s),
}
const server = new mosca.Server(settings)
const clients = new Map()
let Agent, Metric
server.on('clientConnected', client => {
debug(`Client Connected: ${client.id}`)
clients.set(client.id, null)
})
server.on('clientDisconnected', client => {
debug(`Client Disconnected: ${client.id}`)
})
server.on('published', async (packet, client) => {
debug(`Received: ${packet.topic}`)
switch (packet.topic) {
case 'agent/connected':
case 'agent/disconnected':
debug(`Payload: ${packet.payload}`)
break
case 'agent/message':
debug(`Payload: ${packet.payload}`)
const payload = parsePayload(packet.payload)
if (payload) {
payload.agent.connected = true
let agent
try {
agent = await Agent.createOrUpdate(payload.agent)
} catch (e) {
return handleError(e)
}
debug(`Agent ${agent.uuid} saved`)
// Notify Agent is Connected
if (!clients.get(client.id)) {
clients.set(client.id, agent)
server.publish({
topic: 'agent/connected',
payload: JSON.stringify({
agent: {
uuid: agent.uuid,
name: agent.name,
hostname: agent.hostname,
pid: agent.pid,
connected: agent.connected
}
})
})
}
// Store Metrics
for (let metric of payload.metrics) {
let m
try {
m = await Metric.create(agent.uuid, metric)
} catch (e) {
return handleError(e)
}
debug(`Metric ${m.id} saved on agent ${agent.uuid}`)
}
}
break
}
})
server.on('ready', async () => {
const services = await db(config).catch(handleFatalError)
Agent = services.Agent
Metric = services.Metric
console.log(`${chalk.green('[platziverse-mqtt]')} server is running`)
})
server.on('error', handleFatalError)
function handleFatalError (err) {
console.error(`${chalk.red('[fatal error]')} ${err.message}`)
console.error(err.stack)
process.exit(1)
}
function handleError (err) {
console.error(`${chalk.red('[error]')} ${err.message}`)
console.error(err.stack)
}
process.on('uncaughtException', handleFatalError)
process.on('unhandledRejection', handleFatalError)
utils.js: преобразование полезной нагрузки в объект JavaScript
'use strict'
function parsePayload (payload) {
if (payload instanceof Buffer) {
payload = payload.toString('utf8')
}
try {
payload = JSON.parse(payload)
} catch (e) {
payload = {}
}
return payload
}
module.exports = {
parsePayload
}
MQTT CLI
$mqtt pub -t 'agent/message' -m 'hello'
Разорвать вывод:
опубликованное событие без оператора switch
'use strict'
const debug = require('debug')('project:mqtt')
const mosca = require('mosca')
const redis = require('redis')
const chalk = require('chalk')
const backend = {
type: 'redis',
redis,
return_buffers: true
}
const settings = {
port: 1883,
backend
}
const server = new mosca.Server(settings)
server.on('clientConnected', client => {
debug(`Client Connected: ${client.id}`)
})
server.on('clientDisconnected', client => {
debug(`Client Disconnected: ${client.id}`)
})
server.on('published', (packet, client) => {
debug(`Received: ${packet.topic}`)
debug(`Payload: ${packet.payload}`)
})
server.on('ready', () => {
console.log(`${chalk.green('[platziverse-mqtt]')} server is running`)
})
server.on('error', handleFatalError)
function handleFatalError () {
console.error(`${chalk.red('[fatal error]')} ${err.message}`)
console.error(err.stack)
process.exit(1)
}
process.on('uncaughtException', handleFatalError)
process.on('unhandledRejection', handleFatalError)
вывод сервера:
Любая идея, чтобы решить эту проблему, спасибо заранее