Flink дает мне "тоже может открыть файлы" на контрольной точке

Я выполняю одни и те же потоковые задания несколько раз, но с разными параметрами. Задача использует состояние ключа, чтобы просто вычислить разницу между текущим событием и последним полученным и отправить его в ту же тему Кафки (я знаю, что это не логично или распространено, но это не мое решение). После нескольких тестов все работает хорошо, пока через час я не получу следующие исключения:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_306d8342cb5b2ad8b53f1be57f65bee8_(1/3) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
... 5 more
Caused by: java.io.FileNotFoundException: /home/quantion/flink-checkpoints/PeriodDailyAvrgValuePeak/1cb1374a0ea7dc9d74f86a8de9be3bec/chk-1/3274bb7c-0352-4367-87bc-9f85939f00b3 (Too many open files)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:112)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:315)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:95)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
... 7 more

Больше

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:765)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:633)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:469)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:420)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:296)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
at org.apache.kafka.common.network.Selector.<init>(Selector.java:129)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:156)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:160)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701)
... 11 more
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
at java.nio.channels.Selector.open(Selector.java:227)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:127)
... 14 more

Я проверил лидера темы правильно, как я видел в аналогичном вопросе, но все, кажется, хорошо.

Я уже пытаюсь увеличить количество файлов, которые система может обработать с помощью ulimit -n, но ошибка все еще остается.

Я также вручную удаляю несколько файлов контрольных точек, поскольку сохраняю их при отмене, но пока не оказал никакого влияния.

Мой код до сих пор:

public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // set up checkpoint
    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setStateBackend((StateBackend) new FsStateBackend("file:///home/quantion/flink-checkpoints/diferential-checkpoints"));
    env.enableCheckpointing(3600000);

    if (args.length != 3) throw new Exception("Needs some arguments:\n" +
                "file.jar <dev_type> <sensor_type> <new_label>");

    // get arguments
    Arguments arguments = new Arguments(args);
    String TYPE = arguments.getTYPE();
    String TARGET = arguments.getTARGET();
    String NEW_LABEL = arguments.getNEW_LABEL();

    // Propiedades Kafka
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", HOST+":9092");
    properties.setProperty("zookeeper.connect", HOST+":2181");
    properties.setProperty("group.id", TYPE); //TYPE
    Logger LOG = LoggerFactory.getLogger(DifferentialConsumption.class);

    DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer011<>(TOPIC, new SimpleStringSchema(), properties))
            .name("Read Kafka source --topic "+TOPIC);

    DataStream<JSONObject> jsonObjectDataStream = inputStream.map(x -> {
        try {
            return new JSONObject(x);
        } catch (Exception e) {
            LOG.error("An {} occurred.", "error", e);
            return new JSONObject();
        }
    }).name("Parse events to JSONObjects");

    DataStream<JSONObject> filterStream = jsonObjectDataStream.filter(x -> x.has("type") && x.get("type").equals(TYPE) && !x.has(NEW_LABEL))
            .name("Filter events by type: "+TYPE+" and don't have "+NEW_LABEL+" already");

    DataStream<JSONObject> saveSate = filterStream.keyBy(x -> x.getString("id")).flatMap(new Diferential(TARGET, NEW_LABEL))
            .name("KeyedState for differential");
    DataStream<String> streamToString = saveSate.map(JSONObject::toString)
            .name("Parse JSON to String");

    FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(HOST+":9092", TOPIC, new SimpleStringSchema());
    streamToString.addSink(myProducer).name("Write Kafka sink --topic "+TOPIC);
    //streamToString.print();

    // execute program
    env.execute("Get differential for "+TYPE);
}

1 ответ

Искаженные сообщения сохраняют все задания по восстановлению и повторному открытию файлов с контрольной точки, поэтому даже после перезапуска Flink ошибка все равно остается.

Для этого случая я добавляю JSONObject.has() проверить, существует ли аргумент перед его чтением, и задания могут восстановиться с контрольных точек после перезагрузки узла Flink.

Возможным альтернативным решением было перезапустить смещения в Kafka, но это может привести к потере некоторых событий.

Другие вопросы по тегам