Высоко
ценим помощь Использование Spark 2.4.5,scala 2.11.8 с пакетом ниже --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:2.4.5--packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.5
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils,
InitialPositionInStream
ssc = StreamingContext(sc, 5)
#ssc = StreamingContext(spark, 5)
conf = SparkConf()
conf.setAppName("PythonStreamingKinesis")
#conf.get("spark.app.name")
kinesisAppName = conf.get("spark.app.name")
regionName = "us-west-1"
streamName = "kinesis_data"
endpointUrl = "https://kinesis.us-west-1.amazonaws.com"
lines = KinesisUtils.createStream(ssc,
kinesisAppName,streamName,endpointUrl,
regionName, InitialPositionInStream.TRIM_HORIZON,2)
lines.pprint()
ssc.start()
*** сообщение журнала ***
Время: 2020-08-25 05:57:25
20/08/25 05:57:25 INFO JobScheduler: завершенное задание потоковой передачи заданий 1598335045000 мс. 1 из заданного времени 1598335045000 мс 20.08.25 05:57:25 INFO JobScheduler: Общая задержка: 0,171 с для времени 1598335045000 мс (выполнение: 0,100 с)
20.08.25 05:57:25 INFO KinesisBackedBlockRDD: удаление RDD 108 из списка сохраняемости 20.08.25 05:57:25 INFO KinesisInputDStream: удаление блоков RDD KinesisBackedBlockRDD[108] в createStream в NativeMethodAccessor.java: время 0 1598335045000 мс 20.08.25 05:57:25 INFO ReceivedBlockTracker: удаление пакетов: 1598335035000 мс 20.08.25 05:57:25 INFO InputInfoTracker: удаление старых метаданных пакета: 1598335035000 мс [Этап 2:> (0 + 1) / 1]20.08.25 05:57:30 INFO JobScheduler: добавлены задания на время 1598335050000 мс 20.08.25 05:57:30 INFO JobScheduler:Запуск задания потоковой передачи заданий 1598335050000 мс.0 из набора заданий времени 1598335050000 мс
Время: 2020-08-25 05:57:30
20/08/25 05:57:30 INFO JobScheduler: завершено задание потоковой передачи заданий 1598335050000 мс.0 из набора времени 1598335050000 мс 20.08.25 05:57:30 INFO JobScheduler: запуск задания потоковой передачи заданий 1598335050000 мс.1 с задание времени 1598335050000 мс
Время: 2020-08-25 05:57:30
20/08/25 05:57:30 INFO JobScheduler: завершенное задание потоковой передачи заданий 1598335050000 мс. 1 из набора времени 1598335050000 мс 20.08.25 05:57:30 INFO JobScheduler: Общая задержка: 0,157 с для времени 1598335050000 мс (выполнение: 0,094 с)
20.08.25 05:57:30 INFO KinesisBackedBlockRDD: Удаление RDD 116 из списка сохраняемости 20.08.25 05:57:30 INFO KinesisInputDStream: Удаление блоков RDD KinesisBackedBlockRDD[116] в createStream в NativeMethodAccessorImpl.java: время 0 1598335050000 мс 20.08.25 05:57:30 INFO ReceivedBlockTracker: Удаление пакетов: 1598335040000 мс 20.08.25 05:57:30 INFO InputInfoTracker: удаление старых метаданных пакета: 1598335040000 мс [Этап 2:> (0 + 1) / 1]20/08/25 05:57:35 INFO JobScheduler: добавлены задания на время 1598335055000 мс 20/08/25 05:57:35 INFO JobScheduler:Запуск задания потоковой передачи заданий 1598335055000 мс.0 из набора заданий времени 1598335055000 мс
Время: 2020-08-25 05:57:35
20/08/25 05:57:35 INFO JobScheduler: завершено задание потоковой передачи заданий 1598335055000 мс.0 из набора времени 1598335055000 мс 20.08.25 05:57:35 INFO JobScheduler: запуск задания потоковой передачи заданий 1598335055000 мс.1 с задание времени 1598335055000 мс