Невозможно распечатать файл CSV с помощью Flink Table API
Я пытаюсь прочитать файл с 34 полями для печати на консоли с помощью NetBeans. Однако все, что я могу напечатать, это схема. Потому что опция печати отсутствует в этой конкретной версии Flink, используемой с csvreader.
Пожалуйста, посмотрите код и помогите мне понять, где я должен исправить. Я бы использовал CSVReader
, встроенный API, но оказывается, что он не поддерживает более 22 полей и, следовательно, прибегает к использованию Table API. Также пытался использовать CsvTableSource
версия 1.5.1 Flink, но не везет с синтаксисом. Как .field("%CPU", Types.FLOAT())
продолжал давать ошибку для типа float не распознанный символ. Моя главная цель - просто прочитать CSV-файл и отправить его в тему Kafka, но перед этим я хочу проверить, прочитан ли файл, но пока не повезло.
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.Types;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.api.java.Slide;
public class CsvReader {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
CsvTableSource csvTableSource = new CsvTableSource("/home/merlin/Experiments/input_container/container_data1.csv",
new String[] { "%CPU", "MEM", "VSZ", "RSS", "timestamp",
"OOM_Score", "io_read_count", "io_write_count", "io_read_bytes", "io_write_bytes",
"io_read_chars", "io_write_chars", "num_fds", "num_ctx_switches_voluntary", "num_ctx_switches_involuntary",
"mem_rss", "mem_vms", "mem_shared", "mem_text", "mem_lib", "mem_data", "mem_dirty", "mem_uss", "mem_pss",
"mem_swap", "num_threads", "cpu_time_user", "cpu_time_system", "cpu_time_children_user",
"cpu_time_children_system", "container_nr_sleeping", "container_nr_running",
"container_nr_stopped", "container_nr_uninterruptible","container_nr_iowait" },
new TypeInformation<?>[] {
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT(),
Types.FLOAT()
});// lenient
tEnv.registerTableSource("container", csvTableSource);
Table result = tEnv
.scan("container");
System.out.println(result);
result.printSchema();
}
}
/*tEnv.toAppendStream(result, Row.class).print();
result.writeToSink(null);print();
env.execute();*/
Это выход
root
|-- %CPU: Float
|-- MEM: Float
|-- VSZ: Float
|-- RSS: Float
|-- timestamp: Float
|-- OOM_Score: Float
|-- io_read_count: Float
|-- io_write_count: Float
|-- io_read_bytes: Float
|-- io_write_bytes: Float
|-- io_read_chars: Float
|-- io_write_chars: Float
|-- num_fds: Float
|-- num_ctx_switches_voluntary: Float
|-- num_ctx_switches_involuntary: Float
|-- mem_rss: Float
|-- mem_vms: Float
|-- mem_shared: Float
|-- mem_text: Float
|-- mem_lib: Float
|-- mem_data: Float
|-- mem_dirty: Float
|-- mem_uss: Float
|-- mem_pss: Float
|-- mem_swap: Float
|-- num_threads: Float
|-- cpu_time_user: Float
|-- cpu_time_system: Float
|-- cpu_time_children_user: Float
|-- cpu_time_children_system: Float
|-- container_nr_sleeping: Float
|-- container_nr_running: Float
|-- container_nr_stopped: Float
|-- container_nr_uninterruptible: Float
|-- container_nr_iowait: Float
И это еще одна версия кода, которая тоже не сработала
package wikiedits;
import static com.sun.xml.internal.fastinfoset.alphabet.BuiltInRestrictedAlphabets.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableEnvironment;
public class Csv {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
//TableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
CsvTableSource csvTableSource = CsvTableSource
.builder()
.path("home/merlin/Experiments/input_container/container_data1.csv")
.field("%CPU", Types.FLOAT)
.field("MEM", Types.FLOAT)
.field("VSZ", Types.FLOAT)
.field("RSS", Types.FLOAT)
.field("timestamp", Types.FLOAT)
.field("OOM_Score", Types.FLOAT)
.field("io_read_count", Types.FLOAT)
.field("io_write_count", Types.FLOAT)
.field("io_read_bytes", Types.FLOAT)
.field("io_write_bytes", Types.FLOAT)
.field("io_read_chars", Types.FLOAT)
.field("io_write_chars", Types.FLOAT)
.field("num_fds", Types.FLOAT)
.field("num_ctx_switches_voluntary", Types.FLOAT)
.field("num_ctx_switches_involuntary", Types.FLOAT)
.field("mem_rss", Types.FLOAT)
.field("mem_vms", Types.FLOAT)
.field("mem_shared", Types.FLOAT)
.field("mem_text", Types.FLOAT)
.field("mem_lib", Types.FLOAT)
.field("mem_data", Types.FLOAT)
.field("mem_dirty", Types.FLOAT)
.field("mem_uss", Types.FLOAT)
.field("mem_pss", Types.FLOAT)
.field("mem_swap", Types.FLOAT)
.field("num_threads", Types.FLOAT)
.field("cpu_time_user", Types.FLOAT)
.field("cpu_time_system", Types.FLOAT)
.field("cpu_time_children_user", Types.FLOAT)
.field("cpu_time_children_system", Types.FLOAT)
.field("container_nr_sleeping", Types.FLOAT)
.field("container_nr_running", Types.FLOAT)
.field("container_nr_stopped", Types.FLOAT)
.field("container_nr_uninterruptible", Types.FLOAT)
.field("container_nr_iowait", Types.FLOAT)
.fieldDelimiter(",")
.lineDelimiter("\n")
.ignoreFirstLine()
.ignoreParseErrors()
.commentPrefix("%")
.build();
// name your table source
tEnv.registerTableSource("container", csvTableSource);
Table table = tEnv.scan("container");
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
// define the sink as common print on console here
stream.print();
// env.execute();
}
}
Новое редактирование. Если бы мне пришлось передать его в тему Кафки, а затем в вызов функции? Вот что я попробовал:
DataStreamSink<Row> stream = addSink(new FlinkKafkaProducer09<>( "my-second-topic", new SimpleStringSchema(), properties));
//DataStreamSink<Row> stream = tEnv.toAppendStream(table, Row.class).print();
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
public String map(String value) throws Exception {
SendToRestAPI sendrest= new SendToRestAPI(value);
String String1= sendrest.converttoJson();
return "Stream Value: " + String1;
}
})
.addSink(new FlinkKafkaProducer09<>( "my-second-topic", new SimpleStringSchema(), properties)); /*.print();*/
env.execute();
}
}
Строка stream.map выдает ошибку:
Cannot find symbol:method Map
2 ответа
Этот код:
package example.flink;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;
public class TestFlink {
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
CsvTableSource csvTableSource = CsvTableSource
.builder()
.path("container_data.csv")
.field("%CPU", Types.FLOAT)
.field("MEM", Types.FLOAT)
.field("VSZ", Types.FLOAT)
.field("RSS", Types.FLOAT)
.field("timestamp", Types.FLOAT)
.field("OOM_Score", Types.FLOAT)
.field("io_read_count", Types.FLOAT)
.field("io_write_count", Types.FLOAT)
.field("io_read_bytes", Types.FLOAT)
.field("io_write_bytes", Types.FLOAT)
.field("io_read_chars", Types.FLOAT)
.field("io_write_chars", Types.FLOAT)
.field("num_fds", Types.FLOAT)
.field("num_ctx_switches_voluntary", Types.FLOAT)
.field("num_ctx_switches_involuntary", Types.FLOAT)
.field("mem_rss", Types.FLOAT)
.field("mem_vms", Types.FLOAT)
.field("mem_shared", Types.FLOAT)
.field("mem_text", Types.FLOAT)
.field("mem_lib", Types.FLOAT)
.field("mem_data", Types.FLOAT)
.field("mem_dirty", Types.FLOAT)
.field("mem_uss", Types.FLOAT)
.field("mem_pss", Types.FLOAT)
.field("mem_swap", Types.FLOAT)
.field("num_threads", Types.FLOAT)
.field("cpu_time_user", Types.FLOAT)
.field("cpu_time_system", Types.FLOAT)
.field("cpu_time_children_user", Types.FLOAT)
.field("cpu_time_children_system", Types.FLOAT)
.field("container_nr_sleeping", Types.FLOAT)
.field("container_nr_running", Types.FLOAT)
.field("container_nr_stopped", Types.FLOAT)
.field("container_nr_uninterruptible", Types.FLOAT)
.field("container_nr_iowait", Types.FLOAT)
.fieldDelimiter(",")
.lineDelimiter("\n")
.ignoreFirstLine()
.ignoreParseErrors()
.commentPrefix("%")
.build();
// name your table source
tEnv.registerTableSource("container", csvTableSource);
Table table = tEnv.scan("container");
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
// define the sink as common print on console here
stream.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
С этими библиотеками (некоторые, вероятно, избыточны):
compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25'
compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-java', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-runtime_2.11', version: '1.5.1'
Работает по крайней мере. Я не уверен, дает ли он вывод, который вам нужен, но это почти то, что вы добавили в свое последнее редактирование, но работает в IDE. Это помогает?
Если ваш разделитель все еще является пробелом, не забудьте изменить .fieldDelimiter(",")
Вы должны конвертировать Table
в DataStream
распечатать это. Самый простой способ сделать это - преобразовать его в DataStream<Row>
следующее:
DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
// print the stream & execute the program
stream.print();
env.execute();
Смотрите документацию для более подробной информации.