Невозможно распечатать файл CSV с помощью Flink Table API

Я пытаюсь прочитать файл с 34 полями для печати на консоли с помощью NetBeans. Однако все, что я могу напечатать, это схема. Потому что опция печати отсутствует в этой конкретной версии Flink, используемой с csvreader.

Пожалуйста, посмотрите код и помогите мне понять, где я должен исправить. Я бы использовал CSVReader, встроенный API, но оказывается, что он не поддерживает более 22 полей и, следовательно, прибегает к использованию Table API. Также пытался использовать CsvTableSource версия 1.5.1 Flink, но не везет с синтаксисом. Как .field("%CPU", Types.FLOAT()) продолжал давать ошибку для типа float не распознанный символ. Моя главная цель - просто прочитать CSV-файл и отправить его в тему Kafka, но перед этим я хочу проверить, прочитан ли файл, но пока не повезло.

import org.apache.flink.table.api.Table;
import  org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.Types;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.api.java.Slide;


public class CsvReader {
  public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
CsvTableSource csvTableSource = new CsvTableSource("/home/merlin/Experiments/input_container/container_data1.csv",
            new String[] { "%CPU", "MEM", "VSZ", "RSS", "timestamp",
    "OOM_Score", "io_read_count", "io_write_count", "io_read_bytes", "io_write_bytes",
    "io_read_chars", "io_write_chars", "num_fds", "num_ctx_switches_voluntary", "num_ctx_switches_involuntary",
    "mem_rss", "mem_vms", "mem_shared", "mem_text", "mem_lib", "mem_data", "mem_dirty", "mem_uss", "mem_pss",
    "mem_swap", "num_threads", "cpu_time_user", "cpu_time_system", "cpu_time_children_user",
    "cpu_time_children_system", "container_nr_sleeping", "container_nr_running",
    "container_nr_stopped", "container_nr_uninterruptible","container_nr_iowait" },
            new TypeInformation<?>[] {
                    Types.FLOAT(),
                    Types.FLOAT(),
                    Types.FLOAT(),
                    Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT(),
                                            Types.FLOAT()
                            });// lenient

    tEnv.registerTableSource("container", csvTableSource);
    Table result = tEnv
        .scan("container");
        System.out.println(result);
                result.printSchema();


      }
    }
/*tEnv.toAppendStream(result, Row.class).print();         
result.writeToSink(null);print();
            env.execute();*/

Это выход

root
 |-- %CPU: Float
 |-- MEM: Float
 |-- VSZ: Float
 |-- RSS: Float
 |-- timestamp: Float
 |-- OOM_Score: Float
 |-- io_read_count: Float
 |-- io_write_count: Float
 |-- io_read_bytes: Float
 |-- io_write_bytes: Float
 |-- io_read_chars: Float
 |-- io_write_chars: Float
 |-- num_fds: Float
 |-- num_ctx_switches_voluntary: Float
 |-- num_ctx_switches_involuntary: Float
 |-- mem_rss: Float
 |-- mem_vms: Float
 |-- mem_shared: Float
 |-- mem_text: Float
 |-- mem_lib: Float
 |-- mem_data: Float
 |-- mem_dirty: Float
 |-- mem_uss: Float
 |-- mem_pss: Float
 |-- mem_swap: Float
 |-- num_threads: Float
 |-- cpu_time_user: Float
 |-- cpu_time_system: Float
 |-- cpu_time_children_user: Float
 |-- cpu_time_children_system: Float
 |-- container_nr_sleeping: Float
 |-- container_nr_running: Float
 |-- container_nr_stopped: Float
 |-- container_nr_uninterruptible: Float
 |-- container_nr_iowait: Float

И это еще одна версия кода, которая тоже не сработала

package wikiedits;

import static com.sun.xml.internal.fastinfoset.alphabet.BuiltInRestrictedAlphabets.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableEnvironment;
public class Csv {
  public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
//TableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);

    CsvTableSource csvTableSource = CsvTableSource
    .builder()
    .path("home/merlin/Experiments/input_container/container_data1.csv")
    .field("%CPU", Types.FLOAT)
    .field("MEM", Types.FLOAT)
    .field("VSZ", Types.FLOAT)
    .field("RSS", Types.FLOAT)
    .field("timestamp", Types.FLOAT)
    .field("OOM_Score", Types.FLOAT)
    .field("io_read_count", Types.FLOAT)
    .field("io_write_count", Types.FLOAT)
    .field("io_read_bytes", Types.FLOAT)
    .field("io_write_bytes", Types.FLOAT)
    .field("io_read_chars", Types.FLOAT)
    .field("io_write_chars", Types.FLOAT)
    .field("num_fds", Types.FLOAT)
    .field("num_ctx_switches_voluntary", Types.FLOAT)
    .field("num_ctx_switches_involuntary", Types.FLOAT)
    .field("mem_rss", Types.FLOAT)
    .field("mem_vms", Types.FLOAT)
    .field("mem_shared", Types.FLOAT)
    .field("mem_text", Types.FLOAT)
    .field("mem_lib", Types.FLOAT)
    .field("mem_data", Types.FLOAT)
    .field("mem_dirty", Types.FLOAT)
    .field("mem_uss", Types.FLOAT)
    .field("mem_pss", Types.FLOAT)
    .field("mem_swap", Types.FLOAT)
    .field("num_threads", Types.FLOAT)
    .field("cpu_time_user", Types.FLOAT)
    .field("cpu_time_system", Types.FLOAT)
    .field("cpu_time_children_user", Types.FLOAT)
    .field("cpu_time_children_system", Types.FLOAT)
    .field("container_nr_sleeping", Types.FLOAT)
    .field("container_nr_running", Types.FLOAT)
    .field("container_nr_stopped", Types.FLOAT)
    .field("container_nr_uninterruptible", Types.FLOAT)
    .field("container_nr_iowait", Types.FLOAT)
    .fieldDelimiter(",")
    .lineDelimiter("\n")
    .ignoreFirstLine()
    .ignoreParseErrors()
    .commentPrefix("%")
    .build();
// name your table source
tEnv.registerTableSource("container", csvTableSource);
Table table = tEnv.scan("container");
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
// define the sink as common print on console here
      stream.print();
             // env.execute();

  }
}

Новое редактирование. Если бы мне пришлось передать его в тему Кафки, а затем в вызов функции? Вот что я попробовал:

DataStreamSink<Row> stream = addSink(new FlinkKafkaProducer09<>( "my-second-topic", new SimpleStringSchema(), properties));
//DataStreamSink<Row> stream = tEnv.toAppendStream(table, Row.class).print();
 stream.map(new MapFunction<String, String>() {
  private static final long serialVersionUID = -6867736771747690202L;
    public String map(String value) throws Exception {
    SendToRestAPI sendrest= new SendToRestAPI(value);
    String String1= sendrest.converttoJson();  

    return "Stream Value: " + String1;
  }
})
.addSink(new FlinkKafkaProducer09<>( "my-second-topic", new SimpleStringSchema(), properties)); /*.print();*/

env.execute();
  }
 }

Строка stream.map выдает ошибку:

Cannot find symbol:method Map

2 ответа

Решение

Этот код:

package example.flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;

public class TestFlink {
    public static void main(String[] args) {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);

        CsvTableSource csvTableSource = CsvTableSource
                .builder()
                .path("container_data.csv")
                .field("%CPU", Types.FLOAT)
                .field("MEM", Types.FLOAT)
                .field("VSZ", Types.FLOAT)
                .field("RSS", Types.FLOAT)
                .field("timestamp", Types.FLOAT)
                .field("OOM_Score", Types.FLOAT)
                .field("io_read_count", Types.FLOAT)
                .field("io_write_count", Types.FLOAT)
                .field("io_read_bytes", Types.FLOAT)
                .field("io_write_bytes", Types.FLOAT)
                .field("io_read_chars", Types.FLOAT)
                .field("io_write_chars", Types.FLOAT)
                .field("num_fds", Types.FLOAT)
                .field("num_ctx_switches_voluntary", Types.FLOAT)
                .field("num_ctx_switches_involuntary", Types.FLOAT)
                .field("mem_rss", Types.FLOAT)
                .field("mem_vms", Types.FLOAT)
                .field("mem_shared", Types.FLOAT)
                .field("mem_text", Types.FLOAT)
                .field("mem_lib", Types.FLOAT)
                .field("mem_data", Types.FLOAT)
                .field("mem_dirty", Types.FLOAT)
                .field("mem_uss", Types.FLOAT)
                .field("mem_pss", Types.FLOAT)
                .field("mem_swap", Types.FLOAT)
                .field("num_threads", Types.FLOAT)
                .field("cpu_time_user", Types.FLOAT)
                .field("cpu_time_system", Types.FLOAT)
                .field("cpu_time_children_user", Types.FLOAT)
                .field("cpu_time_children_system", Types.FLOAT)
                .field("container_nr_sleeping", Types.FLOAT)
                .field("container_nr_running", Types.FLOAT)
                .field("container_nr_stopped", Types.FLOAT)
                .field("container_nr_uninterruptible", Types.FLOAT)
                .field("container_nr_iowait", Types.FLOAT)
                .fieldDelimiter(",")
                .lineDelimiter("\n")
                .ignoreFirstLine()
                .ignoreParseErrors()
                .commentPrefix("%")
                .build();
        // name your table source
        tEnv.registerTableSource("container", csvTableSource);
        Table table = tEnv.scan("container");
        DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
        // define the sink as common print on console here
        stream.print();
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

С этими библиотеками (некоторые, вероятно, избыточны):

compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25'
compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-java', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-runtime_2.11', version: '1.5.1'

Работает по крайней мере. Я не уверен, дает ли он вывод, который вам нужен, но это почти то, что вы добавили в свое последнее редактирование, но работает в IDE. Это помогает?

Если ваш разделитель все еще является пробелом, не забудьте изменить .fieldDelimiter(",")

Вы должны конвертировать Table в DataStream распечатать это. Самый простой способ сделать это - преобразовать его в DataStream<Row> следующее:

DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
// print the stream & execute the program
stream.print();
env.execute();

Смотрите документацию для более подробной информации.

Другие вопросы по тегам