Странная ошибка при заполнении объекта мутации накопления 1.6 через spark-notebook

Используя spark-notebook для обновления накопительной таблицы. используя метод, указанный как в документации коду, так и в примере кода. Ниже дословно, что я положил в тетрадь, и ответы:

val clientRqrdTble = new ClientOnRequiredTable
val bwConfig = new BatchWriterConfig
val batchWriter = connector.createBatchWriter("batchtestY", bwConfig);

clientRqrdTble: org.apache.accumulo.core.cli. maxWriteThreads=3, время ожидания = 9223372036854775807] batchWriter: org.apache.accumulo.core.client.BatchWriter = org.apache.accumulo.core.client.impl.BatchWriterImpl@298aa736

val rowIdS = rddX2_first._1.split(" ")(0)

rowIdS: String = row_0736460000

val mutation = new Mutation(new Text(rowIdS))

мутация: org.apache.accumulo.core.data.Mutation = org.apache.accumulo.core.data.Mutation@0

mutation.put(
  new Text("foo"), 
  new Text("1"), 
  new ColumnVisibility("exampleVis"), 
  new Value(new String("CHEWBACCA!").getBytes) )

java.lang.IllegalStateException: не может добавить к мутации после сериализации в org.apache.accumulo.core.data.Mutation.put(Mutation.java:168) в org.apache.accumulo.core.data.Mutation.put(Mutation.java:163) at org.apache.accumulo.core.data.Mutation.put(Mutation.java:211)

Я копался в коде и вижу, что виновником является if-catch, который проверяет, является ли буфер UnsynchronizedBuffer.Writer нулевым. номера строк не будут совпадать, потому что это немного другая версия, чем та, что есть в банке с ядром 1.6 - я смотрел на оба варианта, и разница не та, которая имеет значение в этом случае. насколько я могу судить, объект создается до выполнения этого метода и не сбрасывается.

так что либо я что-то упустил в коде, либо что-то еще. Кто-нибудь из вас знает, что может быть причиной такого поведения?

ОБНОВЛЕНИЕ ОДИН

Я выполнил следующий код с помощью консоли Scala и прямой Java 1.8. Сбой в scala, но не в Java. Я думаю, что это проблема Accumulo на данный момент. Таким образом, я собираюсь открыть билет об ошибке и копнуть глубже в источник. Если я приду с решением, я опубликую здесь.

ниже приведен код в форме Java. Там есть кое-что еще, потому что я хотел убедиться, что смог соединиться с таблицей, которую создал, на примере накопителя пакетов:

import java.util.Map.Entry;

import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.client.*;
import org.apache.accumulo.core.client.mapred.*;
import org.apache.accumulo.core.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.cli.ClientOnRequiredTable.*;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.conf.Configured.*;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;

public class App {

    public static void main( String[] args ) throws 
                                            AccumuloException, 
                                            AccumuloSecurityException, 
                                            TableNotFoundException {
        // connect to accumulo using a scanner
        // print first ten rows of a given table
        String instanceNameS = "accumulo";
        String zooServersS = "localhost:2181";
        Instance instance = new ZooKeeperInstance(instanceNameS, zooServersS);
        Connector connector = 
                instance.getConnector( "root", new PasswordToken("password"));

        Authorizations auths = new Authorizations("exampleVis");
        Scanner scanner = connector.createScanner("batchtestY", auths);

        scanner.setRange(new Range("row_0000000001", "row_0000000010"));

        for(Entry<Key, Value> entry : scanner) {
          System.out.println(entry.getKey() + " is " + entry.getValue());
        }


        // stage up connection info objects for serialization
        ClientOnRequiredTable clientRqrdTble = new ClientOnRequiredTable();
        BatchWriterConfig bwConfig = new BatchWriterConfig();
        BatchWriter batchWriter = 
                connector.createBatchWriter("batchtestY", bwConfig);

        // create mutation object
        Mutation mutation = new Mutation(new Text("row_0000000001"));

        // populate mutation object
        // -->THIS IS WHAT'S FAILING IN SCALA<--
        mutation.put(
                  new Text("foo"), 
                  new Text("1"), 
                  new ColumnVisibility("exampleVis"), 
                  new Value(new String("CHEWBACCA!").getBytes()) );                                           
    }
}

ОБНОВЛЕНИЕ ВТОРОЕ

для этой проблемы был создан тикет Accumulo. Их цель - исправить это в v1.7.0. до тех пор решение, которое я предоставил ниже, является функциональным обходным решением.

2 ответа

Решение

Поэтому кажется, что код, который отлично работает с Java, не очень хорошо работает со Scala. Решение (не обязательно ХОРОШЕЕ, но рабочее) заключается в создании java-метода в автономном фляге, который создает объект мутации и возвращает его. таким образом, вы можете добавить банку в classpath искры и вызвать необходимый метод ass. протестирован с использованием ноутбука spark и успешно обновил существующую таблицу накопления. я все еще собираюсь отправить билет на собранные пипы, так как этот вид обходного пути не должен считаться "лучшей практикой".

Похоже, что то, что происходит в spark-notebook, когда выполняется новая ячейка Mutation, выполняет сериализацию Mutation. Вы не можете вызвать мутацию после того, как она была сериализована. Я бы попытался добавить вызовы mutation.put в ту же ячейку ноутбука, что и новая команда Mutation. Похоже, команды clientRqrdTble/bwConfig/batchWriter находятся в одной многострочной ячейке, так что, надеюсь, это будет возможно и для мутации.

Другие вопросы по тегам