Обновление или вставка потоков с помощью Spark Cassandra Connector

С помощью Spark Cassandra Connector Все потоковые данные всегда вставляются в Cassandra DB. Хотя это не желаемый результат.

То, что я хотел бы достичь, это добавить значение в базе данных, когда employeetitle совпадения столбцов.

Вот то, что я имею до сих пор

// Create direct kafka stream with brokers and topics
    JavaInputDStream<ConsumerRecord<String, Loan>> messages = KafkaUtils.createDirectStream(
            javaStreamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
    JavaDStream<Loan> loanDStream = messages.map(record -> record.value());
    loanDStream.foreachRDD((loanJavaRDD, time) -> {
        System.out.println("Count "+loanJavaRDD.count());
    });
    JavaDStream<Loan> window = loanDStream.window(Durations.minutes(1), Durations.seconds(10));
    JavaPairDStream<String, BigDecimal> employeeTitleLoanPair = window.mapToPair(loan -> new Tuple2<>(loan.getEmployeeTitle(), loan.getLoanAmount())).reduceByKey((bigDecimal, bigDecimal2) -> bigDecimal.add(bigDecimal2));
    employeeTitleLoanPair.print();
    // Map Cassandra table column
    Map<String, String> columnNameMappings = new HashMap<String, String>();
    columnNameMappings.put("id", "id");
    columnNameMappings.put("employeeTitle", "employeetitle");
    columnNameMappings.put("totalLoan", "totalloan");

    employeeTitleLoanPair.foreachRDD((pairsRDD, time) -> {
        CassandraJavaUtil
                .javaFunctions(pairsRDD.map(pair -> new EmployeeLoan(UUID.randomUUID(), pair._1, pair._2)).filter(employeeLoan -> !employeeLoan.getEmployeeTitle().equals("")))
                .writerBuilder("loan_keyspace", "emp_title_loans", CassandraJavaUtil.mapToRow(EmployeeLoan.class, columnNameMappings))
                .saveToCassandra();
    });

    // Start the computation
    javaStreamingContext.start();
    javaStreamingContext.awaitTermination();

Как я могу проверить на равенство столбцов и либо обновить, либо вставить

0 ответов