DataFlow Apache Beam Java JdbcIO Проблема чтения аргументов
Я совершенно новичок в Apache Beam и Java. Я работаю над PHP около 5 лет, но я не работал на Java последние 5 лет:), плюс Apache Beam SDK в java - это тоже что-то новое, так что терпите меня. Я хотел бы реализовать конвейер, в котором я буду получать данные из Google PubSub, сопоставлять соответствующие поля в массиве и затем проверять его на MySql Db, чтобы увидеть, принадлежит ли сообщение одной таблице, после этого мне нужно будет отправить вызов API в наш API это обновит некоторые данные в нашем приложении БД. Другой конвейер будет обогащать данные из asticsearch и вставлять их в BigQuery.
Но на данный момент я застрял с чтением данных из MySql, я просто не могу принять аргумент в PCollection с использованием JdbcIO.
Мой план состоит в том, чтобы проверить, присутствует ли в таблице Mysql текущее значение, которое я получаю из pubsub (значение listid).
Вот мой код, любая помощь будет оценена.
Pipeline p = Pipeline.create(options);
org.apache.beam.sdk.values.PCollection<PubsubMessage> messages = p.apply(PubsubIO.readMessagesWithAttributes()
.fromSubscription("*******"));
org.apache.beam.sdk.values.PCollection<String> messages2 = messages.apply("GetPubSubEvent",
ParDo.of(new DoFn<PubsubMessage, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
Map<String, String> Map = new HashMap<String, String>();
PubsubMessage message = c.element();
String messageText = new String(message.getPayload(), StandardCharsets.UTF_8);
JSONObject jsonObj = new JSONObject(messageText);
String requestURL = jsonObj.getJSONObject("httpRequest").getString("requestUrl");
String query = requestURL.split("\\?")[1];
final Map<String, String> querymap = Splitter.on('&').trimResults().withKeyValueSeparator("=")
.split(query);
JSONObject querymapJson = new JSONObject(querymap);
int subscriberid = 0;
int listid = 0;
int statid = 0;
int points = 0;
String stattype = "";
String requesttype = "";
try {
subscriberid = querymapJson.getInt("emp_uid");
} catch (Exception e) {
}
try {
listid = querymapJson.getInt("emp_lid");
} catch (Exception e) {
}
try {
statid = querymapJson.getInt("emp_statid");
} catch (Exception e) {
}
try {
stattype = querymapJson.getString("emp_stattype");
Map.put("stattype", stattype);
} catch (Exception e) {
}
try {
requesttype = querymapJson.getString("type");
} catch (Exception e) {
}
try {
statid = querymapJson.getInt("leadscore");
} catch (Exception e) {
}
Map.put("subscriberid", String.valueOf(subscriberid));
Map.put("listid", String.valueOf(listid));
Map.put("statid", String.valueOf(statid));
Map.put("requesttype", requesttype);
Map.put("leadscore", String.valueOf(points));
Map.put("requestip", jsonObj.getJSONObject("httpRequest").getString("remoteIp"));
System.out.print("Hello from message 1");
c.output(Map.toString());
}
}));
org.apache.beam.sdk.values.PCollection<String> messages3 = messages2.apply("Test",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println(c.element());
System.out.print("Hello from message 2");
}
}));
org.apache.beam.sdk.values.PCollection<KV<String, String>> messages23 = messages2.apply(JdbcIO.<KV<String, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("org.apache.derby.jdbc.ClientDriver",
"jdbc:derby://localhost:1527/beam"))
.withQuery("select * from artist").withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
KV<String, String> kv = KV.of(resultSet.getString("label"), resultSet.getString("name"));
return kv;
}
@Override
public KV<String, String> mapRow(java.sql.ResultSet resultSet) throws Exception {
KV<String, String> kv = KV.of(resultSet.getString("label"), resultSet.getString("name"));
return kv;
}
}).withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
p.run().waitUntilFinish();