Программа Spark Streaming не записывает в текстовый файл

Я написал приложение для потокового воспроизведения, в котором я пытаюсь обработать данные из раздела Kafka, а затем записать обработанные данные в текстовый файл, но программа ничего не делает!

Код:

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
from future.builtins import *  # NOQA
import dash
from dash.dependencies import Output, Event
import dash_core_components as dcc
import dash_html_components as html
import time
import plotly
import plotly.graph_objs as go
from collections import deque
import sys
from operator import add
import numpy as np
from itertools import chain
import warnings
from obspy import UTCDateTime
from obspy.signal.cross_correlation import templates_max_similarity
from obspy.signal.headers import clibsignal, head_stalta_t
from obspy import read
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils





def classic_sta_lta_py(a):
    """
    Computes the standard STA/LTA from a given input array a. The length of
    the STA is given by nsta in samples, respectively is the length of the
    LTA given by nlta in samples. Written in Python.

    .. note::

        There exists a faster version of this trigger wrapped in C
        called :func:`~obspy.signal.trigger.classic_sta_lta` in this module!

    :type a: NumPy :class:`~numpy.ndarray`
    :param a: Seismic Trace
    :type nsta: int
    :param nsta: Length of short time average window in samples
    :type nlta: int
    :param nlta: Length of long time average window in samples
    :rtype: NumPy :class:`~numpy.ndarray`
    :return: Characteristic function of classic STA/LTA
    """
    # The cumulative sum can be exploited to calculate a moving average (the
    # cumsum function is quite efficient)
    nsta = 2
    nlta = 20
    sta = np.cumsum(a ** 2)

    # Convert to float
    sta = np.require(sta, dtype=np.float)

    # Copy for LTA
    lta = sta.copy()

    # Compute the STA and the LTA
    sta[nsta:] = sta[nsta:] - sta[:-nsta]
    sta /= nsta
    lta[nlta:] = lta[nlta:] - lta[:-nlta]
    lta /= nlta

    # Pad zeros
    sta[:nlta - 1] = 0

    # Avoid division by zero by setting zero values to tiny float
    dtiny = np.finfo(0.0).tiny
    idx = lta < dtiny
    lta[idx] = dtiny

    return sta / lta



















def saveRec(rdd):
    rdd.foreach(lambda rec: open("/Users/zeinab/kafka_2.11-1.1.0/outputFile7.txt", "a").write(rec+"\n"))







app = dash.Dash(__name__)
# Read data

max_length = 50
X = deque(maxlen=max_length)
X.append(0)
Y = deque(maxlen=max_length)



text_file = open("/Users/zeinab/kafka_2.11-1.1.0/outputFile7.txt", "r")
lines = text_file.readlines()
a = []
for l in lines:
    a.append(float(l))


app.layout = html.Div(
    [
        dcc.Graph(id='live-graph', animate=True),
        dcc.Interval(
            id='graph-update',
            interval=1*1000
        )
    ]
)

@app.callback(Output('live-graph', 'figure'),
              events=[Event('graph-update', 'interval')])
def update_graph_scatter():
    #times.append(time.time())
    X.append(X[-1]+1)
    Y.append(a[0])
    del a[0]


    data = plotly.graph_objs.Scatter(
            x=list(X),
            y=list(Y),
            name='Scatter',
            mode= 'lines+markers'
            )

    return {'data': [data],'layout' : go.Layout(xaxis=dict(range=[min(X),max(X)]),
                                                yaxis=dict(range=[min(Y),max(Y)]))}





if __name__ == "__main__":
    print("hello")
    sc = SparkContext(appName="STALTA")
    ssc = StreamingContext(sc, 5)
    broker, topic = sys.argv[1:]
    # Connect to Kafka

    kvs = KafkaUtils.createStream(ssc, broker, "raw-event-streaming-consumer",{topic:1})
    lines = kvs.map(lambda x: x[1])
    ds = lines.flatMap(lambda line: line.strip().split("\n")).map(lambda strelem: float(strelem))
    mapped = ds.mapPartitions(lambda i: classic_sta_lta_py(np.array(list(i))))

    lines2 = mapped.map(lambda y: y)
    mapped2 = lines2.map(lambda w: str(w))
    mapped2.foreachRDD(saveRec)
    ssc.start()
    ssc.awaitTermination()
    app.run_server(debug=True)

Код работает хорошо без приложения Dash, но так как мне нужно также визуализировать обработанные данные, я добавил приложение Dash, но оно не работает. Любая идея?

Спасибо.

0 ответов

Другие вопросы по тегам