Можно ли использовать Riak CS с Apache Flink?
Я хочу настроить filesystem
государственный бэкэнд и zookeeper
режим восстановления:
state.backend: filesystem
state.backend.fs.checkpointdir: ???
recovery.mode: zookeeper
recovery.zookeeper.storageDir: ???
Как видите, я должен указать checkpointdir
а также storageDir
параметров, но у меня нет файловых систем, поддерживаемых Apache Flink (например, HDFS или Amazon S3). Но я установил кластер Riak CS (кажется, что он совместим с S3).
Итак, могу ли я использовать Riak CS вместе с Apache Flink? Если это возможно: как настроить Apache Flink для работы с Riak CS?
1 ответ
Ответ: Как объединить Apache Flink и Riak CS?
Riak CS имеет интерфейс, совместимый с S3 (версия 2). Таким образом, можно использовать адаптер файловой системы S3 от Hadoop для работы с Riak CS.
Я не знаю почему, но Apache Flink содержит только часть адаптеров файловой системы Hadoop внутри толстой фляги (lib/flink-dist_2.11-1.0.1.jar
т.е. он имеет файловую систему FTP (org.apache.hadoop.fs.ftp.FTPFileSystem
) но не имеет файловой системы S3 (т.е. org.apache.hadoop.fs.s3a.S3AFileSystem
). Итак, у вас есть 2 способа решить эту проблему:
- используйте эти адаптеры из установки Hadoop. Я не пробовал этого, но кажется, что вам нужно просто настроить переменную HADOOP_CLASSPATH или HADOOP_HOME evn.
- Обезьяна патч Apache Flink и загрузить необходимые файлы JAR для
<flink home>/lib
каталог
Поэтому я выбираю второй путь, потому что не хочу предоставлять Hadoop в своей среде. Вы можете скопировать файлы JAR из Hadoop dist или Интернета:
curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar
Как вы можете видеть, я использую старые версии, потому что такая версия используется в Hadoop 2.7.2, и я использую Flink, совместимый с этой версией Hadoop.
К вашему сведению: такой взлом может вызвать проблемы, если вы используете последнюю версию этих JAR-файлов в своем потоке. Чтобы избежать проблем, связанных с различными версиями, вы можете перемещать пакеты, когда вы создаете толстый jar с потоком, используйте что-то вроде (я использую Gradle):
// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS)
shadowJar {
dependencies {
include(dependency('.*:.*:.*'))
}
relocate 'org.apache.http', 'relocated.org.apache.http'
relocate 'org.apache.commons', 'relocated.org.apache.commons'
}
Затем вы должны указать путь к core-site.xml
в flink-conf.yaml
потому что Hadoop-совместимые файловые системы используют этот конфиг для загрузки настроек:
...
fs.hdfs.hadoopconf: /flink/conf
...
Как вы видите, я просто поместил его <fink home>/conf
каталог. Он имеет следующие настройки:
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>my-riak-cs.stage.local</value> // this is my Riak CS host
</property>
<property>
<name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL
<value>false</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>????</value> // this is my access key for Riak CS
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>????</value> // this is my secret key for Riak CS
</property>
</configuration>
Затем вы должны настроить Riak CS ведра в flink-conf.yaml
в качестве рекомендации здесь:
...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...
и создать ведра в Riak CS. я использую s3cmd
(установлено поверх brew
в моем OS X dev env):
s3cmd mb s3://example-staging-flink
К вашему сведению: перед использованием s3cmd
вы должны настроить это использовать s3cmd --configure
а затем исправить некоторые настройки в ~/.s3cmd
файл:
signature_v2 = True // because Riak CS using S3 V2 interface
use_https = False // if your don't use SSL
access_key = ???
secret_key = ???
host_base = my-riak-cs.stage.local // your Riak CS host
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS
Итак, это все, что вы должны настроить для сохранения / восстановления состояния автономного кластера Apache Flink HA в Riak CS.