Apache-beam программа для анализа настроений

Я написал программу apache-beam, которая берет тексты из файла input.txt и проводит некоторый анализ настроений и вывод, который я хочу сохранить в формате csv, чтобы вставить его в bigquery.

import os
import logging
import csv
import json
import re
import apache_beam as beam 
from datetime import datetime
from google.cloud import storage
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
from google.oauth2 import service_account
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.textio import ReadFromText, WriteToText

input_filename = "input.txt"
output_filename = "output.txt"

options = PipelineOptions()
gcloud_options = options.view_as(GoogleCloudOptions)
gcloud_options.job_name = 'sent-anal'
#gcloud_options.project = 
#gcloud_options.temp_location = 
#gcloud_options.staging.location = 

# Local runner
options.view_as(StandardOptions).runner = 'direct'
# Dataflow runner
#options.view_as(StandardOptions).runner = 'dataflow'

class Split(beam.DoFn):
    def process(self,element):
        element = element.rstrip("\n").decode("utf-8")
        text = element.split(',')
        #var = re.sub('\W+',' ', text )
        mydict = {}
        for i in range(len(text)):
            #mydict["text"] = text[i]
            #print(mydict)
            dat = text[i]
            #print(dat)
            client = language.LanguageServiceClient()
            document = types.Document(content=dat,type=enums.Document.Type.PLAIN_TEXT)
            sent_analysis = client.analyze_sentiment(document=document)
            sentiment = sent_analysis.document_sentiment
            #mydict["score"]=sentiment.score
            #print(mydict)
            data = {"text":dat,"score":sentiment.score}
            mydict.update(data)
            #print(mydict)
        return data

class WriteToCSV(beam.DoFn):
    def process(self, element):
        print(element)
        return 


with beam.Pipeline(options=options) as p:
    rows = (
        p |
        ReadFromText(input_filename) |
        beam.ParDo(Split())
    )
    output_file_txt=(
        rows |
        beam.ParDo(WriteToCSV()) 
    )

Сначала мой выходной файл не сохраняется в нужном формате. Во-вторых, когда я пытаюсь напечатать (элемент) в WriteToCSV, результат выводится как текстовая оценка, а мне нужно что-то вроде этого: SN, текст, оценка 1, пожалуйста, помогите кому-нибудь, 10.00 2, .....

Это мой файл input.txt: возможно, политический климат таков, что ваши новостные интересы на мгновение сместились, расширение TLDR p VS для тех из нас, у кого открыто несколько окон и которые хотят визуально различать их, слегка меняя цвета, Скотт Ааронсон Лучшая книга для детей x27 о вычислительной универсальности, которую я прочитал x27,

0 ответов

Другие вопросы по тегам