Spark Streaming program does not write to a text file
I wrote a Spark Streaming application in which I try to process data from a Kafka topic and then write the processed data to a text file, but the program does nothing!
Code:
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
from future.builtins import * # NOQA
import dash
from dash.dependencies import Output, Event
import dash_core_components as dcc
import dash_html_components as html
import time
import plotly
import plotly.graph_objs as go
from collections import deque
import sys
from operator import add
import numpy as np
from itertools import chain
import warnings
from obspy import UTCDateTime
from obspy.signal.cross_correlation import templates_max_similarity
from obspy.signal.headers import clibsignal, head_stalta_t
from obspy import read
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
def classic_sta_lta_py(a):
"""
Computes the standard STA/LTA from a given input array a. The length of
the STA is given by nsta in samples, respectively is the length of the
LTA given by nlta in samples. Written in Python.
.. note::
There exists a faster version of this trigger wrapped in C
called :func:`~obspy.signal.trigger.classic_sta_lta` in this module!
:type a: NumPy :class:`~numpy.ndarray`
:param a: Seismic Trace
:type nsta: int
:param nsta: Length of short time average window in samples
:type nlta: int
:param nlta: Length of long time average window in samples
:rtype: NumPy :class:`~numpy.ndarray`
:return: Characteristic function of classic STA/LTA
"""
# The cumulative sum can be exploited to calculate a moving average (the
# cumsum function is quite efficient)
nsta = 2
nlta = 20
sta = np.cumsum(a ** 2)
# Convert to float
sta = np.require(sta, dtype=np.float)
# Copy for LTA
lta = sta.copy()
# Compute the STA and the LTA
sta[nsta:] = sta[nsta:] - sta[:-nsta]
sta /= nsta
lta[nlta:] = lta[nlta:] - lta[:-nlta]
lta /= nlta
# Pad zeros
sta[:nlta - 1] = 0
# Avoid division by zero by setting zero values to tiny float
dtiny = np.finfo(0.0).tiny
idx = lta < dtiny
lta[idx] = dtiny
return sta / lta
def saveRec(rdd):
    rdd.foreach(lambda rec: open("/Users/zeinab/kafka_2.11-1.1.0/outputFile7.txt", "a").write(rec+"\n"))
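# Dash app that reads the processed values back from the output file and plots them live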
app = dash.Dash(__name__)
# Read data
max_length = 50
X = deque(maxlen=max_length)
X.append(0)
Y = deque(maxlen=max_length)
text_file = open("/Users/zeinab/kafka_2.11-1.1.0/outputFile7.txt", "r")
lines = text_file.readlines()
a = []
for l in lines:
    a.append(float(l))
app.layout = html.Div(
    [
        dcc.Graph(id='live-graph', animate=True),
        dcc.Interval(
            id='graph-update',
            interval=1*1000
        )
    ]
)
@app.callback(Output('live-graph', 'figure'),
              events=[Event('graph-update', 'interval')])
def update_graph_scatter():
    #times.append(time.time())
    X.append(X[-1]+1)
    Y.append(a[0])
    del a[0]
    data = plotly.graph_objs.Scatter(
        x=list(X),
        y=list(Y),
        name='Scatter',
        mode='lines+markers'
    )
    return {'data': [data],
            'layout': go.Layout(xaxis=dict(range=[min(X), max(X)]),
                                yaxis=dict(range=[min(Y), max(Y)]))}
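# Spark Streaming driver: consume the Kafka topic, run STA/LTA on each partition, append results to the output file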
if __name__ == "__main__":
    print("hello")
    sc = SparkContext(appName="STALTA")
    ssc = StreamingContext(sc, 5)
    broker, topic = sys.argv[1:]
    # Connect to Kafka
    kvs = KafkaUtils.createStream(ssc, broker, "raw-event-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    ds = lines.flatMap(lambda line: line.strip().split("\n")).map(lambda strelem: float(strelem))
    mapped = ds.mapPartitions(lambda i: classic_sta_lta_py(np.array(list(i))))
    lines2 = mapped.map(lambda y: y)
    mapped2 = lines2.map(lambda w: str(w))
    mapped2.foreachRDD(saveRec)
    ssc.start()
    ssc.awaitTermination()
    app.run_server(debug=True)
The code works fine without the Dash app, but since I also need to visualize the processed data, I added the Dash app, and now it does not work. Any ideas?
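My own guess so far: `ssc.awaitTermination()` blocks the driver indefinitely, so the `app.run_server(debug=True)` call after it is never reached. Below is a minimal sketch of the restructuring I was considering (not tested; it assumes the Dash development server can safely run in a background thread while the streaming context blocks in the main thread, and it reuses `app`, `SparkContext`, and `StreamingContext` from the script above):

import threading

def run_dash():
    # keep debug/reloader off when the server is not running in the main thread
    app.run_server(debug=False, use_reloader=False)

if __name__ == "__main__":
    sc = SparkContext(appName="STALTA")
    ssc = StreamingContext(sc, 5)
    # ... same Kafka / STA-LTA pipeline as above ...

    # Start the Dash server first, as a daemon thread, then block on Spark
    dash_thread = threading.Thread(target=run_dash)
    dash_thread.daemon = True
    dash_thread.start()

    ssc.start()
    ssc.awaitTermination()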
Thanks.