Пользовательский авторизатор AWS IoT Core
Я пытаюсь использовать настраиваемый авторизатор AWS IoT Core, как показано здесь (https://docs.aws.amazon.com/iot/latest/developerguide/config-custom-auth.html). Я разработал лямбда-выражение и смог опубликовать его с помощью конечной точки HTTP (https://docs.aws.amazon.com/iot/latest/apireference/API_iotdata_Publish.html), а также могу вызывать его с помощью интерфейса командной строки AWS, запустив
aws iot test-invoke-authorizer --authorizer-name <name> --mqtt-context "username=***,password=***,clientId=***"
. Однако, когда я пытаюсь использовать любой другой клиент, это не работает (я использовал первые клиенты mosquito, MQTT Explorer и paho). С ними у меня только тайм-аут на стороне клиента, а на стороне сервера ничего. Я также пробую Java AWS IoT Core SDK, где я получаю сбой согласования TLS в
boolean sessionPresent = connected.get();
при подключении. В качестве конечной точки я использую
IoT:Data-ATS
. В документации есть сбивающая с толку строка (https://docs.aws.amazon.com/iot/latest/developerguide/protocols.html), которая показывает в таблице, что пользовательская аутентификация с MQTT должна использоваться в порту 443 с сноска. В сноске говорится, что что-то вроде пользовательской аутентификации в порту 443 не работает. Что не имеет смысла.
Есть идея или помощь?
Mosquitto_sub отрезан:
mosquitto_sub -d -h **** -p 443 -u username?x-amz-customauthorizer-name=*** -P test -t test --cafile /etc/ssl/certs/Amazon_Root_CA_1.pem -i ***
Пахо отрезал:
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.nio.charset.StandardCharsets;
public class Paho implements MqttCallback {
public static void execute(){
try {
MqttClient client = new MqttClient("ssl://***:443","***",new MemoryPersistence());
client.connect();
client.setCallback(new Paho());
while (true){
client.publish("test","test".getBytes(StandardCharsets.UTF_8),0, false);
Thread.sleep(500);
}
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
Клиент AWS отрезан:
public class AWS implements MqttClientConnectionEvents {
public static void execute(){
try {
try(EventLoopGroup eventLoopGroup = new EventLoopGroup(1);
HostResolver resolver = new HostResolver(eventLoopGroup);
ClientBootstrap clientBootstrap = new ClientBootstrap(eventLoopGroup, resolver);
AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder.newDefaultBuilder()) {
builder.withBootstrap(clientBootstrap)
.withConnectionEventCallbacks(new AWS())
.withClientId("****")
.withEndpoint("***")
.withCleanSession(true);
try(MqttClientConnection connection = builder.build()) {
CompletableFuture<Boolean> connected = connection.connect();
try {
boolean sessionPresent = connected.get();
System.out.println("Connected to " + (!sessionPresent ? "new" : "existing") + " session!");
} catch (Exception ex) {
ex.printStackTrace();
throw new RuntimeException("Exception occurred during connect", ex);
}
CountDownLatch countDownLatch = new CountDownLatch(10);
CompletableFuture<Integer> subscribed = connection.subscribe("test", QualityOfService.AT_LEAST_ONCE, (message) -> {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
System.out.println("MESSAGE: " + payload);
countDownLatch.countDown();
});
subscribed.get();
int count = 0;
while (count++ < 10) {
CompletableFuture<Integer> published = connection.publish(new MqttMessage("test", "test".getBytes(), QualityOfService.AT_LEAST_ONCE, false));
published.get();
Thread.sleep(1000);
}
countDownLatch.await();
CompletableFuture<Void> disconnected = connection.disconnect();
disconnected.get();
}
} catch (Exception ex) {
System.out.println("Exception encountered: " + ex.toString());
}
System.out.println("Complete!");
}
@Override
public void onConnectionInterrupted(int errorCode) {
}
@Override
public void onConnectionResumed(boolean sessionPresent) {
}
}
1 ответ
У меня была аналогичная проблема с использованием версии paho для Python - я наконец исправил ее, подключившись к порту 443 вместо порта 8883 - и добавивmqtt
к протоколам ALPN. Похоже, это может не поддерживаться в библиотеке Java.
Он был добавлен в библиотеку C , но пока я пишу это, похоже, он не поддерживается в версии для Java .
Вот соответствующий код Python, который я использовал, чтобы заставить его работать:
import paho.mqtt.client as paho
client = paho.Client(client_id)
client.username_pw_set(
f"{username}?x-amz-customauthorizer-name={authorizer}",
password
)
ssl_context = ssl.create_default_context()
ssl_context.set_alpn_protocols(["mqtt"])
ssl_context.load_verify_locations("AmazonRootCA1.pem")
client.tls_set_context(ssl_context)
client.connect(host, 443)