Можно ли читать из InputStream с таймаутом?
В частности, проблема заключается в том, чтобы написать такой метод:
int maybeRead(InputStream in, long timeout)
где возвращаемое значение совпадает с in.read(), если данные доступны в течение миллисекунд 'timeout', и -2 в противном случае. Перед возвратом метода все порожденные потоки должны завершиться.
Чтобы избежать аргументов, предмет здесь java.io.InputStream, как документировано Sun (любая версия Java). Обратите внимание, что это не так просто, как кажется. Ниже приведены некоторые факты, которые подтверждаются непосредственно документацией Sun.
Метод in.read() может быть непрерывным.
Обертывание InputStream в Reader или InterruptibleChannel не помогает, потому что все эти классы могут делать, это вызывать методы InputStream. Если бы можно было использовать эти классы, можно было бы написать решение, которое просто выполняет ту же логику непосредственно на InputStream.
Для in.available() всегда допустимо возвращать 0.
Метод in.close() может блокировать или ничего не делать.
Нет общего способа убить другой поток.
9 ответов
Использование inputStream.available()
System.in.available() всегда допустимо возвращать 0.
Я обнаружил обратное - он всегда возвращает лучшее значение для количества доступных байтов. Javadoc для InputStream.available()
:
Returns an estimate of the number of bytes that can be read (or skipped over)
from this input stream without blocking by the next invocation of a method for
this input stream.
Оценка неизбежна из-за сроков / устаревания. Эта цифра может быть разовой заниженной, поскольку постоянно поступают новые данные. Однако он всегда "догоняет" при следующем вызове - он должен учитывать все поступившие данные, за исключением тех, которые поступают только в момент нового вызова. Постоянно возвращать 0 при наличии данных не соответствует условию выше.
Первое предостережение: за доступность отвечают конкретные подклассы InputStream ()
InputStream
это абстрактный класс. У него нет источника данных. Это бессмысленно иметь доступные данные. Следовательно, Javadoc для available()
также говорится:
The available method for class InputStream always returns 0.
This method should be overridden by subclasses.
И действительно, конкретные классы входного потока переопределяют available(), предоставляя значимые значения, а не постоянные 0.
Второе предупреждение: убедитесь, что вы используете возврат каретки при вводе ввода в Windows.
При использовании System.in
Ваша программа получает ввод только тогда, когда командная оболочка передает его. Если вы используете перенаправление файлов /pipe (например, somefile > java myJavaApp или somecommand | java myJavaApp), то входные данные обычно передаются немедленно. Однако, если вы вводите вручную, передача данных может быть отложена. Например, в оболочке windows cmd.exe данные буферизуются в оболочке cmd.exe. Данные передаются только в исполняющую Java-программу после возврата каретки (control-m или <enter>
). Это ограничение среды исполнения. Конечно, InputStream.available() будет возвращать 0 до тех пор, пока оболочка буферизует данные - это правильное поведение; на данный момент нет доступных данных. Как только данные доступны из оболочки, метод возвращает значение> 0. Примечание: Cygwin также использует cmd.exe.
Самое простое решение (без блокировки, поэтому время ожидания не требуется)
Просто используйте это:
byte[] inputData = new byte[1024];
int result = is.read(inputData, 0, is.available());
// result will indicate number of bytes read; -1 for EOF with no data read.
ИЛИ эквивалентно,
BufferedReader br = new BufferedReader(new InputStreamReader(System.in, Charset.forName("ISO-8859-1")),1024);
// ...
// inside some iteration / processing logic:
if (br.ready()) {
int readCount = br.read(inputData, bufferOffset, inputData.length-bufferOffset);
}
Richer Solution (максимально заполняет буфер в течение периода ожидания)
Объявите это:
public static int readInputStreamWithTimeout(InputStream is, byte[] b, int timeoutMillis)
throws IOException {
int bufferOffset = 0;
long maxTimeMillis = System.currentTimeMillis() + timeoutMillis;
while (System.currentTimeMillis() < maxTimeMillis && bufferOffset < b.length) {
int readLength = java.lang.Math.min(is.available(),b.length-bufferOffset);
// can alternatively use bufferedReader, guarded by isReady():
int readResult = is.read(b, bufferOffset, readLength);
if (readResult == -1) break;
bufferOffset += readResult;
}
return bufferOffset;
}
Тогда используйте это:
byte[] inputData = new byte[1024];
int readCount = readInputStreamWithTimeout(System.in, inputData, 6000); // 6 second timeout
// readCount will indicate number of bytes read; -1 for EOF with no data read.
Предполагая, что ваш поток не поддерживается сокетом (поэтому вы не можете использовать Socket.setSoTimeout()
), Я думаю, что стандартный способ решения этой проблемы - это использование будущего.
Предположим, у меня есть следующий исполнитель и потоки:
ExecutorService executor = Executors.newFixedThreadPool(2);
final PipedOutputStream outputStream = new PipedOutputStream();
final PipedInputStream inputStream = new PipedInputStream(outputStream);
У меня есть писатель, который пишет некоторые данные, затем ждет в течение 5 секунд, прежде чем записать последний кусок данных и закрытия потока:
Runnable writeTask = new Runnable() {
@Override
public void run() {
try {
outputStream.write(1);
outputStream.write(2);
Thread.sleep(5000);
outputStream.write(3);
outputStream.close();
} catch (Exception e) {
e.printStackTrace();
}
}
};
executor.submit(writeTask);
Нормальный способ чтения этого заключается в следующем. Чтение будет блокироваться на неопределенный срок для данных, и это завершается через 5 секунд:
long start = currentTimeMillis();
int readByte = 1;
// Read data without timeout
while (readByte >= 0) {
readByte = inputStream.read();
if (readByte >= 0)
System.out.println("Read: " + readByte);
}
System.out.println("Complete in " + (currentTimeMillis() - start) + "ms");
какие выводы:
Read: 1
Read: 2
Read: 3
Complete in 5001ms
Если бы существовала более фундаментальная проблема, например, что писатель не отвечает, читатель заблокирует его навсегда. Если я заверну чтение в будущем, то смогу контролировать время ожидания следующим образом:
int readByte = 1;
// Read data with timeout
Callable<Integer> readTask = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return inputStream.read();
}
};
while (readByte >= 0) {
Future<Integer> future = executor.submit(readTask);
readByte = future.get(1000, TimeUnit.MILLISECONDS);
if (readByte >= 0)
System.out.println("Read: " + readByte);
}
какие выводы:
Read: 1
Read: 2
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
at java.util.concurrent.FutureTask.get(FutureTask.java:91)
at test.InputStreamWithTimeoutTest.main(InputStreamWithTimeoutTest.java:74)
Я могу поймать TimeoutException и сделать любую очистку, какую захочу.
Если ваш InputStream поддерживается Socket, вы можете установить время ожидания Socket (в миллисекундах), используя setSoTimeout. Если вызов read() не разблокируется в течение указанного времени, он выдаст исключение SocketTimeoutException.
Просто убедитесь, что вы вызываете setSoTimeout для Socket перед выполнением вызова read().
Я бы поставил вопрос о постановке проблемы, а не просто принял ее вслепую. Вам нужны только тайм-ауты из консоли или по сети. Если последнее у вас есть Socket.setSoTimeout()
а также HttpURLConnection.setReadTimeout()
которые оба делают именно то, что требуется, при условии, что вы правильно настроили их при их создании / приобретении. Оставляя его на произвольном этапе позже в приложении, когда все, что у вас есть, это InputStream - плохой дизайн, приводящий к очень неловкой реализации.
Я не использовал классы из пакета Java NIO, но, похоже, они могут быть здесь полезны. В частности, java.nio.channels.Channels и http://java.sun.com/javase/6/docs/api/java/nio/channels/InterruptibleChannel.html.
Вот способ получить NIO FileChannel из System.in и проверить доступность данных, используя тайм-аут, который является частным случаем проблемы, описанной в этом вопросе. Запустите его на консоли, не вводите никаких данных и ждите результатов. Он был успешно протестирован под Java 6 на Windows и Linux.
import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
public class Main {
static final ByteBuffer buf = ByteBuffer.allocate(4096);
public static void main(String[] args) {
long timeout = 1000 * 5;
try {
InputStream in = extract(System.in);
if (! (in instanceof FileInputStream))
throw new RuntimeException(
"Could not extract a FileInputStream from STDIN.");
try {
int ret = maybeAvailable((FileInputStream)in, timeout);
System.out.println(
Integer.toString(ret) + " bytes were read.");
} finally {
in.close();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/* unravels all layers of FilterInputStream wrappers to get to the
* core InputStream
*/
public static InputStream extract(InputStream in)
throws NoSuchFieldException, IllegalAccessException {
Field f = FilterInputStream.class.getDeclaredField("in");
f.setAccessible(true);
while( in instanceof FilterInputStream )
in = (InputStream)f.get((FilterInputStream)in);
return in;
}
/* Returns the number of bytes which could be read from the stream,
* timing out after the specified number of milliseconds.
* Returns 0 on timeout (because no bytes could be read)
* and -1 for end of stream.
*/
public static int maybeAvailable(final FileInputStream in, long timeout)
throws IOException, InterruptedException {
final int[] dataReady = {0};
final IOException[] maybeException = {null};
final Thread reader = new Thread() {
public void run() {
try {
dataReady[0] = in.getChannel().read(buf);
} catch (ClosedByInterruptException e) {
System.err.println("Reader interrupted.");
} catch (IOException e) {
maybeException[0] = e;
}
}
};
Thread interruptor = new Thread() {
public void run() {
reader.interrupt();
}
};
reader.start();
for(;;) {
reader.join(timeout);
if (!reader.isAlive())
break;
interruptor.start();
interruptor.join(1000);
reader.join(1000);
if (!reader.isAlive())
break;
System.err.println("We're hung");
System.exit(1);
}
if ( maybeException[0] != null )
throw maybeException[0];
return dataReady[0];
}
}
Интересно, что при запуске программы внутри NetBeans 6.5, а не на консоли, тайм-аут не работает вообще, и вызов System.exit() фактически необходим для уничтожения потоков зомби. Что происходит, так это то, что поток прерывания блокирует (!) При вызове reader.interrupt(). Другая тестовая программа (не показана здесь) дополнительно пытается закрыть канал, но это тоже не работает.
Как сказал jt, NIO - лучшее (и правильное) решение. Если вы действительно застряли с InputStream, вы можете
Инициируйте поток, чья эксклюзивная работа - читать из InputStream и помещать результат в буфер, который можно прочитать из вашего исходного потока без блокировки. Это должно работать хорошо, если у вас есть только один экземпляр потока. В противном случае вы можете убить поток, используя устаревшие методы в классе Thread, хотя это может привести к утечке ресурсов.
Положитесь на isAvailable, чтобы указать данные, которые можно прочитать без блокировки. Однако в некоторых случаях (например, в случае с Sockets) может потребоваться потенциально блокирующее чтение для isAvailable, чтобы сообщить о чем-то отличном от 0.
Невозможно правильно заблокировать , если только он не созданSocket
.
1. Никогда не занят-подождите
«Ожидание занятости» означает использование высокой нагрузки на ЦП.while
циклы для опроса ресурса ввода-вывода. Никогда не занят-подождите. Правильный способ — пометить текущий поток как «заблокированный» и позволить ОС выбирать другие потоки для запуска. Когда ресурс ввода-вывода становится доступным ИЛИ истекает время ожидания, ОС отвечает за пометку вашего потока как «ожидающего» и выбор его снова. Встроенные реализации Java делают именно это.
Последствия напряженного ожидания:
- Если у вас есть один занятой официант, процессор будет работать тогда, когда он должен простаивать.
- Если у вас несколько занятых официантов, они будут морить друг друга голодом и создавать задержки в приложении вплоть до времени такта/тика ОС (обычно 10–100 мс). Эффективный коэффициент использования ЦП также падает и может приближаться к 0% в зависимости от ситуации.
- Если у ваших занятых ожидающих приоритеты потоков выше, чем у других потоков реального времени, они даже истощают внешние приложения и приводят к задержкам.
это вызов ввода-вывода. Он не должен занимать процессор(ы) во время ожидания байта/таймаута. Насколько я могу судить,read()
что Java обеспечивает поддержку правильной блокировки, но без возможности тайм-аута. Большинство других ответов на этот вопрос StackOverflow предоставляют тайм-аут, но используют ожидание занятости.
невозможно реализовать тайм-аут без ожидания занятости.
2. Java нарушает ООП, позволяяSocket.setSoTimeout
ВозвращенныйSocket.getInputStream()
простираетсяclass InputStream
.InputStream.read()
это абстрактный метод, которому не разрешено выдавать ничего, кромеIOException
согласно его API . Однако, цитируяSocket.setSoTimeout()
API:
Если для этой опции установлено ненулевое значение тайм-аута, вызов read() для входного потока, связанного с этим сокетом, будет блокироваться только на этот период времени. Если тайм-аут истечет,
java.net.SocketTimeoutException
поднимается, хотя сокет все еще действителен.
Буквально выбрасывает исключение, не объявленное в интерфейсе.
Эта проблема также подчеркивает тот факт, что интерфейс не предназначен для поддержки таймаутов. Java заставила сокеты возвращать значения, поддерживающие тайм-аут, нарушив принципы ООП. В заключение, если у вас есть компьютер, не основанный на TCPInputStream
и хотите иметь правильную блокировку тайм-аута, вам не повезло — это невозможно.
Вдохновленный этим ответом, я придумал более объектно-ориентированное решение.
Это действительно только в том случае, если вы собираетесь читать символы
Вы можете переопределить BufferedReader и реализовать что-то вроде этого:
public class SafeBufferedReader extends BufferedReader{
private long millisTimeout;
( . . . )
@Override
public int read(char[] cbuf, int off, int len) throws IOException {
try {
waitReady();
} catch(IllegalThreadStateException e) {
return 0;
}
return super.read(cbuf, off, len);
}
protected void waitReady() throws IllegalThreadStateException, IOException {
if(ready()) return;
long timeout = System.currentTimeMillis() + millisTimeout;
while(System.currentTimeMillis() < timeout) {
if(ready()) return;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break; // Should restore flag
}
}
if(ready()) return; // Just in case.
throw new IllegalThreadStateException("Read timed out");
}
}
Вот почти полный пример.
Я возвращаю 0 для некоторых методов, вы должны изменить его на -2 в соответствии с вашими потребностями, но я думаю, что 0 больше подходит для контракта BufferedReader. Ничего плохого не произошло, просто прочитано 0 символов. Метод readLine - ужасный убийца производительности. Вам следует создать совершенно новый BufferedReader, если вы действительно хотите использовать readLine. Прямо сейчас это не потокобезопасно. Если кто-то вызывает операцию, пока readLines ожидает строки, это приведет к неожиданным результатам.
Не люблю возвращать -2 туда, где я. Я бы выбросил исключение, потому что некоторые люди могут просто проверять, если int < 0, чтобы рассмотреть EOS. В любом случае, эти методы заявляют, что "не могут блокировать", вы должны проверить, действительно ли это утверждение истинно, и просто не переопределять их.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.CharBuffer;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
*
* readLine
*
* @author Dario
*
*/
public class SafeBufferedReader extends BufferedReader{
private long millisTimeout;
private long millisInterval = 100;
private int lookAheadLine;
public SafeBufferedReader(Reader in, int sz, long millisTimeout) {
super(in, sz);
this.millisTimeout = millisTimeout;
}
public SafeBufferedReader(Reader in, long millisTimeout) {
super(in);
this.millisTimeout = millisTimeout;
}
/**
* This is probably going to kill readLine performance. You should study BufferedReader and completly override the method.
*
* It should mark the position, then perform its normal operation in a nonblocking way, and if it reaches the timeout then reset position and throw IllegalThreadStateException
*
*/
@Override
public String readLine() throws IOException {
try {
waitReadyLine();
} catch(IllegalThreadStateException e) {
//return null; //Null usually means EOS here, so we can't.
throw e;
}
return super.readLine();
}
@Override
public int read() throws IOException {
try {
waitReady();
} catch(IllegalThreadStateException e) {
return -2; // I'd throw a runtime here, as some people may just be checking if int < 0 to consider EOS
}
return super.read();
}
@Override
public int read(char[] cbuf) throws IOException {
try {
waitReady();
} catch(IllegalThreadStateException e) {
return -2; // I'd throw a runtime here, as some people may just be checking if int < 0 to consider EOS
}
return super.read(cbuf);
}
@Override
public int read(char[] cbuf, int off, int len) throws IOException {
try {
waitReady();
} catch(IllegalThreadStateException e) {
return 0;
}
return super.read(cbuf, off, len);
}
@Override
public int read(CharBuffer target) throws IOException {
try {
waitReady();
} catch(IllegalThreadStateException e) {
return 0;
}
return super.read(target);
}
@Override
public void mark(int readAheadLimit) throws IOException {
super.mark(readAheadLimit);
}
@Override
public Stream<String> lines() {
return super.lines();
}
@Override
public void reset() throws IOException {
super.reset();
}
@Override
public long skip(long n) throws IOException {
return super.skip(n);
}
public long getMillisTimeout() {
return millisTimeout;
}
public void setMillisTimeout(long millisTimeout) {
this.millisTimeout = millisTimeout;
}
public void setTimeout(long timeout, TimeUnit unit) {
this.millisTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
}
public long getMillisInterval() {
return millisInterval;
}
public void setMillisInterval(long millisInterval) {
this.millisInterval = millisInterval;
}
public void setInterval(long time, TimeUnit unit) {
this.millisInterval = TimeUnit.MILLISECONDS.convert(time, unit);
}
/**
* This is actually forcing us to read the buffer twice in order to determine a line is actually ready.
*
* @throws IllegalThreadStateException
* @throws IOException
*/
protected void waitReadyLine() throws IllegalThreadStateException, IOException {
long timeout = System.currentTimeMillis() + millisTimeout;
waitReady();
super.mark(lookAheadLine);
try {
while(System.currentTimeMillis() < timeout) {
while(ready()) {
int charInt = super.read();
if(charInt==-1) return; // EOS reached
char character = (char) charInt;
if(character == '\n' || character == '\r' ) return;
}
try {
Thread.sleep(millisInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore flag
break;
}
}
} finally {
super.reset();
}
throw new IllegalThreadStateException("readLine timed out");
}
protected void waitReady() throws IllegalThreadStateException, IOException {
if(ready()) return;
long timeout = System.currentTimeMillis() + millisTimeout;
while(System.currentTimeMillis() < timeout) {
if(ready()) return;
try {
Thread.sleep(millisInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore flag
break;
}
}
if(ready()) return; // Just in case.
throw new IllegalThreadStateException("read timed out");
}
}