Spark Structured Streaming не работает непрерывно?

Код сервера:

import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Random;

public class Socket_server {

    public static void main(String[] args) throws Exception {

        ServerSocket sc = new ServerSocket(9990);
        while (true) {
            Socket socket = sc.accept();
            java.io.OutputStream out = socket.getOutputStream();

            String message = getRandomIntegerBetweenRange(100, 120) + "";
            byte b[] = message.getBytes(Charset.defaultCharset());

            out.write(b);
            out.close();
            socket.close();
        }
    }

    private static double getRandomIntegerBetweenRange(double max, double min) {
        double x = (int) (Math.random() * ((max - min) + 1)) + min;

        return x;
    }
}

Искровой код:

import java.util.Collections;

import org.apache.avro.ipc.specific.Person;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

import scala.Function1;

public class App1 {

    public static void main(String[] args) throws Exception {

        SparkConf conf = new SparkConf();
        conf.setMaster("local[*]");
        conf.setAppName("app");

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        Dataset<Row> lines = spark.readStream().format("socket").option("host", "localhost").option("port", 9990)
                .load();

        StreamingQuery query = lines.writeStream().format("console").start();

        query.awaitTermination();
    }
}

Я использую серверный код, который генерирует случайные значения, и после этого я использую код структурированного потокового воспроизведения, чтобы прочитать его и создать из него DataFrame. Но когда мой искровой код запускается, он просто читает первое значение с сервера, после чего он больше не считывает значение. Когда я использую тот же сервер с потоковой передачей, то он непрерывно читает значения. Так может кто-нибудь помочь в том, что не так с кодом.

0 ответов

Другие вопросы по тегам