Эффективная обработка памяти в блоке сообщений с использованием XMLHttpRequest
У меня есть XMLHttpRequest
с progress
Обработчик событий, который запрашивает разделенную на части страницу, которая непрерывно отправляет, добавляет фрагменты сообщения. Если я не установлю responseType
Я могу получить доступ к response
собственность XMLHttpRequest
в каждом progress
событие и обработать дополнительный блок сообщения. Проблема этого подхода заключается в том, что браузер должен хранить весь ответ в памяти, и в конечном итоге браузер будет аварийно завершать работу из-за этой потери памяти.
Итак, я попробовал responseType
из arraybuffer
в надежде, что я смогу нарезать буфер, чтобы предотвратить предыдущие чрезмерные потери памяти. К сожалению, progress
обработчик событий больше не способен читать response
собственность XMLHttpRequest
с этой точки зрения. Параметр события progress
Событие также не содержит буфера. Вот краткий, самостоятельный пример моей попытки этого (это написано для node.js
):
var http = require('http');
// -- The server.
http.createServer(function(req, res) {
if (req.url === '/stream') return serverStream(res);
serverMain(res);
}).listen(3000);
// -- The server functions to send a HTML page with the client code, or a stream.
function serverMain(res) {
res.writeHead(200, {'Content-Type': 'text/html'});
res.write('<html><body>Hello World</body><script>');
res.end(client.toString() + ';client();</script></html>');
}
function serverStream(res) {
res.writeHead(200, {'Content-Type': 'text/html'});
setInterval(function() {
res.write('Hello World<br />\n');
}, 1000);
}
// -- The client code which runs in the browser.
function client() {
var xhr = new XMLHttpRequest();
xhr.addEventListener('progress', function() {
if (!xhr.response) return console.log('progress without response :-(');
console.log('progress: ' + xhr.response.size);
}, false);
xhr.open('GET', '/stream', true);
xhr.responseType = 'arraybuffer';
xhr.send();
}
progress
Обработчик событий не имеет доступа к response
Я хотел. Как я могу обработать фрагменты сообщений в браузере эффективным способом памяти? Пожалуйста , не предлагайте WebSocket
, Я не хочу использовать один только для обработки потока сообщений только для чтения.
1 ответ
XMLHttpRequest
не кажется действительно разработанным для такого рода использования. Очевидным решением является опрос, который является популярным XMLHttpRequest
но я предполагаю, что вы не хотите пропустить данные из вашего потока, которые будут проскальзывать между вызовами.
На мой вопрос Can the "real" data chunks be identified in some way or is it basically random data ?
, ты ответил With some effort, the chunks could be identified by adding an event-id of sorts to the server-side
Исходя из этой предпосылки, я предлагаю:
Идея: сотрудничающие параллельные слушатели
- Подключитесь к потоку и настройте прослушиватель прогресса (называемый
listenerA()
). - Когда прибывает чанк, обработайте его и выведите. Сохраните ссылку на идентификаторы как первого, так и последнего чанка, полученного
listenerA()
, Посчитайте, сколько кусковlistenerA()
получил. - После
listenerA()
получил определенное количество чанков, породил другую "ветку" (соединение + слушатель,listenerB()
) делать шаги 1 и 2 параллельно с первым, но хранить обработанные данные в буфере, а не выводить их. - когда
listenerA()
получает чанк с тем же идентификатором, что и первый чанк, полученныйlistenerB()
отправить сигналlistenerB()
, сбрось первое соединение и убейlistenerA()
, - когда
listenerB()
получает сигнал завершения отlistenerA()
выведите буфер на вывод и продолжайте обработку в обычном режиме. - Есть
listenerB()
порождатьlistenerC()
на тех же условиях, что и раньше. - Продолжайте повторять с таким количеством соединений + слушателей, сколько необходимо.
Используя два перекрывающихся соединения, вы можете предотвратить возможную потерю фрагментов, которая может возникнуть в результате сброса одного соединения, а затем повторного соединения.
Заметки
- Это предполагает, что поток данных одинаков для всех соединений и не вводит некоторые индивидуальные настройки.
- В зависимости от скорости вывода потока и задержки соединения может быть заметен дамп буфера при переходе от одного соединения к другому.
- Вы также можете измерить общий размер ответа, а не количество блоков, чтобы решить, когда переключиться на новое соединение.
- Возможно, потребуется сохранить полный список идентификаторов чанков для сравнения, а не только с первым и последним, потому что мы не можем гарантировать время перекрытия.
-
responseType
изXMLHttpRequest
должен быть установлен в значение по умолчанию""
или "текст", чтобы вернуть текст. Другие типы данных не будут возвращать частичноеresponse
, Смотрите https://xhr.spec.whatwg.org/.
Тестовый сервер в node.js
Следующий код представляет собой сервер node.js, который выводит согласованный поток элементов для целей тестирования. Вы можете открыть несколько подключений к нему, выходные данные будут одинаковыми во всех сеансах, за вычетом возможной задержки сервера.
вернет данные, где id это увеличенное число
вернет данные, где id - это произвольная строка длиной 40 символов. Это предназначено для тестирования сценария, в котором нельзя использовать идентификатор для упорядочения данных.
var crypto = require('crypto');
// init + update nodeId
var nodeId = 0;
var nodeIdRand = '0000000000000000000000000000000000000000';
setInterval(function() {
// regular id
++nodeId;
//random id
nodeIdRand = crypto.createHash('sha1').update(nodeId.toString()).digest('hex');
}, 1000);
// create server (port 5500)
var http = require('http');
http.createServer(function(req, res) {
if(req.url === '/stream') {
return serverStream(res);
}
else if(req.url === '/streamRandom') {
return serverStream(res, true);
}
}).listen(5500);
// serve nodeId
function serverStream(res, rand) {
// headers
res.writeHead(200, {
'Content-Type' : 'text/plain',
'Access-Control-Allow-Origin' : '*',
});
// remember last served id
var last = null;
// output interval
setInterval(function() {
// output on new node
if(last != nodeId) {
res.write('[node id="'+(rand ? nodeIdRand : nodeId)+'"]');
last = nodeId;
}
}, 250);
}
Подтверждение концепции с использованием вышеупомянутого серверного кода node.js
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
</head>
<body>
<button id="stop">stop</button>
<div id="output"></div>
<script>
/*
Listening to a never ending page load (http stream) without running out of
memory by using concurrent overlapping connections to prevent loss of data,
using only xmlHttpRequest, under the condition that the data can be identified.
listen arguments
url url of the http stream
chunkMax number of chunks to receive before switching to new connection
listen properties
output a reference to a DOM element with id "output"
queue an array filled with non-duplicate received chunks and metadata
lastFetcherId an incrementing number used to assign an id to new fetchers
fetchers an array listing all active fetchers
listen methods
fire internal use fire an event
stop external use stop all connections
fetch internal use starts a new connection
fetchRun internal use initialize a new fetcher object
Usage
var myListen = new listen('http://localhost:5500/streamRandom', 20);
will listen to url "http://localhost:5500/streamRandom"
will switch connections every 20 chunks
myListen.stop()
will stop all connections in myListen
*/
function listen(url, chunkMax) {
// main ref
var that = this;
// output element
that.output = document.getElementById('output');
// main queue
that.queue = [];
// last fetcher id
that.lastFetcherId = 0;
// list of fetchers
that.fetchers = [];
//********************************************************* event dispatcher
that.fire = function(name, data) {
document.dispatchEvent(new CustomEvent(name, {'detail':data}));
}
//******************************************************** kill all fetchers
that.stop = function() {
that.fire('fetch-kill', -1);
}
//************************************************************** url fetcher
that.fetch = function(fetchId, url, fetchRef) {
//console.log('start fetcher #'+fetchId);
var len = 0;
var xhr = new XMLHttpRequest();
var cb_progress;
var cb_kill;
// progress listener
xhr.addEventListener('progress', cb_progress = function(e) {
// extract chunk data
var chunkData = xhr.response.substr(len);
// chunk id
var chunkId = chunkData.match(/id="([a-z0-9]+)"/)[1];
// update response end point
len = xhr.response.length;
// signal end of chunk processing
that.fire('chunk-ready', {
'fetchId' : fetchId,
'fetchRef' : fetchRef,
'chunkId' : chunkId,
'chunkData' : chunkData,
});
}, false);
// kill switch
document.addEventListener('fetch-kill', cb_kill = function(e) {
// kill this fetcher or all fetchers (-1)
if(e.detail == fetchId || e.detail == -1) {
//console.log('kill fetcher #'+fetchId);
xhr.removeEventListener('progress', cb_progress);
document.removeEventListener('fetch-kill', cb_kill);
xhr.abort();
that.fetchers.shift(); // remove oldest fetcher from list
xhr = null;
delete xhr;
}
}, false);
// go
xhr.open('GET', url, true);
xhr.responseType = 'text';
xhr.send();
};
//****************************************************** start a new fetcher
that.fetchRun = function() {
// new id
var id = ++that.lastFetcherId;
//console.log('create fetcher #'+id);
// create fetcher with new id
var fetchRef = {
'id' : id, // self id
'queue' : [], // internal queue
'chunksIds' : [], // retrieved ids, also used to count
'hasSuccessor' : false, // keep track of next fetcher spawn
'ignoreId' : null, // when set, ignore chunks until this id is received (this id included)
};
that.fetchers.push(fetchRef);
// run fetcher
that.fetch(id, url, fetchRef);
};
//************************************************ a fetcher returns a chunk
document.addEventListener('chunk-ready', function(e) {
// shorthand
var f = e.detail;
// ignore flag is not set, process chunk
if(f.fetchRef.ignoreId == null) {
// store chunk id
f.fetchRef.chunksIds.push(f.chunkId);
// create queue item
var queueItem = {'id':f.chunkId, 'data':f.chunkData};
// chunk is received from oldest fetcher
if(f.fetchId == that.fetchers[0].id) {
// send to main queue
that.queue.push(queueItem);
// signal queue insertion
that.fire('queue-new');
}
// not oldest fetcher
else {
// use fetcher internal queue
f.fetchRef.queue.push(queueItem);
}
}
// ignore flag is set, current chunk id the one to ignore
else if(f.fetchRef.ignoreId == f.chunkId) {
// disable ignore flag
f.fetchRef.ignoreId = null;
}
//******************** check chunks count for fetcher, threshold reached
if(f.fetchRef.chunksIds.length >= chunkMax && !f.fetchRef.hasSuccessor) {
// remember the spawn
f.fetchRef.hasSuccessor = true;
// spawn new fetcher
that.fetchRun();
}
/***********************************************************************
check if the first chunk of the second oldest fetcher exists in the
oldest fetcher.
If true, then they overlap and we can kill the oldest fetcher
***********************************************************************/
if(
// is this the oldest fetcher ?
f.fetchId == that.fetchers[0].id
// is there a successor ?
&& that.fetchers[1]
// has oldest fetcher received the first chunk of its successor ?
&& that.fetchers[0].chunksIds.indexOf(
that.fetchers[1].chunksIds[0]
) > -1
) {
// get index of last chunk of the oldest fetcher within successor queue
var lastChunkId = that.fetchers[0].chunksIds[that.fetchers[0].chunksIds.length-1]
var lastChunkIndex = that.fetchers[1].chunksIds.indexOf(lastChunkId);
// successor has not reached its parent last chunk
if(lastChunkIndex < 0) {
// discard whole queue
that.fetchers[1].queue = [];
that.fetchers[1].chunksIds = [];
// set ignore id in successor to future discard duplicates
that.fetchers[1].ignoreId = lastChunkId;
}
// there is overlap
else {
/**
console.log('triming queue start: '+that.fetchers[1].queue.length
+" "+(lastChunkIndex+1)
+" "+(that.fetchers[1].queue.length-1)
);
/**/
var trimStart = lastChunkIndex+1;
var trimEnd = that.fetchers[1].queue.length-1;
// trim queue
that.fetchers[1].queue = that.fetchers[1].queue.splice(trimStart, trimEnd);
that.fetchers[1].chunksIds = that.fetchers[1].chunksIds.splice(trimStart, trimEnd);
//console.log('triming queue end: '+that.fetchers[1].queue.length);
}
// kill oldest fetcher
that.fire('fetch-kill', that.fetchers[0].id);
}
}, false);
//***************************************************** main queue processor
document.addEventListener('queue-new', function(e) {
// process chunks in queue
while(that.queue.length > 0) {
// get chunk and remove from queue
var chunk = that.queue.shift();
// output item to document
if(that.output) {
that.output.innerHTML += "<br />"+chunk.data;
}
}
}, false);
//****************************************************** start first fetcher
that.fetchRun();
};
// run
var process = new listen('http://localhost:5500/streamRandom', 20);
// bind global kill switch to button
document.getElementById('stop').addEventListener('click', process.stop, false);
</script>
</body>
</html>