Соединитель JDBC Kafka с устаревшей базой данных

Я пытаюсь использовать kafka-jdbc-connector (как источник, так и приемник) с очень старой базой данных (cloudscape). У меня есть драйвер JDBC этой базы данных. Я поместил драйвер в папку Confluent "/share/java/kafka/connect/jdbc" (версия 5) и создал файл свойств.

name=test-source-cloud-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB
mode=incrementing
incrementing.column.name=id
topic.prefix=test-cloud-jdbc-

Когда я запускаю соединитель, журналы:

[2019-01-31 11:23:36,582] DEBUG Finding best dialect for JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:119)
[2019-01-31 11:23:36,582] DEBUG Dialect Db2DatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect DerbyDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect GenericDatabaseDialect scored 10 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect MySqlDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect OracleDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,582] DEBUG Dialect PostgreSqlDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect SapHanaDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect SqlServerDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect SqliteDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect SybaseDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Dialect VerticaDatabaseDialect scored 0 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:126)
[2019-01-31 11:23:36,583] DEBUG Using dialect GenericDatabaseDialect with score 10 against JDBC subprotocol 'cloudscape' and source 'jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB' (io.confluent.connect.jdbc.dialect.DatabaseDialects:132)
[2019-01-31 11:23:36,587] ERROR Failed to create job for ./etc/kafka-connect-jdbc/CloudscapeProperties.properties (org.apache.kafka.connect.cli.ConnectStandalone:102)
[2019-01-31 11:23:36,588] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value java.sql.SQLException: No suitable driver found for jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB for configuration Couldn't open connection to jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB
Invalid value java.sql.SQLException: No suitable driver found for jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB for configuration Couldn't open connection to jdbc:cloudscape:rmi://localhost:1099/CloudscapeDB
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:110)

Я предполагаю, что есть проблема с фактом, что Драйвер JDBC очень стар (он использует JAVA 1.3). Драйвер использует протокол RMI для связи. Если я запускаю очень простой JAVA RMI-клиент для запроса к БД с использованием RmiJdbc.jar и cloudscape.jar (драйверы), я должен сделать запрос к БД и получить результаты.

Как вы думаете, это проблема, связанная с версией Java? И имеет ли смысл / возможно ли реализовать собственный драйвер Kafka для чтения данных из этой базы данных? Любое предложение об этой проблеме или как реализовать собственный драйвер Kafka для старой БД?

1 ответ

Вы можете изменить META-INF/services/java.sql.Driver, изменить его, перезаписать его классом драйвера. Это может сработать для меня!

Я бы не стал сразу делать вывод, что драйвер / база данных старые...

Вы не можете указать plugin.path непосредственно к драйверу JDBC.

Обновить plugin.path быть абсолютным путем к верхнему уровню /path/to/confluent-x.y.z/usr/share/javaи скопируйте свой драйвер в kafka-connect-jdbc папка.

Перезапустите Connect, тогда драйвер будет найден.


Вы можете отладить это дальше, найдя kafka/connect-log4j.properties и установить

log4j.rootLogger=DEBUG, stdout

Тогда вы увидите логи как это (пример для MySQL)

[2019-01-15 09:45:26,949] DEBUG Registered java.sql.Driver: com.mysql.cj.jdbc.Driver@37303f12 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:277)
Другие вопросы по тегам