Как преобразовать скрипт поиска по ключевым словам в Common Crawl в задание Hadoop для Amazon EMR

Я написал скрипт поиска по ключевым словам, который запускается на EC2 и успешно сохраняет результаты в S3. Но он однопоточный и поэтому работает медленно. Я хочу запускать его на EMR с помощью пользовательского JAR-файла. Не мог бы кто-нибудь помочь преобразовать его в задание Hadoop (MapReduce), чтобы я мог запустить его на EMR?

Я новичок в Hadoop. Я безуспешно попробовал следующие репозитории:



Затем я объединил код из этих двух репозиториев и получил следующий скрипт:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveRecord;
import org.archive.io.warc.WARCReaderFactory;
import org.jets3t.service.S3Service;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3ObjectSummary;

/**
 * Scans Common Crawl WARC files stored in S3, searches the HTML of each HTTP response
 * record for a fixed list of keywords, and uploads every matching page to S3 as its
 * own object.
 *
 * NOTE(review): this listing was copied from a web page and does not compile as-is.
 * Several lines are missing: the statements controlled by the bare `if` conditions,
 * the argument of `new ByteArrayInputStream(`, the catch-block bodies, the head of the
 * upload call, the second BasicAWSCredentials argument, and many closing braces.
 * The notes below mark each gap; the suggested missing statements are guesses to be
 * confirmed against the author's original source.
 */
public class S3BucketReader3 {

/**
 * Lists the objects under {@code bucketName}/{@code prefix}, downloads every
 * {@code .warc.gz} object, and runs the keyword search over each HTML response record.
 *
 * @param s3         AWS SDK client used to list the objects (and presumably to upload
 *                   the matches; the head of the upload call is missing from this copy)
 * @param bucketName bucket to list. NOTE(review): the download below hard-codes
 *                   "aws-publicdatasets" instead of using this parameter.
 * @param prefix     key prefix to list, e.g. one crawl's directory
 * @param max        -1 for no limit; otherwise presumably the number of listed objects
 *                   to handle before stopping (the stopping statement is missing)
 */
static public void process(AmazonS3 s3, String bucketName, String prefix,
        int max) throws S3ServiceException, AmazonServiceException,
        AmazonClientException, InterruptedException {
    // Number of listed objects handled so far; only advanced when max != -1.
    int maxCount = 0;
    // NOTE(review): printed below but never incremented in the visible lines.
    int counter = 0;
    // Used in upload object names; never incremented in the visible lines.
    int fileCounter = 1;
    // jets3t client with null credentials, used only for the download below;
    // presumably relies on anonymous access to the public bucket - TODO confirm.
    S3Service s3s = new RestS3Service(null);

    // NOTE(review): `processor` is never used in the visible code; the record
    // handling is done inline below instead of through this callback.
    IProcessWarcRecord processor = new SampleProcessWarcRecord();

    // First page of the listing; later pages are fetched at the bottom of the loop.
    ObjectListing list = s3.listObjects(bucketName, prefix);

    // NOTE(review): the loop condition is tested after fetching the next page, so
    // when the listing spans more than one page, the final page (the one whose
    // isTruncated() is false) is fetched but never processed. Test isTruncated()
    // on the page just processed before fetching the next one.
    do {

        List<S3ObjectSummary> summaries = list.getObjectSummaries();
        for (S3ObjectSummary summary : summaries) {

            try {

                // Key of one listed object (expected to be a .warc.gz file).
                String key = summary.getKey();
                System.out.println("+ key: " + counter + " " + key);

                // Skip anything that is not a gzipped WARC file.
                // NOTE(review): the statement this `if` controls is missing from this
                // copy - presumably `continue;`.
                if (key.contains(".warc.gz") == false)

                // Download the object through the jets3t client. The bucket name is
                // hard-coded here rather than taken from `bucketName`.
                S3Object f = s3s.getObject("aws-publicdatasets", key, null,
                        null, null, null, null, null);

                // Open the downloaded stream as a WARC archive. The meaning of the
                // boolean argument is defined by WARCReaderFactory.get (presumably
                // "stream is positioned at the first record") - TODO confirm.
                // NOTE(review): neither `ar` nor the S3 object's data stream is closed
                // in the visible code.
                ArchiveReader ar = WARCReaderFactory.get(key,
                        f.getDataInputStream(), true);

                for (ArchiveRecord r : ar) {
                    // The record header carries the record type, size, creation time
                    // and URL; only HTTP response records are of interest.
                    // NOTE(review): the statement this `if` controls is missing from
                    // this copy - presumably `continue;`.
                    if (r.getHeader().getMimetype()
                            .contains("application/http; msgtype=response") == false)

                    // Read the record body. ArchiveRecord is used as an InputStream;
                    // this relies on available() reporting the record's remaining
                    // length - TODO confirm for ArchiveRecord.
                    byte[] rawData = IOUtils.toByteArray(r, r.available());

                    // Note: potential optimization would be to have a large
                    // buffer only allocated once

                    // Decode the record as text.
                    // NOTE(review): new String(byte[]) uses the platform default
                    // charset, so pages in other encodings may be mis-decoded.
                    String content = new String(rawData);
                    // Only records containing an HTML doctype are searched.
                    // NOTE(review): the statement this `if` controls is missing from
                    // this copy - presumably `continue;`. The check is case-sensitive,
                    // so a lower-case "<!doctype html" is not recognized.
                    if (content.contains("<!DOCTYPE html") == false)

                    // Drop everything before the first "<!DOCTYPE html" (the HTTP
                    // headers). The substring-then-prepend pair below is equivalent to
                    // content.substring(content.indexOf("<!DOCTYPE html")).
                    content = content.substring(content
                            .indexOf("<!DOCTYPE html")
                            + "<!DOCTYPE html".length());
                    content = "<!DOCTYPE html" + content;

                    // Lower-case once so the keyword checks are case-insensitive.
                    // NOTE(review): toLowerCase() without a Locale uses the default
                    // locale; toLowerCase(Locale.ROOT) would be locale-independent.
                    String lowerContent = content.toLowerCase();

                    // Search for keywords anywhere in the HTML (plain substring match,
                    // markup included).
                    // NOTE(review): several keywords are redundant - "gamble" already
                    // matches "gambler" and "gambler addict", and "rich" already matches
                    // "2nd rich generation" and "rich businessman". Short keywords such
                    // as "rich" and "tour" also match inside other words ("enrich",
                    // "contour").
                    if (lowerContent.contains("gambler")
                            || lowerContent.contains("rich")
                            || lowerContent.contains("name list")
                            || lowerContent.contains("2nd rich generation")
                            || lowerContent
                                    .contains("2nd official generation")
                            || lowerContent.contains("gambler addict")
                            || lowerContent.contains("gamble")
                            || lowerContent.contains("shanxi")
                            || lowerContent.contains("macau")
                            || lowerContent.contains("rich businessman")
                            || lowerContent.contains("tour")
                            || lowerContent.contains("smoking")) {

                        // Write the matching page directly to S3.
                        // NOTE(review): this upload code is incomplete in this copy.
                        // The following are missing:
                        // - the argument of `new ByteArrayInputStream(` (presumably the
                        //   page bytes);
                        // - the start of the error-report call in the first catch;
                        // - the rest of the `contentLength` statement;
                        // - the head of the upload call (presumably
                        //   `s3.putObject(<bucket>,`).

                        byte[] contentBytes = null;
                        ByteArrayInputStream input = null;
                        try {
                            input = new ByteArrayInputStream(
                            contentBytes = IOUtils.toByteArray(input);
                        } catch (IOException e) {
                                    "Failed while reading bytes from %s",
                        Long contentLength = Long

                        ObjectMetadata metadata = new ObjectMetadata();

                        // Object name for the upload: a running number plus the
                        // current time in whole seconds.
                        // NOTE(review): fileCounter is never incremented in the
                        // visible code, so two matches within the same second get the
                        // same name and overwrite each other.
                        String dataFileName = "dataFile_" + fileCounter
                                + "_" + System.currentTimeMillis() / 1000;

                        // NOTE(review): IOUtils.toByteArray(input) above reads `input`
                        // to its end. Unless a missing line resets or recreates it, the
                        // stream handed to the upload here has no bytes left.
                        try {
                                    dataFileName, input, metadata);
                        } catch (AmazonServiceException ase) {
                            System.out.println("Error Message:    "
                                    + ase.getMessage());
                            System.out.println("HTTP Status Code: "
                                    + ase.getStatusCode());
                            System.out.println("AWS Error Code:   "
                                    + ase.getErrorCode());
                            System.out.println("Error Type:       "
                                    + ase.getErrorType());
                            System.out.println("Request ID:       "
                                    + ase.getRequestId());
                        } catch (AmazonClientException ace) {
                            System.out.println("Error Message: "
                                    + ace.getMessage());
                        } finally {
                            if (input != null) {
                            // NOTE(review): the body of this `if` (presumably closing
                            // `input`) and the closing braces of the finally block are
                            // missing from this copy.

                        contentBytes = null;
                        input = null;

                        // Progress output.
                        System.out.println("Files created number: "
                                + fileCounter);
                    } // if keyword match

                    lowerContent = null;
                    content = null;

                } // for each ArchiveRecord

            } catch (Exception ex) {
            // NOTE(review): the handler body and closing brace are missing from this
            // copy; make sure the handler at least reports `key` and `ex` instead of
            // silently dropping the failure.

            System.out.println("Count no: " + counter);

            // Optional limit on the number of listed objects handled.
            // NOTE(review): the statement controlled by the inner `if` is missing from
            // this copy - presumably `return;` - as are the closing braces.
            if (max != -1) {
                if (++maxCount >= max)
        } // for each summary
        list = s3.listNextBatchOfObjects(list);

    } while (list.isTruncated());
    // done processing all WARC records:

/**
 * Entry point: runs the keyword search over the CC-MAIN-2013-48 crawl in the
 * "aws-publicdatasets" bucket with no limit on the number of objects (max = -1).
 */
static public void main(String[] args) throws S3ServiceException,
        AmazonServiceException, AmazonClientException, InterruptedException {

    // NOTE(review): do not hard-code AWS keys in source. On EC2/EMR prefer the SDK's
    // default credential provider chain (e.g. the instance role).
    String awsAccessKey = "******";
    String awsSecretKey = "******";

    // NOTE(review): the second constructor argument (presumably awsSecretKey) and the
    // closing `);` are missing from this copy.
    AWSCredentials credentials = new BasicAWSCredentials(awsAccessKey,
    AmazonS3 s3 = new AmazonS3Client(credentials);

    process(s3, "aws-publicdatasets",
            "common-crawl/crawl-data/CC-MAIN-2013-48", -1);

0 ответов

Другие вопросы по тегам