Apache-beam программа для анализа настроений
Я написал программу apache-beam, которая берет тексты из файла input.txt и проводит некоторый анализ настроений и вывод, который я хочу сохранить в формате csv, чтобы вставить его в bigquery.
import os
import logging
import csv
import json
import re
import apache_beam as beam
from datetime import datetime
from google.cloud import storage
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
from google.oauth2 import service_account
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.textio import ReadFromText, WriteToText
input_filename = "input.txt"
output_filename = "output.txt"
options = PipelineOptions()
gcloud_options = options.view_as(GoogleCloudOptions)
gcloud_options.job_name = 'sent-anal'
#gcloud_options.project =
#gcloud_options.temp_location =
#gcloud_options.staging.location =
# Local runner
options.view_as(StandardOptions).runner = 'direct'
# Dataflow runner
#options.view_as(StandardOptions).runner = 'dataflow'
class Split(beam.DoFn):
def process(self,element):
element = element.rstrip("\n").decode("utf-8")
text = element.split(',')
#var = re.sub('\W+',' ', text )
mydict = {}
for i in range(len(text)):
#mydict["text"] = text[i]
#print(mydict)
dat = text[i]
#print(dat)
client = language.LanguageServiceClient()
document = types.Document(content=dat,type=enums.Document.Type.PLAIN_TEXT)
sent_analysis = client.analyze_sentiment(document=document)
sentiment = sent_analysis.document_sentiment
#mydict["score"]=sentiment.score
#print(mydict)
data = {"text":dat,"score":sentiment.score}
mydict.update(data)
#print(mydict)
return data
class WriteToCSV(beam.DoFn):
def process(self, element):
print(element)
return
with beam.Pipeline(options=options) as p:
rows = (
p |
ReadFromText(input_filename) |
beam.ParDo(Split())
)
output_file_txt=(
rows |
beam.ParDo(WriteToCSV())
)
Сначала мой выходной файл не сохраняется в нужном формате. Во-вторых, когда я пытаюсь напечатать (элемент) в WriteToCSV, результат выводится как текстовая оценка, а мне нужно что-то вроде этого: SN, текст, оценка 1, пожалуйста, помогите кому-нибудь, 10.00 2, .....
Это мой файл input.txt: возможно, политический климат таков, что ваши новостные интересы на мгновение сместились, расширение TLDR p VS для тех из нас, у кого открыто несколько окон и которые хотят визуально различать их, слегка меняя цвета, Скотт Ааронсон Лучшая книга для детей x27 о вычислительной универсальности, которую я прочитал x27,