DataFlow Apache Beam Java JdbcIO Проблема чтения аргументов

Я совершенно новичок в Apache Beam и Java. Я работаю над PHP около 5 лет, но я не работал на Java последние 5 лет:), плюс Apache Beam SDK в java - это тоже что-то новое, так что терпите меня. Я хотел бы реализовать конвейер, в котором я буду получать данные из Google PubSub, сопоставлять соответствующие поля в массиве и затем проверять его на MySql Db, чтобы увидеть, принадлежит ли сообщение одной таблице, после этого мне нужно будет отправить вызов API в наш API это обновит некоторые данные в нашем приложении БД. Другой конвейер будет обогащать данные из asticsearch и вставлять их в BigQuery.

Но на данный момент я застрял с чтением данных из MySql, я просто не могу принять аргумент в PCollection с использованием JdbcIO.

Мой план состоит в том, чтобы проверить, присутствует ли в таблице Mysql текущее значение, которое я получаю из pubsub (значение listid).

Вот мой код, любая помощь будет оценена.

Pipeline p = Pipeline.create(options);

    org.apache.beam.sdk.values.PCollection<PubsubMessage> messages = p.apply(PubsubIO.readMessagesWithAttributes()
            .fromSubscription("*******"));

    org.apache.beam.sdk.values.PCollection<String> messages2 = messages.apply("GetPubSubEvent",
            ParDo.of(new DoFn<PubsubMessage, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    Map<String, String> Map = new HashMap<String, String>();
                    PubsubMessage message = c.element();
                    String messageText = new String(message.getPayload(), StandardCharsets.UTF_8);
                    JSONObject jsonObj = new JSONObject(messageText);
                    String requestURL = jsonObj.getJSONObject("httpRequest").getString("requestUrl");
                    String query = requestURL.split("\\?")[1];
                    final Map<String, String> querymap = Splitter.on('&').trimResults().withKeyValueSeparator("=")
                            .split(query);
                    JSONObject querymapJson = new JSONObject(querymap);
                    int subscriberid = 0;
                    int listid = 0;
                    int statid = 0;
                    int points = 0;
                    String stattype = "";
                    String requesttype = "";
                    try {
                        subscriberid = querymapJson.getInt("emp_uid");
                    } catch (Exception e) {
                    }
                    try {
                        listid = querymapJson.getInt("emp_lid");
                    } catch (Exception e) {
                    }
                    try {
                        statid = querymapJson.getInt("emp_statid");
                    } catch (Exception e) {
                    }
                    try {
                        stattype = querymapJson.getString("emp_stattype");
                        Map.put("stattype", stattype);
                    } catch (Exception e) {
                    }
                    try {
                        requesttype = querymapJson.getString("type");
                    } catch (Exception e) {
                    }
                    try {
                        statid = querymapJson.getInt("leadscore");
                    } catch (Exception e) {
                    }

                    Map.put("subscriberid", String.valueOf(subscriberid));
                    Map.put("listid", String.valueOf(listid));
                    Map.put("statid", String.valueOf(statid));
                    Map.put("requesttype", requesttype);
                    Map.put("leadscore", String.valueOf(points));
                    Map.put("requestip", jsonObj.getJSONObject("httpRequest").getString("remoteIp"));
                    System.out.print("Hello from message 1");
                    c.output(Map.toString());
                }
            }));

    org.apache.beam.sdk.values.PCollection<String> messages3 = messages2.apply("Test",
            ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    System.out.println(c.element());
                    System.out.print("Hello from message 2");
                }
            }));
    org.apache.beam.sdk.values.PCollection<KV<String, String>> messages23 = messages2.apply(JdbcIO.<KV<String, String>>read()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("org.apache.derby.jdbc.ClientDriver",
                    "jdbc:derby://localhost:1527/beam"))
            .withQuery("select * from artist").withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
                public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
                    KV<String, String> kv = KV.of(resultSet.getString("label"), resultSet.getString("name"));
                    return kv;
                }

                @Override
                public KV<String, String> mapRow(java.sql.ResultSet resultSet) throws Exception {
                    KV<String, String> kv = KV.of(resultSet.getString("label"), resultSet.getString("name"));
                    return kv;
                }
            }).withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));


    p.run().waitUntilFinish();

0 ответов

Другие вопросы по тегам