Hazelcast jet 0.6.1 Определение конвейера и DAG

У меня есть пример кода для построения конвейера.

    private Pipeline buildPipeline() {

    logger.debug("AbstractAuditLogProcessor.buildPipeline  method start");
    Pipeline p = Pipeline.create();

    p.drawFrom(Sources.<String, CacheEntry<AuditLogRecord>>remoteMapJournal("cache_AuditLog", getPlatformClientConfig(), START_FROM_OLDEST))
    .addTimestamps((v) ->  getTimeStamp(v), 3000)
    .peek()
    .groupingKey((v) -> Tuple2.tuple2(getUserID(v),getTranType(v)))
    .window(WindowDefinition.sliding(getSlidingWindowLengthInMillies(), getSlidingStepInMillies()))
    .aggregate(counting())
    .map((v)-> getMapKey(v))
    //.<Map.Entry<String, Long>>customTransform("test2", ()-> this)
    //.<Offer>customTransform("Offer_Recommendations", ()-> this)
    .<Map.Entry<String, Offer>>customTransform("Offer_Recommendations", ()-> this)
    //.drainTo(Sinks.remoteList("cache_OfferRecommendations", getPlatformClientConfig()));
    .drainTo(Sinks.remoteMap("cache_OfferRecommendations", getPlatformClientConfig()));
    logger.debug("AbstractAuditLogProcessor.buildPipeline  method end");
    return p;
}

Этот код печатает следующую информацию DAG

dag
.vertex("remoteMapJournalSource(cache_AuditLog)").localParallelism(1)
.vertex("sliding-window-step1").localParallelism(4)
.vertex("sliding-window-step2").localParallelism(4)
.vertex("map").localParallelism(4)
.vertex("Offer_Recommendations").localParallelism(4)
.vertex("remoteMapSink(cache_OfferRecommendations)").localParallelism(1)
.edge(between("remoteMapJournalSource(cache_AuditLog)", "sliding-window-step1").partitioned(?))
.edge(between("sliding-window-step1", "sliding-window-step2").partitioned(?).distributed())
.edge(between("sliding-window-step2", "map"))
.edge(between("map", "Offer_Recommendations"))
.edge(between("Offer_Recommendations", "remoteMapSink(cache_OfferRecommendations)"))

Информация DAG имеет дополнительные детали / вызовы методов, такие как partitioned(), распределенный ()

Распределяет ли это записи на основе ключа? Кроме того, как обеспечивает использование hazelcast jet, записи не перемещаются в другие разделы.

0 ответов

Другие вопросы по тегам