Каков наилучший способ записи в файл в параллельном потоке в Java?

У меня есть программа, которая выполняет много вычислений и часто сообщает о них в файл. Я знаю, что частые операции записи могут сильно замедлить работу программы, поэтому, чтобы избежать этого, я хотел бы иметь второй поток, посвященный операциям записи.

Прямо сейчас я делаю это с этим классом, который я написал (нетерпеливый может пропустить до конца вопроса):

public class ParallelWriter implements Runnable {

    private File file;
    private BlockingQueue<Item> q;
    private int indentation;

    public ParallelWriter( File f ){
        file = f;
        q = new LinkedBlockingQueue<Item>();
        indentation = 0;
    }

    public ParallelWriter append( CharSequence str ){
        try {
            CharSeqItem item = new CharSeqItem();
            item.content = str;
            item.type = ItemType.CHARSEQ;
            q.put(item);
            return this;
        } catch (InterruptedException ex) {
            throw new RuntimeException( ex );
        }
    }

    public ParallelWriter newLine(){
        try {
            Item item = new Item();
            item.type = ItemType.NEWLINE;
            q.put(item);
            return this;
        } catch (InterruptedException ex) {
            throw new RuntimeException( ex );
        }
    }

    public void setIndent(int indentation) {
        try{
            IndentCommand item = new IndentCommand();
            item.type = ItemType.INDENT;
            item.indent = indentation;
            q.put(item);
        } catch (InterruptedException ex) {
            throw new RuntimeException( ex );
        }
    }

    public void end(){
        try {
            Item item = new Item();
            item.type = ItemType.POISON;
            q.put(item);
        } catch (InterruptedException ex) {
            throw new RuntimeException( ex );
        }
    }

    public void run() {

        BufferedWriter out = null;
        Item item = null;

        try{
            out = new BufferedWriter( new FileWriter( file ) );
            while( (item = q.take()).type != ItemType.POISON ){
                switch( item.type ){
                    case NEWLINE:
                        out.newLine();
                        for( int i = 0; i < indentation; i++ )
                            out.append("   ");
                        break;
                    case INDENT:
                        indentation = ((IndentCommand)item).indent;
                        break;
                    case CHARSEQ:
                        out.append( ((CharSeqItem)item).content );
                }
            }
        } catch (InterruptedException ex){
            throw new RuntimeException( ex );
        } catch  (IOException ex) {
            throw new RuntimeException( ex );
        } finally {
            if( out != null ) try {
                out.close();
            } catch (IOException ex) {
                throw new RuntimeException( ex );
            }
        }
    }

    private enum ItemType {
        CHARSEQ, NEWLINE, INDENT, POISON;
    }
    private static class Item {
        ItemType type;
    }
    private static class CharSeqItem extends Item {
        CharSequence content;
    }
    private static class IndentCommand extends Item {
        int indent;
    }
}

И тогда я использую это, выполнив:

ParallelWriter w = new ParallelWriter( myFile );
new Thread(w).start();

/// Lots of
w.append(" things ").newLine();
w.setIndent(2);
w.newLine().append(" more things ");

/// and finally
w.end();

Хотя это прекрасно работает, мне интересно:есть ли лучший способ сделать это?

4 ответа

Решение

Ваш основной подход выглядит хорошо. Я бы структурировал код следующим образом:

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public interface FileWriter {
    FileWriter append(CharSequence seq);

    FileWriter indent(int indent);

    void close();
}

class AsyncFileWriter implements FileWriter, Runnable {
    private final File file;
    private final Writer out;
    private final BlockingQueue<Item> queue = new LinkedBlockingQueue<Item>();
    private volatile boolean started = false;
    private volatile boolean stopped = false;

    public AsyncFileWriter(File file) throws IOException {
        this.file = file;
        this.out = new BufferedWriter(new java.io.FileWriter(file));
    }

    public FileWriter append(CharSequence seq) {
        if (!started) {
            throw new IllegalStateException("open() call expected before append()");
        }
        try {
            queue.put(new CharSeqItem(seq));
        } catch (InterruptedException ignored) {
        }
        return this;
    }

    public FileWriter indent(int indent) {
        if (!started) {
            throw new IllegalStateException("open() call expected before append()");
        }
        try {
            queue.put(new IndentItem(indent));
        } catch (InterruptedException ignored) {
        }
        return this;
    }

    public void open() {
        this.started = true;
        new Thread(this).start();
    }

    public void run() {
        while (!stopped) {
            try {
                Item item = queue.poll(100, TimeUnit.MICROSECONDS);
                if (item != null) {
                    try {
                        item.write(out);
                    } catch (IOException logme) {
                    }
                }
            } catch (InterruptedException e) {
            }
        }
        try {
            out.close();
        } catch (IOException ignore) {
        }
    }

    public void close() {
        this.stopped = true;
    }

    private static interface Item {
        void write(Writer out) throws IOException;
    }

    private static class CharSeqItem implements Item {
        private final CharSequence sequence;

        public CharSeqItem(CharSequence sequence) {
            this.sequence = sequence;
        }

        public void write(Writer out) throws IOException {
            out.append(sequence);
        }
    }

    private static class IndentItem implements Item {
        private final int indent;

        public IndentItem(int indent) {
            this.indent = indent;
        }

        public void write(Writer out) throws IOException {
            for (int i = 0; i < indent; i++) {
                out.append(" ");
            }
        }
    }
}

Если вы не хотите писать в отдельном потоке (может быть, в тесте?), Вы можете иметь реализацию FileWriter какие звонки append на Writer в теме звонящего.

Использование LinkedBlockingQueue - довольно хорошая идея. Не уверен, что мне нравится стиль кода... но принцип кажется здравым.

Возможно, я бы добавил емкость к LinkedBlockingQueue, равную определенному% вашей общей памяти... скажем, 10000 элементов... таким образом, если ваша запись идет слишком медленно, ваши рабочие потоки не будут продолжать добавлять больше работы, пока не будет загружена куча,

Хороший способ обмена данными с одним потоком потребителя - это использование обменника.

Вы можете использовать StringBuilder или ByteBuffer в качестве буфера для обмена с фоновым потоком. Задержка может составлять около 1 микросекунды, не требует создания каких-либо объектов и ниже, чем при использовании BlockingQueue.

Из примера, который я думаю, стоит повторить здесь.

class FillAndEmpty {
   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
   DataBuffer initialEmptyBuffer = ... a made-up type
   DataBuffer initialFullBuffer = ...

   class FillingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialEmptyBuffer;
       try {
         while (currentBuffer != null) {
           addToBuffer(currentBuffer);
           if (currentBuffer.isFull())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ... }
     }
   }

   class EmptyingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialFullBuffer;
       try {
         while (currentBuffer != null) {
           takeFromBuffer(currentBuffer);
           if (currentBuffer.isEmpty())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ...}
     }
   }

   void start() {
     new Thread(new FillingLoop()).start();
     new Thread(new EmptyingLoop()).start();
   }
 }

Я знаю, что частые операции записи могут сильно замедлить работу программы

Вероятно, не так много, как вы думаете, при условии, что вы используете буферизацию.

Другие вопросы по тегам