Можно ли использовать Riak CS с Apache Flink?

Я хочу настроить filesystem государственный бэкэнд и zookeeper режим восстановления:

state.backend: filesystem
state.backend.fs.checkpointdir: ???

recovery.mode: zookeeper
recovery.zookeeper.storageDir: ???

Как видите, я должен указать checkpointdir а также storageDir параметров, но у меня нет файловых систем, поддерживаемых Apache Flink (например, HDFS или Amazon S3). Но я установил кластер Riak CS (кажется, что он совместим с S3).

Итак, могу ли я использовать Riak CS вместе с Apache Flink? Если это возможно: как настроить Apache Flink для работы с Riak CS?

1 ответ

Решение

Ответ: Как объединить Apache Flink и Riak CS?

Riak CS имеет интерфейс, совместимый с S3 (версия 2). Таким образом, можно использовать адаптер файловой системы S3 от Hadoop для работы с Riak CS.

Я не знаю почему, но Apache Flink содержит только часть адаптеров файловой системы Hadoop внутри толстой фляги (lib/flink-dist_2.11-1.0.1.jarт.е. он имеет файловую систему FTP (org.apache.hadoop.fs.ftp.FTPFileSystem) но не имеет файловой системы S3 (т.е. org.apache.hadoop.fs.s3a.S3AFileSystem). Итак, у вас есть 2 способа решить эту проблему:

  • используйте эти адаптеры из установки Hadoop. Я не пробовал этого, но кажется, что вам нужно просто настроить переменную HADOOP_CLASSPATH или HADOOP_HOME evn.
  • Обезьяна патч Apache Flink и загрузить необходимые файлы JAR для <flink home>/lib каталог

Поэтому я выбираю второй путь, потому что не хочу предоставлять Hadoop в своей среде. Вы можете скопировать файлы JAR из Hadoop dist или Интернета:

curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar

Как вы можете видеть, я использую старые версии, потому что такая версия используется в Hadoop 2.7.2, и я использую Flink, совместимый с этой версией Hadoop.

К вашему сведению: такой взлом может вызвать проблемы, если вы используете последнюю версию этих JAR-файлов в своем потоке. Чтобы избежать проблем, связанных с различными версиями, вы можете перемещать пакеты, когда вы создаете толстый jar с потоком, используйте что-то вроде (я использую Gradle):

// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS)
shadowJar {
    dependencies {
        include(dependency('.*:.*:.*'))
    }

    relocate 'org.apache.http', 'relocated.org.apache.http'
    relocate 'org.apache.commons', 'relocated.org.apache.commons'
}

Затем вы должны указать путь к core-site.xml в flink-conf.yaml потому что Hadoop-совместимые файловые системы используют этот конфиг для загрузки настроек:

...
fs.hdfs.hadoopconf: /flink/conf
...

Как вы видите, я просто поместил его <fink home>/conf каталог. Он имеет следующие настройки:

<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
    <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <value>my-riak-cs.stage.local</value>  // this is my Riak CS host
    </property>
    <property>
        <name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL
        <value>false</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>????</value> // this is my access key for Riak CS
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>????</value> // this is my secret key for Riak CS
    </property>
</configuration>

Затем вы должны настроить Riak CS ведра в flink-conf.yaml в качестве рекомендации здесь:

...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...

и создать ведра в Riak CS. я использую s3cmd (установлено поверх brew в моем OS X dev env):

s3cmd mb s3://example-staging-flink

К вашему сведению: перед использованием s3cmd вы должны настроить это использовать s3cmd --configure а затем исправить некоторые настройки в ~/.s3cmd файл:

signature_v2 = True // because Riak CS using S3 V2 interface
use_https = False // if your don't use SSL
access_key = ???
secret_key = ???
host_base = my-riak-cs.stage.local // your Riak CS host
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS

Итак, это все, что вы должны настроить для сохранения / восстановления состояния автономного кластера Apache Flink HA в Riak CS.

Другие вопросы по тегам