Число потоков занятых задач достигло предела
У меня есть приложение, которое обрабатывает необработанные данные и сохраняет их в базе данных. Для выполнения 1 потока требуется менее 100 мс. Приложение получает необработанные данные примерно с 200 устройств каждые 20 секунд, через некоторое время приложение зависает и становится недоступным. Когда мы посмотрели на количество потоков занятых задач в wildfly, мы обнаружили, что количество потоков занятых задач достигло предела в 128 потоков.
Мы пока не могли понять, почему это происходит, даже если время обработки намного меньше. Может ли кто-нибудь помочь в решении этого вопроса. Заранее спасибо.
@Controller
@EnableAsync
public class DataController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
private IDataService dataService;
@Autowired
IDeviceService deviceService;
@Autowired
HttpServletResponse response;
@Autowired
HttpServletRequest request;
@Autowired
private IDeviceValidator deviceValidator;
@Autowired
private IJobService jobService;
@Autowired
private IDataAssembler dataAssembler;
@RequestMapping(value = { "/device_dat/auth/auth_tocken/imei/{imei_no}",
"/device_data/auth/auth_token/imei/{imei_no}" }, method = RequestMethod.POST, consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
@ResponseBody
public Object postDataText(@PathVariable(value = "imei_no") String imeiNo, @RequestBody String dataString) {
Long startTimeMillis = System.currentTimeMillis();
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
log.error("PERF::" + request.getSession().getId().toString() + "::" + imeiNo + "::"
+ Thread.currentThread().getId() + "::postDataText::entry::"+ dataString+"::"+ sdf.format(startTimeMillis));
try {
Device device = (Device) deviceValidator.validateImeiNo(imeiNo);
if (device != null) {
DataVO dataVO = new DataVO();
dataVO.setImeiNo(imeiNo);
dataVO.setSerialNo(device.getSerialNo());
device.setLastUpdatedTime(new Date());
dataString = java.net.URLDecoder.decode(dataString, StandardCharsets.UTF_8.name());
dataVO.setSensorDataList(dataString);
List<Data> datas = dataAssembler.assembleData(dataVO, device);
String prefix = dataString.substring(0, 11);
deviceService.updateLastUpdatedData(datas, null, request.getSession().getId(), imeiNo, device);
dataService.updateDeviceDataAndInsertToDB(datas, request.getSession().getId(), imeiNo, device);
} else {
Map<String, String> data = new HashMap<String, String>();
data.put("message", "Device not Registered");
ResponseVO responseVO = new ResponseVO(ResponseTypeEnum.FAILURE.toString(), 400, data);
return responseVO;
}
} catch (Exception e) {
log.error("Error occured on postDataText method:" + e.getMessage() + "\ndataString was :" + dataString);
}
// response.addHeader("Baeldung-Example-Header", "Value-HttpServletResponse");
Long endTimeMillis = System.currentTimeMillis();
log.error(
"PERF::" + request.getSession().getId().toString() + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::postDataText::exit::" + sdf.format(endTimeMillis) + "::" + (endTimeMillis - startTimeMillis));
return null;
}
}
---------------------------------------------------------------------
@Service
@EnableAsync
public class DataServiceImpl implements IDataService {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
IDeviceService deviceService;
@Autowired
private DataDaoImpl dataDaoImpl;
@Autowired
private IDataRepository dataRepository;
@Autowired
private IApplicationService applicationService;
@Override
public Device updateDeviceDataAndInsertToDB(List<Data> datas, String prefix, String imeiNo, Device device) {
Long startTimeMillis = System.currentTimeMillis();
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::updateDeviceDataAndInsertToDB::entry::" + sdf.format(startTimeMillis));
try {
List<Data> upDatedList = new ArrayList<Data>();
datas.forEach(data -> {
if (data.getSourceDate() == null) {
data.setSourceDate(data.getCreatedDate());
}
deviceService.updateLastUpdatedTime(data.getSerialNo(),device);
data.setMetaData(device.getMetaData());
upDatedList.add(data);
});
dataRepository.insert(upDatedList);
} catch (Exception e) {
log.error("Method:save" + e.getMessage() + ", " + datas);
}
Long endTimeMillis = System.currentTimeMillis();
log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::updateDeviceDataAndInsertToDB::exit::" + sdf.format(endTimeMillis) + "::"
+ (endTimeMillis - startTimeMillis));
return device;
}
}
-------------------------------------------------------------------------------------------------------
@Service
@EnableAsync
public class DeviceServiceImpl implements IDeviceService {
public final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
IDeviceDao deviceDao;
@Autowired
IDeviceRepository deviceRepository;
@Autowired
IApplicationService applicationService;
@Autowired
RestTemplate restTemplate;
@Autowired
IDataService dataService;
@Override
@Async
public void updateLastUpdatedData(List<Data> datas, Map<String, Object> metaData, String prefix, String imeiNo,
Device device) {
//sAsyncRestTemplate restTemplate = new AsyncRestTemplate();
/*
* WebClient client = WebClient.create(); Stream<Data> stringStream =
* datas.stream(); Flux<Data> fluxFromStream = Flux.fromStream(stringStream);
*/
Long startTimeMillis = System.currentTimeMillis();
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::updateLastUpdatedData::entry::" + sdf.format(startTimeMillis));
log.trace("Enter updateLastUpdatedData");
try {
Map<String, Object> response = new HashMap<String, Object>();
String applicationName = "gtest";
String urlString = "http://localhost:8080/" + applicationName + "/devices/";
String deviceSerialNo = device.getSerialNo();
String imei_no = imeiNo;
Map<String, Object> requestBody = new HashMap<String, Object>();
// For batch packet Device Follows LIFO
Data data = datas.get(0);
requestBody = (Map<String, Object>) data.getSensorDataList();
requestBody.put("source_date", data.getSourceDate());
if (metaData != null) {
requestBody.put("last_known_address", metaData.get("last_known_address"));
}
Map<String, Object> d = (Map<String, Object>) data.getSensorDataList();
Long count = 0L, emergencyCount = 0L;
for (Data datax : datas) {
Map<String, Object> x = (Map<String, Object>) datax.getSensorDataList();
try {
if (x.get("packet_type").toString().contains("Alert")) {
count = count + 1;
}
if (x.get("packet_type").toString().contains("Emergency button wire disconnect/wire-cut")) {
emergencyCount = emergencyCount + 1;
}
} catch (Exception e) {
// log.error("method:updateLastUpdatedData,"+e.getMessage()+"d"+d+"
// ,data:"+data);
}
}
requestBody.put("alert_count", count);
requestBody.put("emergency_alert_count", emergencyCount);
if (deviceSerialNo != null) {
urlString += deviceSerialNo;
restTemplate.postForObject(urlString, requestBody, Map.class);
}
} catch (Exception e) {
log.error("method:updateLastUpdatedData,"+e.getMessage()+", datas"+datas);
}
log.trace("Exit updateLastUpdatedData");
Long endTimeMillis = System.currentTimeMillis();
log.error("PERF::" + prefix + "::" + imeiNo + "::" + Thread.currentThread().getId()
+ "::updateLastUpdatedData::exit::" + sdf.format(endTimeMillis) + "::" + (endTimeMillis - startTimeMillis));
}
}
}
--------------------------------------------------------------------------------------------
@Repository
public class DataRepository implements IDataRepository {
public final Logger log = LoggerFactory.getLogger(this.getClass());
@Override
public void insert(List<Data> upDatedList) {
log.trace("Enter insert");
Query query = new Query();
query.maxTimeMsec(10);
MongoOperations mongoOperations = mongoTemplate;
mongoTemplate.insert(upDatedList, Data.class);
log.trace("Exit insert");
}
}