Hazelcast Jet 0.6.1 - Ошибка компиляции с помощью Pipeline customTransform API

Я получаю следующую ошибку компиляции с конвейером customTransform API.

Вот пример кода для построения конвейера:

private Pipeline buildPipeline2() {
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<String, CacheEntry<AuditLogRecord>>remoteMapJournal("cache_AuditLog", getClientConfig(), START_FROM_OLDEST))
          .addTimestamps((v) ->  getTimeStamp(v), 3000)
          .peek()
          .groupingKey((v) -> Tuple2.tuple2(getUserID(v),getTranType(v)))
    .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
    //.aggregate(counting(),(winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), formatKey(key), result))
    .aggregate(counting())
    .map((v)-> getMapKey(v))
    .customTransform("test2", ()-> this)
    .drainTo(Sinks.map("Test"));
    //.drainTo(Sinks.files("c:\\data\\op.txt"));
    return p;
  }

Вот пример кода для tryProcess() метод:

protected boolean tryProcess(int ordinal, Object item) {
    JetEvent jetEvent = (JetEvent)item;
    Object obj = jetEvent.payload();
    tryEmit(ordinal,item);
    return true;
}

Вот ошибка компиляции.

incompatible types: inferred type does not conform to upper bound(s)
[ERROR] inferred: java.lang.Object
[ERROR] upper bound(s): java.util.Map.Entry

Это компилируется и хорошо работает со следующим кодом.

 .customTransform("test2", ()-> this)
 .drainTo(Sinks.files("c:\\data\\op.txt"));

Тем не менее, дает ошибку компиляции с помощью следующего кода.

.customTransform("test2", ()-> this)
.drainTo(Sinks.map("Test"));

Не могли бы вы помочь мне решить эту проблему?

1 ответ

customTransform небезопасен Если параметр типа не может быть выведен, он оценивается как Object, Тем не менее Sinks.map требует Map.Entry<K, V>, Чтобы решить эту проблему, добавьте подсказку типа к customTransform метод:

    .<Map.Entry<YourKeyType, YourValueType>customTransform("test2", ()-> this)
    .drainTo(Sinks.map("Test"));

Имейте в виду, что если ваш пользовательский процессор на самом деле не возвращает Map.Entry, он потерпит неудачу во время выполнения.

Sinks.files работает, потому что требуется Object,

Другие вопросы по тегам