Сервисная шина Keda: все сообщения обрабатываются некоторыми модулями keda ScaledJob.
Я использую платформу Azure Kubernetes Service(AKS) и использовал KEDA «ScaledJob» для длительной работы. В этом случае для автоматического запуска заданий использовался триггер очереди служебной шины Azure. Теперь, пока я добавляю сообщения в служебную шину Azure, KEDA автоматически запускает задание и создает узел/модуль в соответствии с конфигурацией. но в этом случае все сообщения собираются и обрабатываются некоторыми модулями. ожидается, что каждый увеличенный модуль обрабатывает одно сообщение и завершает работу.
ниже мой файл yml
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: {{ .Chart.Name }}
spec:
jobTargetRef:
backoffLimit: 4
parallelism: 1
completions: 1
activeDeadlineSeconds: 300
template:
spec:
imagePullSecrets:
- name: {{ .Values.image.imagePullSecrets }}
terminationGracePeriodSeconds: 30
dnsPolicy: ClusterFirst
volumes:
- name: azure
azureFile:
shareName: sharenameone
secretName: secret-sharenameone
readOnly: true
- name: one-storage
emptyDir: {}
- name: "logging-volume-file"
persistentVolumeClaim:
claimName: "azure-file-logging"
initContainers:
- name: test-java-init
image: {{ .Values.global.imageRegistryURI }}/{{ .Values.image.javaInitImage.name}}:{{ .Values.image.javaInitImage.tag }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
securityContext:
readOnlyRootFilesystem: true
resources:
requests:
cpu: 100m
memory: 300Mi
limits:
cpu: 200m
memory: 400Mi
volumeMounts:
- name: azure
mountPath: /mnt/azure
- name: one-storage
mountPath: /certs
containers:
- name: {{ .Chart.Name }}
image: {{ .Values.global.imageRegistryURI }}/tests/{{ .Chart.Name }}:{{ .Values.version }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
env:
{{- include "chart.envVars" . | nindent 14 }}
- name: JAVA_OPTS
value: >-
{{ .Values.application.javaOpts }}
- name: application_name
value: "test_application"
- name: queueName
value: "test-queue-name"
- name: servicebusconnstrenv
valueFrom:
secretKeyRef:
name: secrets-service-bus
key: service_bus_conn_str
volumeMounts:
- name: cert-storage
mountPath: /certs
- name: "logging-volume-azure-file"
mountPath: "/mnt/logging"
resources:
{{- toYaml .Values.resources | nindent 14 }}
pollingInterval: 30
maxReplicaCount: 5
successfulJobsHistoryLimit: 5
failedJobsHistoryLimit: 20
triggers:
- type: azure-servicebus
metadata:
queueName: "test-queue-name"
connectionFromEnv: servicebusconnstrenv
messageCount: "1"
и это мой прослушиватель функций Azure
@FunctionName("TestServiceBusTrigger")
public void TestServiceBusTriggerHandler(
@ServiceBusQueueTrigger(
name = "msg",
queueName = "%TEST_QUEUE_NAME%",
connection = "ServiceBusConnectionString")
final String inputMessage,
final ExecutionContext context) {
final java.util.logging.Logger contextLogger = context.getLogger();
System.setProperty("javax.net.ssl.trustStore", "/certs/cacerts");
try {
// all the processing goes here
} catch (Exception e) {
//Exception handling
}
}
какие конфигурации необходимо добавить, чтобы каждый масштабируемый модуль обрабатывал одно сообщение и завершал работу?
1 ответ
Функции Azure были разработаны не так, или даже не так, как следует использовать KEDA в целом. Было бы более идеально, если бы уже работающий контейнер обрабатывал как можно больше сообщений после подготовки.
При этом, если ваш сценарий по-прежнему требует этого, вы можете написать простой сценарий, который напрямую использует SDK служебной шины Azure для получения только одного сообщения, его обработки и завершения.