Spark Structured Streaming не работает непрерывно?
Код сервера:
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Random;
public class Socket_server {
public static void main(String[] args) throws Exception {
ServerSocket sc = new ServerSocket(9990);
while (true) {
Socket socket = sc.accept();
java.io.OutputStream out = socket.getOutputStream();
String message = getRandomIntegerBetweenRange(100, 120) + "";
byte b[] = message.getBytes(Charset.defaultCharset());
out.write(b);
out.close();
socket.close();
}
}
private static double getRandomIntegerBetweenRange(double max, double min) {
double x = (int) (Math.random() * ((max - min) + 1)) + min;
return x;
}
}
Искровой код:
import java.util.Collections;
import org.apache.avro.ipc.specific.Person;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import scala.Function1;
public class App1 {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf();
conf.setMaster("local[*]");
conf.setAppName("app");
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
Dataset<Row> lines = spark.readStream().format("socket").option("host", "localhost").option("port", 9990)
.load();
StreamingQuery query = lines.writeStream().format("console").start();
query.awaitTermination();
}
}
Я использую серверный код, который генерирует случайные значения, и после этого я использую код структурированного потокового воспроизведения, чтобы прочитать его и создать из него DataFrame. Но когда мой искровой код запускается, он просто читает первое значение с сервера, после чего он больше не считывает значение. Когда я использую тот же сервер с потоковой передачей, то он непрерывно читает значения. Так может кто-нибудь помочь в том, что не так с кодом.