Как я могу преобразовать данные при использовании библиотеки pg-copy-streams перед отправкой данных в s3
В поисках службы потоковой передачи от postgres до s3 я наткнулся на библиотеку https://www.npmjs.com/package/pg-copy-streams , и кажется, это то, что я искал.
Я следил за этой статьей https://anthony-f-tannous.medium.com/exporting-data-from-rds-postgres-to-s3-with-nodejs-lambda-f37b4dac578f#:~:text=This%20article%20provides%20an%20подход,S3%20с использованием%20%20aws%2Dsdk.
который в основном использует предоставленную вами библиотеку. Теперь у меня есть один вопрос: что, если я захочу преобразовать данные перед отправкой их в s3 (или, можно сказать, вести журнал). Предположим, мне нужен вывод определенного столбца, и я добавляю к нему условие проверки. Поскольку я считаю, что получаю данные в байтах и порциях, я не уверен, как именно я могу этого добиться.
Здесь я вставляю код, который ищу (может помочь), где в основном я проверяю, является ли для определенного столбца конкретная строка, затем меняю ее на другую строку и затем отправляю в s3. С помощью приведенного ниже кода я частично могу добиться того, чего хочу, поскольку в некоторых случаях эта конкретная строка становится неопределенной, вероятно, из-за того, что проблема с фрагментированием приводит к созданию неполной строки.
const AWS = require('aws-sdk')
const { Client } = require('pg')
const copyTo = require('pg-copy-streams').to
const { Transform } = require('stream');
// Target s3 bucket for storing CSV export file
const bucketName = 'any-bucket'
AWS.config.update({
s3ForcePathStyle: true,
region: 'us-east-1',
})
async function s3Upload(inputStream, bucket, s3Key) {
const s3 = new AWS.S3()
const params = {
Bucket: bucket,
Key: s3Key,
Body: inputStream
}
const data = await s3.upload(params).promise()
const { Location } = data
return Location
}
async function generatePresignedUrl(bucket, key) {
const s3 = new AWS.S3();
const params = {
Bucket: bucket,
Key: key,
Expires: 3600, // Presigned URL expiration time in seconds (1 hour)
};
return s3.getSignedUrlPromise('getObject', params);
}
class DataTransformer {
constructor() {}
transformLine(line) {
const [parent_organization_id, parent_organization_type, child_organization_id, child_organization_type] = line.split(',');
try {
if (child_organization_type.trim().toUpperCase() === 'SCHOOL') {
return `${parent_organization_id},${parent_organization_type},${child_organization_id},"college"`;
}
} catch (err) {
console.error(err)
}
return line;
}
}
exports.handler = async function (event, context) {
// RDS endpoint, database name and port.
const dbHost = 'localhost'
const dbName = 'postgres'
const dbPort = 5435
/*
Credentials for Postgres RDS DB. These would be retrieved from a secure
source, e.g, AWS secretsmanager.
*/
const dbUsername = 'postgres'
const dbPassword = 'postgres'
try {
// Instantiate pg Client
var pgClient = new Client({
host: dbHost,
user: dbUsername,
database: dbName,
password: dbPassword,
port: dbPort,
connectionTimeoutMillis: 10000
})
await pgClient.connect()
const exportQuery = copyTo(
`COPY (SELECT parent_organization_id, parent_organization_type, child_organization_id, child_organization_type
from dim.organization_hierarchy order by parent_organization_id limit 100000 )
TO STDOUT with (FORMAT CSV, DELIMITER ',', QUOTE '"', HEADER)`
);
// Output filename to store results
const outputFile = 'sample-tbl.csv';
// Create stream
const readableStream = pgClient.query(exportQuery);
// Create a transform stream to capitalize the data
const dataTransformer = new DataTransformer();
// Create a transform stream to encapsulate the transformation logic
const transformStream = new Transform({
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
const transformedLines = lines.map(line => dataTransformer.transformLine(line));
const transformedData = transformedLines.join('\n');
this.push(transformedData);
callback();
},
});
// Pipe the query result stream through the capitalize transform stream
readableStream.pipe(transformStream);
// Call s3Upload, which pipes the capitalized stream to s3.upload(...Body=...)
await s3Upload(transformStream, bucketName, outputFile);
const presignedUrl = await generatePresignedUrl(bucketName, outputFile);
return {
status: 200,
message: 'Completed successfully',
url: presignedUrl
}
} catch (err) {
console.error(err.stack)
throw Error(err.stack)
} finally {
await pgClient.end()
}
}
Вот как я пытался, но, похоже, это не сработало должным образом. Подскажите, пожалуйста, есть ли еще варианты. Спасибо!