Вставка в таблицу cassandra с составным первичным ключом от hadoop Reduce
Я использую Apache Hadoop, MapReduce и Cassandra для запуска задания MapReduce, которое считывает данные из таблицы Cassandra и выводит их в другую таблицу Cassandra.
У меня есть несколько заданий, которые выводятся в таблицу с одним первичным ключом. Например, эта таблица для подсчета количества слов каждого типа имеет один ключ.
CREATE TABLE word_count(
word text,
count int,
PRIMARY KEY(text)
) WITH COMPACT STORAGE;
Связанный класс Reduce выглядит примерно так:
public static class ReducerToCassandra
extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
{
public void reduce(Text word, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values){
sum += val.get();
}
org.apache.cassandra.thrift.Column c
= new org.apache.cassandra.thrift.Column();
c.setName(ByteBufferUtil.bytes("count");
c.setValue(ByteBufferUtil.bytes(sum));
c.setTimestamp(System.currentTimeMillis());
Mutation mutation = new Mutation();
mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
mutation.column_or_supercolumn.setColumn(c);
ByteBuffer keyByteBuffer = ByteBufferUtil.bytes(word.toString());
context.write(keyByteBuffer, Collections.singletonList(mutation));
}
}
Если я хочу добавить дополнительный столбец, то мне просто нужно добавить еще одну мутацию в List<Mutation>
уже выводится reduce
но я не могу понять, как выводить в таблицу с новым столбцом в составном первичном ключе. Например, эта таблица делает то же, что и выше, но также индексирует слова вместе с часом их публикации.
CREATE TABLE word_count(
word text,
publication_hour bigint,
count int,
PRIMARY KEY(word, publication_hour)
) WITH COMPACT STORAGE;
Я пробовал несколько разных подходов, например, пытаясь вывести пользовательский WritableComparable
(который содержит и слово и час) и обновление class
а также method
подписи и job
конфигурация соответственно, но это делает reduce
бросить ClassCastException
когда он пытается бросить обычай WritableComparable
в ByteBuffer
,
Я попытался создать соответствующее имя столбца с Builder
,
public static class ReducerToCassandra
// MappedKey MappedValue ReducedKey ReducedValues
extends Reducer<WordHourPair, IntWritable, ByteBuffer, List<Mutation>>
{
// MappedKey Values with the key wordHourPair
public void reduce(WordHourPair wordHourPair, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values){
sum += val.get();
}
long hour = wordHourPair.getHourLong();
org.apache.cassandra.thrift.Column c
= new org.apache.cassandra.thrift.Column();
c.setName(ByteBufferUtil.bytes("count");
c.setValue(ByteBufferUtil.bytes(sum));
c.setTimestamp(System.currentTimeMillis());
Mutation mutation = new Mutation();
mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
mutation.column_or_supercolumn.setColumn(c);
//New Code
List<AbstractType<?>> keyTypes = new ArrayList<AbstractType<?>>();
keyTypes.add(UTF8Type.instance);
keyTypes.add(LongType.instance);
CompositeType compositeKey = CompositeType.getInstance(keyTypes);
Builder builder = new Builder(compositeKey);
builder.add(ByteBufferUtil.bytes(word.toString());
builder.add(ByteBufferUtil.bytes(hour));
ByteBuffer keyByteBuffer = builder.build();
context.write(keyByteBuffer, Collections.singletonList(mutation));
}
}
Но это бросает IOException
java.io.IOException: InvalidRequestException(why:String didn't validate.)
at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:204)
Caused by: InvalidRequestException(why:String didn't validate.)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28232)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28218)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result.read(Cassandra.java:28152)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:1069)
at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:1055)
at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:196)
Вопрос: составной ключ Cassandra CQL3, не написанный редуктором Hadoop, похоже, показывает тот код, который я ищу, но он вызывает context.write
с параметрами типа HashMap, ByteBuffer
и я не уверен, как бы я сделал context.write
принять эти параметры.
Как я могу получить данные, которые я хочу (словесные ключи, значения int) в мою таблицу?
1 ответ
Ответом было использование CQL-интерфейса Cassandra, а не Thrift API.
Теперь я могу записать в таблицу с составным ключом, объявив классы выходного ключа / значения моего класса Reduce как "Map, List", а затем создать карту для составного ключа, где Key (типа string) является столбцом name, а Value (типа ByteBuffer) - это значение столбцов, преобразованное в ByteBuffer с помощью ByteBufferUtil.
Например, чтобы записать в таблицу, определенную так:
CREATE TABLE foo (
customer_id uuid,
time timestamp,
my_value int,
PRIMARY KEY (customer_id, time)
)
Я могу написать:
String customerID = "the customer's id";
long time = DateTime.now().getMillis();
int myValue = 1;
Map<String, ByteBuffer> key = new Map<String, ByteBuffer>();
key.put("customer_id",ByteBufferUtil.bytes(customerID));
key.put("time",ByteBufferUtil.bytes(time));
List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(myValue));
context.write(key, values);