Многопоточное Java-приложение: Thread.sleep(100) длится почти вечно
Я пытаюсь исправить приложение, которое рассылает клиентам письма. Отправка выполняется многопоточным Java-приложением (по модели «производитель — потребитель»): производитель (Producer) получает список сообщений из базы данных, а потребители (Consumer) запускают Python-скрипт, который отправляет письмо.
Однажды начальник сказал мне, что программа перестала работать. Я не знаю, что случилось — возможно, что-то изменилось, но системные администраторы уверяют, что ничего не меняли: вчера программа работала, а потом внезапно замедлилась... и теперь она отправляет только 2–3 письма в час вместо прежней 1000.
Разработчик больше не доступен, поэтому я должен исправить это сам.
Я очистил базу данных: в ней хранились только старые данные, которые больше не нужны, и она была огромной — 6 ГБ, 7 миллионов строк, всё работало очень медленно. Но проблема осталась.
Теперь я смотрю на вывод Java-приложения и вижу, что эти строки:
// Excerpt from Consumer.run() in newthread.java below: the pause between two mails.
// Only the log line printed before Thread.sleep() is seen in the output. The thread dump further
// down shows one consumer parked on the shared ReentrantLock and the other blocked inside
// System.out.println, not inside Thread.sleep.
try
{
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" is sleeping for "+consumerSleepTime+" ms ::");
Thread.sleep(consumerSleepTime);
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" sleeptime over in try block ::");
}
catch (InterruptedException e)
{
e.printStackTrace();
System.out.println(shortTime.format(new java.util.Date()) + ":: "+Thread.currentThread().getName()+" was interrupted ::");
// Restore the interrupt flag, then leave the consumer's labelled main loop.
Thread.currentThread().interrupt();
break main_while_loop;
}
«застревают» во сне — в выводе я вижу:
11:24:15:: consumer_thread_1 is sleeping for 100 ms ::
Больше ничего не происходит: ни сообщения «sleeptime over», ни трассировки стека — похоже, поток спит вечно. Однако если я несколько раз продолжаю процесс, то через случайное время потоки возобновляют работу, а затем снова засыпают... на несколько часов.
Есть идеи?
Ниже несколько файлов — возможно, они помогут понять общую структуру...
runjava
#!/bin/sh
# Start the mail dispatcher (class newthread) with the MySQL JDBC driver jar
# and the current directory on the classpath.
CP=/usr/share/java/mysql-connector-java.jar:.
java -cp "$CP" newthread
newthread.java
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.sql.SQLException;
import java.sql.Driver;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.io.Closeable;
import java.io.IOException;
/**
 * Producer/consumer mail dispatcher.
 *
 * <p>One {@link Producer} thread loads up to {@code maxSize} pending mail ids from
 * {@code mail_queue} into {@code boundedBuffer} whenever that buffer is empty. The
 * {@link Consumer} threads take ids from the end of the buffer and run
 * {@code python ./new_mail.py <mail_id>} for each one. The buffer state
 * ({@code boundedBuffer}, {@code currentIndex}) is only touched while holding {@code lock}.
 *
 * <p>About the reported "endless sleep": every worker prints to System.out, and the workers
 * print while holding {@code lock}. If a write to stdout blocks, the printing thread stalls
 * inside the critical section and the other workers queue up behind the lock. The thread dump
 * in the question shows exactly that state (one consumer in PrintStream.write, the other
 * parked in ReentrantLock.lock). NOTE(review): why stdout blocked (e.g. a paused terminal)
 * is not visible here — confirm.
 */
class ReentrantLockTest
{
    int maxSize = 100;
    // Buffered mail ids; guarded by lock. Consumers remove from index currentIndex (the end).
    ArrayList<String> boundedBuffer = new ArrayList<String>( maxSize );
    ReentrantLock lock = new ReentrantLock(false);
    // telire_var: consumers await it while the buffer is empty; the producer signals it after a refill.
    // uresre_var: the producer awaits it while the buffer is non-empty; a consumer signals it on finding it empty.
    Condition telire_var = lock.newCondition(),
            uresre_var = lock.newCondition();
    volatile boolean shutdown = false;
    long producerSleepTime = 5000;
    long consumerSleepTime = 100;
    long mainDriverWaitTime = 1000;
    int totalProduerThreads = 1;
    int totalConsumerThreads = 2;
    long criticalSectionDelay = 0;
    // Index of the last id in boundedBuffer, or -1 when the buffer is empty; guarded by lock.
    int currentIndex = -1;
    String sqlhost = "localhost";
    String sqluser = "user";
    String sqlpassword = "pass";
    String sqldb = "db";
    String mysqlUrl = "";

    /**
     * Loads the MySQL JDBC driver, starts the producer and consumer threads, then idles on the
     * calling thread until it is interrupted. After the interrupt it sets {@code shutdown},
     * interrupts every worker and joins them. Returns immediately, without starting any thread,
     * if the driver class cannot be loaded.
     */
    public ReentrantLockTest()
    {
        System.out.println(sqluser);
        java.text.DateFormat shortTime = java.text.DateFormat.getTimeInstance(java.text.DateFormat.MEDIUM);
        ArrayList<Thread> threads = new ArrayList<Thread>();
        ThreadGroup tg = new ThreadGroup("ReentrantLockTest Thread Group");
        mysqlUrl = "jdbc:mysql://"+ sqlhost +"/"+ sqldb +"?autoReconnect=true";
        try
        {
            Class.forName("com.mysql.jdbc.Driver").newInstance();
        }
        catch (Exception e)
        {
            System.out.println(e);
            return;
        }
        for (int i = 1; i <= totalProduerThreads; i++)
        {
            threads.add( new Thread( tg , new ReentrantLockTest.Producer() , "producer_thread_"+i) );
        }
        for (int i = 1; i <= totalConsumerThreads; i++)
        {
            threads.add( new Thread( tg , new ReentrantLockTest.Consumer() , "consumer_thread_"+i) );
        }
        for (Thread t: threads)
        {
            t.start();
        }
        try
        {
            // The workers need no driving from here; the main thread just idles until interrupted.
            // The former per-second Thread.resume() calls were no-ops (nothing ever suspends the
            // workers). resume() is deprecated and unsupported or removed on recent JDKs.
            while (true)
            {
                Thread.sleep(mainDriverWaitTime);
            }
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        finally
        {
            shutdown = true;
            System.out.println(shortTime.format(new java.util.Date()) + ":: ReentrantLockTest - setting shutdown to true ::");
        }
        System.out.println(":: ReentrantLockTest - signalling interrupt and waiting for "+tg.activeCount()+" threads to die ::");
        for (Thread t: threads)
        {
            System.out.println(shortTime.format(new java.util.Date()) + ":: Interrupting "+t.getName()+" ::");
            try
            {
                t.interrupt();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
        for (Thread t: threads)
        {
            try
            {
                String name = t.getName();
                System.out.println(shortTime.format(new java.util.Date()) + ":: Waiting for "+name+" to die ::");
                t.join();
                System.out.println(shortTime.format(new java.util.Date()) + ":: "+name+" is dead ::" );
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }

    /**
     * Takes mail ids from the shared buffer one at a time and runs the Python mail script for
     * each, pausing {@code consumerSleepTime} ms between mails. Stops when {@code shutdown} is
     * set or the thread is interrupted.
     */
    public class Consumer implements Runnable
    {
        java.text.DateFormat shortTime = java.text.DateFormat.getTimeInstance(java.text.DateFormat.MEDIUM);

        /** Closes {@code c}, ignoring null and any IOException. */
        private void close(Closeable c)
        {
            if (c != null)
            {
                try
                {
                    c.close();
                }
                catch (IOException ignored)
                {
                    // best effort: nothing useful to do if closing a child-process pipe fails
                }
            }
        }

        /** Prints {@code msg} to stdout, prefixed with the current time. */
        private void log(String msg)
        {
            System.out.println(shortTime.format(new java.util.Date()) + ":: " + msg);
        }

        @Override
        public void run()
        {
            String me = Thread.currentThread().getName();
            log(me + " has been started ::");
            while (!shutdown && !Thread.currentThread().isInterrupted())
            {
                try
                {
                    sendMail(takeNextMailId());
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                    log(me + " was interrupted ::");
                    Thread.currentThread().interrupt();
                    break;
                }
                try
                {
                    log(me + " is sleeping for " + consumerSleepTime + " ms ::");
                    Thread.sleep(consumerSleepTime);
                    log(me + " sleeptime over in try block ::");
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                    log(me + " was interrupted ::");
                    Thread.currentThread().interrupt();
                    break;
                }
                log(me + " sleeptime over ::");
            }
            log(me + " has been shutdown ::");
        }

        /**
         * Removes and returns the last id in the shared buffer. If the buffer is empty, it wakes
         * the producer and waits until the buffer has been refilled.
         *
         * <p>The lock is released on every exit path. Previously an interrupt while waiting left
         * the thread holding the lock, which blocked every other worker forever.
         *
         * @throws InterruptedException if interrupted while waiting for the producer
         */
        private String takeNextMailId() throws InterruptedException
        {
            String me = Thread.currentThread().getName();
            lock.lock();
            try
            {
                log(me + " acquired the lock ::");
                Thread.sleep(criticalSectionDelay);
                if (currentIndex == -1)
                {
                    uresre_var.signal();
                    log(me + " signalling waiting_on_full_buffer ::");
                }
                while (currentIndex == -1)
                {
                    log("buffer is empty - " + me + " is going to wait ::");
                    telire_var.await();
                }
                String mail_id = boundedBuffer.remove(currentIndex);
                currentIndex--;
                log("buffer size: " + currentIndex);
                return mail_id;
            }
            finally
            {
                lock.unlock();
                log(me + " released the lock ::");
            }
        }

        /**
         * Runs {@code python ./new_mail.py <mail_id>} and waits for it to exit.
         *
         * <p>stderr is merged into stdout, and that output is read and discarded while the script
         * runs. Leaving the pipe unread lets a chatty script block forever once the OS pipe buffer
         * fills. IOExceptions and RuntimeExceptions are printed and swallowed, so one failing mail
         * does not stop the consumer.
         *
         * @throws InterruptedException if interrupted while waiting for the script; the child is destroyed
         */
        private void sendMail(String mail_id) throws InterruptedException
        {
            String me = Thread.currentThread().getName();
            Process proc = null;
            try
            {
                log(me + " starts python (" + mail_id + ")");
                // Argument list instead of a single command string: no whitespace tokenizing of mail_id.
                ProcessBuilder builder = new ProcessBuilder("python", "./new_mail.py", mail_id);
                builder.redirectErrorStream(true);
                proc = builder.start();
                // The script is given no input; close its stdin so it cannot block reading it.
                close(proc.getOutputStream());
                java.io.InputStream output = proc.getInputStream();
                byte[] chunk = new byte[4096];
                while (output.read(chunk) != -1)
                {
                    // discarded (it was never consumed before either); reading keeps the pipe from filling
                }
                int exitCode = proc.waitFor();
                log(me + " ends python (exit code " + exitCode + ")");
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
            catch (RuntimeException e)
            {
                e.printStackTrace();
            }
            finally
            {
                if (proc != null)
                {
                    close(proc.getOutputStream());
                    close(proc.getInputStream());
                    close(proc.getErrorStream());
                    proc.destroy();
                    log(me + " close python proc");
                }
            }
        }
    }

    /**
     * Refills the shared buffer from {@code mail_queue} whenever the consumers have drained it,
     * then sleeps {@code producerSleepTime} ms. Stops when {@code shutdown} is set or the thread
     * is interrupted.
     */
    public class Producer implements Runnable
    {
        java.text.DateFormat shortTime = java.text.DateFormat.getTimeInstance(java.text.DateFormat.MEDIUM);

        /** Prints {@code msg} to stdout, prefixed with the current time. */
        private void log(String msg)
        {
            System.out.println(shortTime.format(new java.util.Date()) + ":: " + msg);
        }

        @Override
        public void run()
        {
            String me = Thread.currentThread().getName();
            log(me + " has been started ::");
            while (!shutdown && !Thread.currentThread().isInterrupted())
            {
                lock.lock();
                try
                {
                    log(me + " acquired the lock ::");
                    while (currentIndex != -1)
                    {
                        log("a bufferben van valamennyi - " + me + " varakozik ::");
                        uresre_var.await();
                    }
                    Thread.sleep(criticalSectionDelay);
                    refillBuffer();
                    // Wake every waiting consumer. signal() woke only one of them, leaving the
                    // other idle until the next refill.
                    telire_var.signalAll();
                    log(me + " signalling waiting_on_empty_buffer ::");
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                    log(me + " was interrupted ::");
                    Thread.currentThread().interrupt();
                    break;
                }
                finally
                {
                    lock.unlock();
                    log(me + " released the lock ::");
                }
                try
                {
                    log(me + " is sleeping for " + producerSleepTime + " ms ::");
                    Thread.sleep(producerSleepTime);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                    log(me + " was interrupted ::");
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            log(me + " has been shutdown ::");
        }

        /**
         * Replaces the buffer with up to {@code maxSize} pending ids from {@code mail_queue}.
         * The caller must hold {@code lock}.
         *
         * <p>On a database error the error is printed and the buffer is left unchanged (empty), so
         * the next loop iteration retries. Previously the producer thread exited on the first error,
         * leaving the consumers waiting forever. JDBC resources are closed on every path.
         */
        private void refillBuffer()
        {
            Connection con = null;
            Statement st = null;
            ResultSet rs = null;
            try
            {
                con = DriverManager.getConnection(mysqlUrl, sqluser, sqlpassword);
                st = con.createStatement();
                rs = st.executeQuery("SELECT mail_id FROM mail_queue WHERE status = 0 OR (status BETWEEN 2 AND 100 AND proof_counter <= 3 AND UNIX_TIMESTAMP() - proof_stamp > 15*60 ) ORDER BY status, mail_id DESC LIMIT "+ maxSize);
                ArrayList<String> batch = new ArrayList<String>(maxSize);
                while (rs.next())
                {
                    batch.add(Integer.toString(rs.getInt(1)));
                }
                boundedBuffer = batch;
                currentIndex = batch.size() - 1;
            }
            catch (Exception err)
            {
                System.out.println(err);
            }
            finally
            {
                closeQuietly(rs, st, con);
            }
        }

        /** Closes whichever of the JDBC resources are non-null, ignoring close failures. */
        private void closeQuietly(ResultSet rs, Statement st, Connection con)
        {
            try
            {
                if (rs != null)
                {
                    rs.close();
                }
            }
            catch (SQLException ignored)
            {
                // nothing useful to do; the connection is closed below anyway
            }
            try
            {
                if (st != null)
                {
                    st.close();
                }
            }
            catch (SQLException ignored)
            {
                // see above
            }
            try
            {
                if (con != null)
                {
                    con.close();
                }
            }
            catch (SQLException ignored)
            {
                // a broken connection is simply dropped; the next refill opens a new one
            }
        }
    }
}
class newthread
{
    /**
     * Launcher used by the runjava script. All the work happens in the ReentrantLockTest
     * constructor: it starts the worker threads (unless the MySQL driver cannot be loaded)
     * and then keeps the main thread busy waiting.
     */
    public static void main(String... args)
    {
        new ReentrantLockTest();
    }
}
Любая помощь приветствуется!
Правка:
Вот дамп потоков:
2012-07-05 12:15:06
Full thread dump OpenJDK 64-Bit Server VM (14.0-b16 mixed mode):
"SIGHUP handler" daemon prio=10 tid=0x00007f7bb0001000 nid=0x838 runnable [0x00007f7beb205000]
java.lang.Thread.State: RUNNABLE
at java.lang.Terminator$1.handle(Terminator.java:52)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:636)
"consumer_thread_2" prio=10 tid=0x00007f7d940f4800 nid=0x68c runnable [0x00007f7beb51b000]
java.lang.Thread.State: RUNNABLE
at java.io.PrintStream.write(PrintStream.java:446)
- locked <0x00007f7d137703b0> (a java.io.PrintStream)
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:220)
at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:290)
at sun.nio.cs.StreamEncoder.flushBuffer(StreamEncoder.java:103)
- locked <0x00007f7d137704a8> (a java.io.OutputStreamWriter)
at java.io.OutputStreamWriter.flushBuffer(OutputStreamWriter.java:185)
at java.io.PrintStream.write(PrintStream.java:494)
- locked <0x00007f7d137703b0> (a java.io.PrintStream)
at java.io.PrintStream.print(PrintStream.java:636)
at java.io.PrintStream.println(PrintStream.java:773)
- locked <0x00007f7d137703b0> (a java.io.PrintStream)
at ReentrantLockTest$Consumer.run(newthread.java:151)
at java.lang.Thread.run(Thread.java:636)
"consumer_thread_1" prio=10 tid=0x00007f7d940f4000 nid=0x68b waiting on condition [0x00007f7beb61c000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00007f7d137acee0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:871)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
at ReentrantLockTest$Consumer.run(newthread.java:150)
at java.lang.Thread.run(Thread.java:636)
"producer_thread_1" prio=10 tid=0x00007f7d940e5000 nid=0x68a waiting on condition [0x00007f7beb71d000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00007f7d137acf38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at ReentrantLockTest$Producer.run(newthread.java:250)
at java.lang.Thread.run(Thread.java:636)
"Low Memory Detector" daemon prio=10 tid=0x00007f7d940af800 nid=0x688 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"CompilerThread1" daemon prio=10 tid=0x00007f7d940ac800 nid=0x687 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"CompilerThread0" daemon prio=10 tid=0x00007f7d940aa800 nid=0x686 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" daemon prio=10 tid=0x00007f7d940a8800 nid=0x685 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" daemon prio=10 tid=0x00007f7d94087800 nid=0x683 in Object.wait() [0x00007f7bebd36000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00007f7d13761210> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:133)
- locked <0x00007f7d13761210> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:149)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:177)
"Reference Handler" daemon prio=10 tid=0x00007f7d94085800 nid=0x682 in Object.wait() [0x00007f7bebe37000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00007f7d13761078> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
- locked <0x00007f7d13761078> (a java.lang.ref.Reference$Lock)
"main" prio=10 tid=0x00007f7d94007000 nid=0x673 sleeping[0x00007f7d99285000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at ReentrantLockTest.<init>(newthread.java:81)
at newthread.main(newthread.java:320)
"VM Thread" prio=10 tid=0x00007f7d94080800 nid=0x681 runnable
"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f7d94011800 nid=0x674 runnable
"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f7d94013000 nid=0x675 runnable
"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f7d94015000 nid=0x676 runnable
"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f7d94017000 nid=0x677 runnable
"GC task thread#4 (ParallelGC)" prio=10 tid=0x00007f7d94018800 nid=0x678 runnable
"GC task thread#5 (ParallelGC)" prio=10 tid=0x00007f7d9401a800 nid=0x679 runnable
"GC task thread#6 (ParallelGC)" prio=10 tid=0x00007f7d9401c800 nid=0x67a runnable
"GC task thread#7 (ParallelGC)" prio=10 tid=0x00007f7d9401e000 nid=0x67b runnable
"GC task thread#8 (ParallelGC)" prio=10 tid=0x00007f7d94020000 nid=0x67c runnable
"GC task thread#9 (ParallelGC)" prio=10 tid=0x00007f7d94022000 nid=0x67d runnable
"GC task thread#10 (ParallelGC)" prio=10 tid=0x00007f7d94023800 nid=0x67e runnable
"GC task thread#11 (ParallelGC)" prio=10 tid=0x00007f7d94025800 nid=0x67f runnable
"GC task thread#12 (ParallelGC)" prio=10 tid=0x00007f7d94027800 nid=0x680 runnable
"VM Periodic Task Thread" prio=10 tid=0x00007f7d940b2000 nid=0x689 waiting on condition
JNI global references: 1000
Heap
12:15:06:: consumer_thread_2 acquired the lock ::
PSYoungGen total 112448K, used 17376K [0x00007f7d13760000, 0x00007f7d1b4e0000, 0x00007f7d91000000)
eden space 96384K, 18% used [0x00007f7d13760000,0x00007f7d148581c0,0x00007f7d19580000)
from space 16064K, 0% used [0x00007f7d1a530000,0x00007f7d1a530000,0x00007f7d1b4e0000)
to space 16064K, 0% used [0x00007f7d19580000,0x00007f7d19580000,0x00007f7d1a530000)
PSOldGen total 257152K, used 0K [0x00007f7c18600000, 0x00007f7c28120000, 0x00007f7d13760000)
object space 257152K, 0% used [0x00007f7c18600000,0x00007f7c18600000,0x00007f7c28120000)
PSPermGen total 21248K, used 7359K [0x00007f7c0de00000, 0x00007f7c0f2c0000, 0x00007f7c18600000)
object space 21248K, 34% used [0x00007f7c0de00000,0x00007f7c0e52fe38,0x00007f7c0f2c0000)
1 ответ
(скопировано из комментариев)
Это началось в прошлые выходные? Может быть, это связано с проблемой високосной секунды, с которой столкнулись многие? (Симптом — очень высокая загрузка ЦП Java-процессом, из-за чего спящие потоки могут спать намного дольше, чем положено.)
Если дело действительно в этом, одно из решений — просто перезагрузить систему. Другое — выполнить:
date -s "`date`"
как предложено в этой статье.