Искра работы (скала) написать тип Дата Кассандра

Я использую DSE 5.1 ​​(спарк 2.0.2.6 и Кассандра 3.10.0.1652)

Мой стол Кассандры:

CREATE TABLE ks.tbl (
   dk int,
   date date,
   ck int,
   val int,
PRIMARY KEY (dk, date, ck)
) WITH CLUSTERING ORDER BY (date DESC, ck ASC);

со следующими данными:

 dk | date       | ck | val
----+------------+----+-----
  1 | 2017-01-01 |  1 | 100
  1 | 2017-01-01 |  2 | 200

Мой код должен прочитать эти данные и написать то же самое, но со вчерашней датой (он успешно компилируется):

package com.datastax.spark.example

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import com.github.nscala_time.time._
import com.github.nscala_time.time.Imports._

object test extends App {

  val conf = new SparkConf().setAppName("DSE calculus app TEST")
  val sc = new SparkContext(conf)

  val yesterday= (DateTime.now - 1.days).toString(StaticDateTimeFormat.forPattern("yyyy-MM-dd"))

  val tbl = sc.cassandraTable("ks","tbl").select("dk","date","ck","val").where("dk=1")

  tbl.map(row => (row.getInt("dk"),yesterday,row.getInt("ck"),row.getInt("val"))).saveToCassandra("ks","tbl")

  sc.stop()
  sys.exit(0)
}

Когда я запускаю это приложение:

dse spark-submit --class com.datastax.spark.example.test test-assembly-0.1.jar

Он не может правильно написать Кассандре. Кажется, переменная даты не вставлена ​​в карту правильно. Я получаю ошибку:

Error:
WARN  2017-05-08 22:23:16,472 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <IP of one of my nodes>): java.io.IOException: Failed to write statements to ks.tbl.
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:207)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
        at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
        at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

Однако, когда я вставляю дату (строку) непосредственно в оператор map следующим образом, код вставляет данные правильно:

tbl.map(row => (row.getInt("dk"),"2017-02-02",row.getInt("ck"),row.getInt("val"))).saveToCassandra("ks","tbl")

Он также правильно вставляет данные, если я вчера установил целое число (дни с начала эпохи). Это будет оптимально, но "вчера" не может вести себя так

РЕДАКТИРОВАТЬ: Это не вставляет данные правильно, на самом деле. Независимо от того, установил ли я "вчера" 1 или 100 000 000, всегда вставляется эпоха ("1970-01-01)

Код, который не работает, ведет себя правильно и, как я и ожидал, в консоли DSE Spark.

Я просто не могу понять, что я делаю неправильно. Любая помощь приветствуется.

РЕДАКТИРОВАТЬ 2: Журнал stderr Exceutor 0 действительно показывает, что он пытается вставить нулевое значение в дату столбца, что, очевидно, невозможно, так как это столбец кластеризации.

2 ответа

Решение

При написании кода для Spark Job важно понимать, когда устанавливаются конкретные переменные и когда они сериализуются. Давайте посмотрим на записку от App черт документы

Предостережения

Следует отметить, что эта черта реализована с использованием функциональности DelayedInit, что означает, что поля объекта не будут инициализированы до того, как будет выполнен основной метод.

Это означает ссылки на переменные, используемые в теле App возможно, не инициализируются на Executors, когда код фактически выполняется.

Я предполагаю, что лямбда, которую вы написали, содержит ссылку на val, который инициализируется в части Delayed init класса App. Это означает сериализованную версию кода на исполнителе, который не запускает Main Метод получает неинициализированную версию значения (null).

Переключение константы в lazy val (или перемещение его в отдельный объект или класс) решит эту проблему, убедившись, что значение инициализируется удаленно (lazy val) или просто сериализуется, инициализируется (отдельный класс / объект).

Я думаю, я знаю, в чем твоя проблема.
Вы можете увидеть полный файл журнала. Вы просто прикрепляете часть из них...
Сегодня схожая ошибка возникает при создании пространства ключей с replication_factor: 3, когда у меня был только один экземпляр cassandra.

Так что я изменил это, и проблема ушла.

ALTER KEYSPACE "some_keyspace_name" WITH REPLICATION =
  { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

Вот мой файл error.log

И важная часть журнала:

Logging.scala[logError]:72) - Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@4746499f
com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive)
Другие вопросы по тегам