Используйте Hazelcast IMap на процессоре Hazelcast Jet
Я только начинаю изучать Hazelcast Jet. Мой источник - UDP датаграммы. Я хочу обрабатывать его параллельно на некоторых узлах Jet и пересылать их по другим адресам по "домену". Я хочу использовать Hazelcast IMDG IMap с загрузчиком, чтобы получить "домен" по "исходному IP".
DAG dag = new DAG();
Vertex source = dag.newVertex("datagram-source",
UdpSocketP.supplier("0.0.0.0", 41813));
source.localParallelism(1);
Vertex mapper = dag.newVertex("map",
map(new DomainMapper(instance.getMap("mysqlNas"))));
Vertex sink = dag.newVertex("sink",
Sinks.writeFile("logs"));
sink.localParallelism(1);
Но когда я пытаюсь использовать IMap в DistributedFunction, я получаю исключение
Exception in thread "main" java.lang.IllegalArgumentException: "metaSupplier" must be serializable
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:185)
at com.hazelcast.jet.Vertex.<init>(Vertex.java:101)
at com.hazelcast.jet.Vertex.<init>(Vertex.java:78)
at com.hazelcast.jet.DAG.newVertex(DAG.java:79)
at org.eltex.softwlc.sorm.replicator.JetServer.main(JetServer.java:46)
Caused by: java.io.NotSerializableException: com.hazelcast.jet.stream.impl.MapDecorator
Код DomainMapper:
package org.eltex.softwlc.sorm.replicator;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.function.DistributedFunction;
import java.io.Serializable;
import java.net.DatagramPacket;
/**
* Created by mickey on 21.07.17.
*/
public class DomainMapper implements DistributedFunction<DatagramPacket, IpData>, Serializable {
private final IMap<String, NasValue> map;
public DomainMapper(IMap<String, NasValue> map) {
this.map = map;
}
@Override
public IpData apply(DatagramPacket datagramPacket) {
final IpData d = new IpData(datagramPacket, datagramPacket.getAddress().getHostAddress());
System.out.println(d);
final NasValue nasValue = map.get(datagramPacket.getAddress().getHostAddress());
if (nasValue!=null) {
d.setDomain(nasValue.getDomain());
}
return d;
}
}
В чем моя ошибка? Или Hazelcast Jet - неправильный выбор для моих целей.
1 ответ
Проблема в том, что вы пытаетесь сериализовать весь IMap
внутри функции. Прямым решением было бы написать собственный процессор, который получает доступ к экземпляру Hazelcast Jet внутри его init()
метод и ищет свой IMap от этого. поскольку init()
код выполняется на целевом элементе, после всей десериализации это будет работать.
Тем не менее, на более общем уровне ваша цель выглядит как "обогащение данных". Мы хотим поддержать это в Jet с помощью операции "hash join", которая в настоящее время не является первоклассной; однако есть пример кода, демонстрирующий подход. Вы можете либо направить весь IMap
содержимое вершины, которая превратит его в равнину HashMap
и распространить на все обогащающие процессоры, или вы можете подготовить Hazelcast ReplicatedMap
это будет использоваться непосредственно процессором обогащения.
Первый подход означает, что вы работаете против снимка IMap
; во втором вы можете продолжить обновление ReplicatedMap
как работа работает.
Лучше всего пойти и проверить образцы: HashMapEnrichment и ReplicatedMapEnrichment.