Как я могу преобразовать данные при использовании библиотеки pg-copy-streams перед отправкой данных в s3

В поисках службы потоковой передачи от postgres до s3 я наткнулся на библиотеку https://www.npmjs.com/package/pg-copy-streams , и кажется, это то, что я искал.

Я следил за этой статьей https://anthony-f-tannous.medium.com/exporting-data-from-rds-postgres-to-s3-with-nodejs-lambda-f37b4dac578f#:~:text=This%20article%20provides%20an%20подход,S3%20с использованием%20%20aws%2Dsdk.

который в основном использует предоставленную вами библиотеку. Теперь у меня есть один вопрос: что, если я захочу преобразовать данные перед отправкой их в s3 (или, можно сказать, вести журнал). Предположим, мне нужен вывод определенного столбца, и я добавляю к нему условие проверки. Поскольку я считаю, что получаю данные в байтах и ​​порциях, я не уверен, как именно я могу этого добиться.

Здесь я вставляю код, который ищу (может помочь), где в основном я проверяю, является ли для определенного столбца конкретная строка, затем меняю ее на другую строку и затем отправляю в s3. С помощью приведенного ниже кода я частично могу добиться того, чего хочу, поскольку в некоторых случаях эта конкретная строка становится неопределенной, вероятно, из-за того, что проблема с фрагментированием приводит к созданию неполной строки.

      const AWS = require('aws-sdk')
const { Client } = require('pg')
const copyTo = require('pg-copy-streams').to
const { Transform } = require('stream');

// Target s3 bucket for storing CSV  export file
const bucketName =  'any-bucket'

AWS.config.update({
    s3ForcePathStyle: true,
    region: 'us-east-1',
})

async function s3Upload(inputStream, bucket, s3Key) {
    const s3 = new AWS.S3()
    const params = {
        Bucket: bucket,
        Key: s3Key,
        Body: inputStream
    }
    const data = await s3.upload(params).promise()
    const { Location } = data
    return Location
}

async function generatePresignedUrl(bucket, key) {
    const s3 = new AWS.S3();
    const params = {
        Bucket: bucket,
        Key: key,
        Expires: 3600, // Presigned URL expiration time in seconds (1 hour)
    };
    return s3.getSignedUrlPromise('getObject', params);
}


class DataTransformer {
    constructor() {}

    transformLine(line) {
        const [parent_organization_id, parent_organization_type, child_organization_id, child_organization_type] = line.split(',');
       
        try {
            if (child_organization_type.trim().toUpperCase() === 'SCHOOL') {
                return `${parent_organization_id},${parent_organization_type},${child_organization_id},"college"`;
            }
        } catch (err) {
            console.error(err)
        }

        return line;
    }
}

exports.handler = async function (event, context) {

    // RDS endpoint, database name and port.
    const dbHost = 'localhost'
    const dbName = 'postgres'
    const dbPort = 5435

    /*
      Credentials for Postgres RDS DB. These would be retrieved from a secure
      source, e.g, AWS secretsmanager.
    */

    const dbUsername = 'postgres'
    const dbPassword = 'postgres'

    try {
        // Instantiate pg Client

        var pgClient = new Client({
            host: dbHost,
            user: dbUsername,
            database: dbName,
            password: dbPassword,
            port: dbPort,
            connectionTimeoutMillis: 10000
        })

        await pgClient.connect()

        const exportQuery = copyTo(
            `COPY (SELECT parent_organization_id, parent_organization_type, child_organization_id, child_organization_type
 from dim.organization_hierarchy order by parent_organization_id limit 100000 ) 
            TO STDOUT with (FORMAT CSV, DELIMITER ',', QUOTE '"', HEADER)`
        );

        // Output filename to store results
        const outputFile = 'sample-tbl.csv';

        // Create stream
        const readableStream = pgClient.query(exportQuery);
        
        // Create a transform stream to capitalize the data
        const dataTransformer = new DataTransformer();

        // Create a transform stream to encapsulate the transformation logic
        const transformStream = new Transform({
            transform(chunk, encoding, callback) {
                const lines = chunk.toString().split('\n');
                const transformedLines = lines.map(line => dataTransformer.transformLine(line));
                const transformedData = transformedLines.join('\n');
                this.push(transformedData);
                callback();
            },
        });

    // Pipe the query result stream through the capitalize transform stream
    readableStream.pipe(transformStream);

    // Call s3Upload, which pipes the capitalized stream to s3.upload(...Body=...)
    await s3Upload(transformStream, bucketName, outputFile);

    const presignedUrl = await generatePresignedUrl(bucketName, outputFile);

        return {
            status: 200,
            message: 'Completed successfully',
            url: presignedUrl
        }
    } catch (err) {
        console.error(err.stack)
        throw Error(err.stack)
    } finally {
        await pgClient.end()
    }
}

Вот как я пытался, но, похоже, это не сработало должным образом. Подскажите, пожалуйста, есть ли еще варианты. Спасибо!

0 ответов

Другие вопросы по тегам