Используйте Hazelcast IMap на процессоре Hazelcast Jet

Я только начинаю изучать Hazelcast Jet. Мой источник - UDP датаграммы. Я хочу обрабатывать его параллельно на некоторых узлах Jet и пересылать их по другим адресам по "домену". Я хочу использовать Hazelcast IMDG IMap с загрузчиком, чтобы получить "домен" по "исходному IP".

DAG dag = new DAG();        
Vertex source = dag.newVertex("datagram-source",
                UdpSocketP.supplier("0.0.0.0", 41813));
        source.localParallelism(1);

        Vertex mapper = dag.newVertex("map",
                map(new DomainMapper(instance.getMap("mysqlNas"))));

        Vertex sink = dag.newVertex("sink",
                Sinks.writeFile("logs"));
        sink.localParallelism(1);

Но когда я пытаюсь использовать IMap в DistributedFunction, я получаю исключение

Exception in thread "main" java.lang.IllegalArgumentException: "metaSupplier" must be serializable
    at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:185)
    at com.hazelcast.jet.Vertex.<init>(Vertex.java:101)
    at com.hazelcast.jet.Vertex.<init>(Vertex.java:78)
    at com.hazelcast.jet.DAG.newVertex(DAG.java:79)
    at org.eltex.softwlc.sorm.replicator.JetServer.main(JetServer.java:46)
Caused by: java.io.NotSerializableException: com.hazelcast.jet.stream.impl.MapDecorator

Код DomainMapper:

package org.eltex.softwlc.sorm.replicator;

import com.hazelcast.core.IMap;
import com.hazelcast.jet.function.DistributedFunction;

import java.io.Serializable;
import java.net.DatagramPacket;

/**
 * Created by mickey on 21.07.17.
 */
public class DomainMapper implements DistributedFunction<DatagramPacket, IpData>, Serializable {

    private final IMap<String, NasValue> map;

    public DomainMapper(IMap<String, NasValue> map) {
        this.map = map;
    }

    @Override
    public IpData apply(DatagramPacket datagramPacket) {
        final IpData d = new IpData(datagramPacket, datagramPacket.getAddress().getHostAddress());
        System.out.println(d);

        final NasValue nasValue = map.get(datagramPacket.getAddress().getHostAddress());
        if (nasValue!=null) {
            d.setDomain(nasValue.getDomain());
        }

        return d;
    }
}

В чем моя ошибка? Или Hazelcast Jet - неправильный выбор для моих целей.

1 ответ

Решение

Проблема в том, что вы пытаетесь сериализовать весь IMap внутри функции. Прямым решением было бы написать собственный процессор, который получает доступ к экземпляру Hazelcast Jet внутри его init() метод и ищет свой IMap от этого. поскольку init() код выполняется на целевом элементе, после всей десериализации это будет работать.

Тем не менее, на более общем уровне ваша цель выглядит как "обогащение данных". Мы хотим поддержать это в Jet с помощью операции "hash join", которая в настоящее время не является первоклассной; однако есть пример кода, демонстрирующий подход. Вы можете либо направить весь IMap содержимое вершины, которая превратит его в равнину HashMap и распространить на все обогащающие процессоры, или вы можете подготовить Hazelcast ReplicatedMap это будет использоваться непосредственно процессором обогащения.

Первый подход означает, что вы работаете против снимка IMap; во втором вы можете продолжить обновление ReplicatedMap как работа работает.

Лучше всего пойти и проверить образцы: HashMapEnrichment и ReplicatedMapEnrichment.

Другие вопросы по тегам