Доступ к распределенному кешу в Pig StoreFunc

Я просмотрел все остальные темы по этой теме и до сих пор не нашел ответа...

Проще говоря, я хочу получить доступ к распределенному кешу hadoop из Pig StoreFunc, а НЕ из UDF напрямую.

Соответствующие строки кода PIG:

DEFINE CustomStorage KeyValStorage('param1','param2','param3');
...
STORE BLAH INTO /path/ using CustomStorage();

Соответствующий Java-код:

public class KeyValStorage<M extends Message> extends BaseStoreFunc /* ElephantBird Storage which inherits from StoreFunc */ {

...
public KeyValStorage(String param1, String param2, String param3) {
    ...
        try {
            InputStream is = new FileInputStream(configName);
            try {
                prop.load(is);
            } catch (IOException e) {
                System.out.println("PROPERTY LOADING FAILED");
                e.printStackTrace();
            }
        } catch (FileNotFoundException e) {
            System.out.println("FILE NOT FOUND");
            e.printStackTrace();
        }
   }
...
}

configName - это имя файла LOCAL, которое я должен быть в состоянии прочитать из распределенного кэша, однако я получаю исключение FileNotFoundException. Когда я использую EXACT тот же код из UDF PIG напрямую, файл обнаруживается, поэтому я знаю, что файл отправляется через распределенный кеш. Я установил соответствующий параметр, чтобы убедиться, что это происходит:

<property><name>mapred.cache.files</name><value>/path/to/file/file.properties#configName</value></property>

Любые идеи, как я могу обойти это?

Спасибо!

1 ответ

Конструктор StroreFunc вызывается как во внешнем, так и во внутреннем интерфейсе. Когда он вызывается из внешнего интерфейса (перед запуском задания), вы получите FileNotFoundException, поскольку в этот момент файлы из распределенного кэша еще не скопированы на локальный диск узлов.
Вы можете проверить, находитесь ли вы на сервере (когда выполняется задание) и загрузить файл только в этом случае, например:

DEFINE CustomStorage KeyValStorage('param1','param2','param3');
set mapreduce.job.cache.files hdfs://host/user/cache/file.txt#config
...
STORE BLAH INTO /path/ using CustomStorage();

public KeyValStorage(String param1, String param2, String param3) {
  ...
  try {
    if (!UDFContext.getUDFContext().isFrontend()) {
      InputStream is = new FileInputStream("./config");
      BufferedReader br = new BufferedReader(new InputStreamReader(is)); 
      ...
  ...
}
Другие вопросы по тегам