Алгоритм N-way слияния
Двустороннее слияние широко изучается как часть алгоритма слияния. Но мне интересно узнать, как лучше всего выполнить N-way слияние?
Скажем, у меня есть N
файлы, которые отсортировали по 1 миллиону целых чисел каждый. Я должен объединить их в один файл, который будет иметь эти 100 миллионов отсортированных целых чисел.
Пожалуйста, имейте в виду, что сценарий использования этой проблемы - это внешняя сортировка, основанная на диске. Следовательно, в реальных сценариях также может быть ограничение памяти. Поэтому наивный подход к объединению 2 файлов за раз (99 раз) не сработает. Допустим, у нас есть только небольшое скользящее окно памяти, доступное для каждого массива.
Я не уверен, есть ли уже стандартизированное решение для этого N-way слияния. (Гугл мне мало что рассказал).
Но если вы знаете, хороший ли алгоритм n-way merge, пожалуйста, напишите algo/link.
Сложность времени: если мы значительно увеличим количество файлов (N
) Как это повлияет на временную сложность вашего алгоритма?
Спасибо за ваши ответы.
Меня нигде не задавали, но я чувствовал, что это может быть интересным вопросом для интервью. Поэтому помечены.
8 ответов
Как насчет следующей идеи:
- Создать приоритетную очередь
- Итерация по каждому файлу f
- поставить в очередь пару (nextNumberIn(f), f), используя первое значение в качестве ключа приоритета
- поставить в очередь пару (nextNumberIn(f), f), используя первое значение в качестве ключа приоритета
- Пока очередь не пустая
- глава очереди (m, f) очереди
- выход м
- если е не истощен
- enqueue (nextNumberIn(f), f)
Поскольку добавление элементов в приоритетную очередь может быть выполнено за логарифмическое время, элемент 2 равен O (N × log N). Поскольку (почти все) итерации цикла while добавляет элемент, весь цикл while равен O(M × log N), где M - общее количество чисел для сортировки.
Предполагая, что все файлы имеют непустую последовательность чисел, мы имеем M > N и, следовательно, весь алгоритм должен быть O(M × log N).
Ищите "Многофазное слияние", посмотрите классику - Donald Knuth & EHFriend.
Кроме того, вы можете взглянуть на предлагаемое объединение умных блоков Seyedafsari & Hasanzadeh, которое, как и в предыдущих предложениях, использует очереди с приоритетами.
Еще одно интересное обоснование - алгоритм слияния на месте, разработанный Kim & Kutzner.
Я также рекомендую эту статью Vitter: алгоритмы внешней памяти и структуры данных: работа с массивными данными.
Одна простая идея состоит в том, чтобы сохранить приоритетную очередь объединяемых диапазонов, сохраняя ее таким образом, чтобы диапазон с наименьшим первым элементом удалялся первым из очереди. Затем вы можете выполнить N-way слияние следующим образом:
- Вставьте все диапазоны в очередь приоритетов, кроме пустых диапазонов.
- Пока приоритетная очередь не пуста:
- Исключите самый маленький элемент из очереди.
- Добавьте первый элемент этого диапазона к выходной последовательности.
- Если это не пусто, вставьте остальную часть последовательности обратно в очередь с приоритетами.
Правильность этого алгоритма, по сути, является обобщением доказательства того, что двустороннее слияние работает корректно - если вы всегда добавляете наименьший элемент из любого диапазона, и все диапазоны сортируются, вы получаете последовательность в целом.
Сложность времени выполнения этого алгоритма может быть найдена следующим образом. Пусть M будет общим количеством элементов во всех последовательностях. Если мы используем двоичную кучу, то мы делаем не более O(M) вставок и O(M) удалений из очереди приоритетов, поскольку для каждого элемента, записанного в выходную последовательность, существует очередь для удаления наименьшей последовательности, за которой следует поставьте в очередь оставшуюся часть последовательности обратно в очередь. Каждый из этих шагов занимает O(lg N) операций, потому что вставка или удаление из двоичной кучи с N элементами в нем занимает O(lg N) времени. Это дает чистое время выполнения O(M lg N), которое растет менее чем линейно с количеством входных последовательностей.
Может быть способ получить это еще быстрее, но это кажется довольно хорошим решением. Использование памяти составляет O(N), потому что нам нужны O(N) служебные данные для двоичной кучи. Если мы реализуем двоичную кучу, сохраняя указатели на последовательности, а не на сами последовательности, это не должно быть слишком большой проблемой, если у вас нет действительно смешного количества последовательностей для объединения. В этом случае просто объедините их в группы, которые вписываются в память, а затем объедините все результаты.
Надеюсь это поможет!
Простой подход к объединению k отсортированных массивов (каждый длиной n) требует времени O(n k^2), а не времени O(nk). Так как когда вы объединяете первые 2 массива, это занимает 2n времени, затем, когда вы объединяете третий с выходом, это занимает 3n времени, так как теперь мы объединяем два массива длиной 2n и n. Теперь, когда мы объединяем эти выходные данные с четвертым, это слияние требует 4n времени. Таким образом, последнее слияние (когда мы добавляем k-й массив в наш уже отсортированный массив) требует k*n времени. Таким образом, общее требуемое время составляет 2n+ 3n + 4n +...k*n, который есть O(n k^2).
Похоже, что мы можем сделать это за O(kn), но это не так, потому что каждый раз, когда наш массив, который мы объединяем, увеличивается в размере.
Хотя мы можем достичь лучшего предела, используя разделяй и властвуй. Я все еще работаю над этим и опубликую решение, если найду его.
Я написал этот фрагмент кода в стиле STL, который выполняет N-way слияние, и подумал, что я опубликую его здесь, чтобы не дать другим изобретать велосипед заново.:)
Предупреждение: это только мягко проверено. Проверьте перед использованием. :)
Вы можете использовать это так:
#include <vector>
int main()
{
std::vector<std::vector<int> > v;
std::vector<std::vector<int>::iterator> vout;
std::vector<int> v1;
std::vector<int> v2;
v1.push_back(1);
v1.push_back(2);
v1.push_back(3);
v2.push_back(0);
v2.push_back(1);
v2.push_back(2);
v.push_back(v1);
v.push_back(v2);
multiway_merge(v.begin(), v.end(), std::back_inserter(vout), false);
}
Это также позволяет использовать пары итераторов вместо самих контейнеров.
Если вы используете Boost.Range, вы можете удалить часть стандартного кода.
Код:
#include <algorithm>
#include <functional> // std::less
#include <iterator>
#include <queue> // std::priority_queue
#include <utility> // std::pair
#include <vector>
template<class OutIt>
struct multiway_merge_value_insert_iterator : public std::iterator<
std::output_iterator_tag, OutIt, ptrdiff_t
>
{
OutIt it;
multiway_merge_value_insert_iterator(OutIt const it = OutIt())
: it(it) { }
multiway_merge_value_insert_iterator &operator++(int)
{ return *this; }
multiway_merge_value_insert_iterator &operator++()
{ return *this; }
multiway_merge_value_insert_iterator &operator *()
{ return *this; }
template<class It>
multiway_merge_value_insert_iterator &operator =(It const i)
{
*this->it = *i;
++this->it;
return *this;
}
};
template<class OutIt>
multiway_merge_value_insert_iterator<OutIt>
multiway_merge_value_inserter(OutIt const it)
{ return multiway_merge_value_insert_iterator<OutIt>(it); };
template<class Less>
struct multiway_merge_value_less : private Less
{
multiway_merge_value_less(Less const &less) : Less(less) { }
template<class It1, class It2>
bool operator()(
std::pair<It1, It1> const &b /* inverted */,
std::pair<It2, It2> const &a) const
{
return b.first != b.second && (
a.first == a.second ||
this->Less::operator()(*a.first, *b.first));
}
};
struct multiway_merge_default_less
{
template<class T>
bool operator()(T const &a, T const &b) const
{ return std::less<T>()(a, b); }
};
template<class R>
struct multiway_merge_range_iterator
{ typedef typename R::iterator type; };
template<class R>
struct multiway_merge_range_iterator<R const>
{ typedef typename R::const_iterator type; };
template<class It>
struct multiway_merge_range_iterator<std::pair<It, It> >
{ typedef It type; };
template<class R>
typename R::iterator multiway_merge_range_begin(R &r)
{ return r.begin(); }
template<class R>
typename R::iterator multiway_merge_range_end(R &r)
{ return r.end(); }
template<class R>
typename R::const_iterator multiway_merge_range_begin(R const &r)
{ return r.begin(); }
template<class R>
typename R::const_iterator multiway_merge_range_end(R const &r)
{ return r.end(); }
template<class It>
It multiway_merge_range_begin(std::pair<It, It> const &r)
{ return r.first; }
template<class It>
It multiway_merge_range_end(std::pair<It, It> const &r)
{ return r.second; }
template<class It, class OutIt, class Less, class PQ>
OutIt multiway_merge(
It begin, It const end, OutIt out, Less const &less,
PQ &pq, bool const distinct = false)
{
while (begin != end)
{
pq.push(typename PQ::value_type(
multiway_merge_range_begin(*begin),
multiway_merge_range_end(*begin)));
++begin;
}
while (!pq.empty())
{
typename PQ::value_type top = pq.top();
pq.pop();
if (top.first != top.second)
{
while (!pq.empty() && pq.top().first == pq.top().second)
{ pq.pop(); }
if (!distinct ||
pq.empty() ||
less(*pq.top().first, *top.first) ||
less(*top.first, *pq.top().first))
{
*out = top.first;
++out;
}
++top.first;
pq.push(top);
}
}
return out;
}
template<class It, class OutIt, class Less>
OutIt multiway_merge(
It const begin, It const end, OutIt out, Less const &less,
bool const distinct = false)
{
typedef typename multiway_merge_range_iterator<
typename std::iterator_traits<It>::value_type
>::type SubIt;
if (std::distance(begin, end) < 16)
{
typedef std::vector<std::pair<SubIt, SubIt> > Remaining;
Remaining remaining;
remaining.reserve(
static_cast<size_t>(std::distance(begin, end)));
for (It i = begin; i != end; ++i)
{
if (multiway_merge_range_begin(*i) !=
multiway_merge_range_end(*i))
{
remaining.push_back(std::make_pair(
multiway_merge_range_begin(*i),
multiway_merge_range_end(*i)));
}
}
while (!remaining.empty())
{
typename Remaining::iterator smallest =
remaining.begin();
for (typename Remaining::iterator
i = remaining.begin();
i != remaining.end();
)
{
if (less(*i->first, *smallest->first))
{
smallest = i;
++i;
}
else if (distinct && i != smallest &&
!less(
*smallest->first,
*i->first))
{
i = remaining.erase(i);
}
else { ++i; }
}
*out = smallest->first;
++out;
++smallest->first;
if (smallest->first == smallest->second)
{ smallest = remaining.erase(smallest); }
}
return out;
}
else
{
std::priority_queue<
std::pair<SubIt, SubIt>,
std::vector<std::pair<SubIt, SubIt> >,
multiway_merge_value_less<Less>
> q((multiway_merge_value_less<Less>(less)));
return multiway_merge(begin, end, out, less, q, distinct);
}
}
template<class It, class OutIt>
OutIt multiway_merge(
It const begin, It const end, OutIt const out,
bool const distinct = false)
{
return multiway_merge(
begin, end, out,
multiway_merge_default_less(), distinct);
}
Смотрите http://en.wikipedia.org/wiki/External_sorting. Вот мой взгляд на k-way merge на основе кучи, использующий буферизованное чтение из источников для эмуляции сокращения ввода / вывода:
public class KWayMerger<T>
{
private readonly IList<T[]> _sources;
private readonly int _bufferSize;
private readonly MinHeap<MergeValue<T>> _mergeHeap;
private readonly int[] _indices;
public KWayMerger(IList<T[]> sources, int bufferSize, Comparer<T> comparer = null)
{
if (sources == null) throw new ArgumentNullException("sources");
_sources = sources;
_bufferSize = bufferSize;
_mergeHeap = new MinHeap<MergeValue<T>>(
new MergeComparer<T>(comparer ?? Comparer<T>.Default));
_indices = new int[sources.Count];
}
public T[] Merge()
{
for (int i = 0; i <= _sources.Count - 1; i++)
AddToMergeHeap(i);
var merged = new T[_sources.Sum(s => s.Length)];
int mergeIndex = 0;
while (_mergeHeap.Count > 0)
{
var min = _mergeHeap.ExtractDominating();
merged[mergeIndex++] = min.Value;
if (min.Source != -1) //the last item of the source was extracted
AddToMergeHeap(min.Source);
}
return merged;
}
private void AddToMergeHeap(int sourceIndex)
{
var source = _sources[sourceIndex];
var start = _indices[sourceIndex];
var end = Math.Min(start + _bufferSize - 1, source.Length - 1);
if (start > source.Length - 1)
return; //we're done with this source
for (int i = start; i <= end - 1; i++)
_mergeHeap.Add(new MergeValue<T>(-1, source[i]));
//only the last item should trigger the next buffered read
_mergeHeap.Add(new MergeValue<T>(sourceIndex, source[end]));
_indices[sourceIndex] += _bufferSize; //we may have added less items,
//but if we did we've reached the end of the source so it doesn't matter
}
}
internal class MergeValue<T>
{
public int Source { get; private set; }
public T Value { get; private set; }
public MergeValue(int source, T value)
{
Value = value;
Source = source;
}
}
internal class MergeComparer<T> : IComparer<MergeValue<T>>
{
public Comparer<T> Comparer { get; private set; }
public MergeComparer(Comparer<T> comparer)
{
if (comparer == null) throw new ArgumentNullException("comparer");
Comparer = comparer;
}
public int Compare(MergeValue<T> x, MergeValue<T> y)
{
Debug.Assert(x != null && y != null);
return Comparer.Compare(x.Value, y.Value);
}
}
Вот одна из возможных реализацийMinHeap<T>
, Некоторые тесты:
[TestMethod]
public void TestKWaySort()
{
var rand = new Random();
for (int i = 0; i < 10; i++)
AssertKwayMerge(rand);
}
private static void AssertKwayMerge(Random rand)
{
var sources = new[]
{
GenerateRandomCollection(rand, 10, 30, 0, 30).OrderBy(i => i).ToArray(),
GenerateRandomCollection(rand, 10, 30, 0, 30).OrderBy(i => i).ToArray(),
GenerateRandomCollection(rand, 10, 30, 0, 30).OrderBy(i => i).ToArray(),
GenerateRandomCollection(rand, 10, 30, 0, 30).OrderBy(i => i).ToArray(),
};
Assert.IsTrue(new KWayMerger<int>(sources, 20).Merge().SequenceEqual(sources.SelectMany(s => s).OrderBy(i => i)));
}
public static IEnumerable<int> GenerateRandomCollection(Random rand, int minLength, int maxLength, int min = 0, int max = int.MaxValue)
{
return Enumerable.Repeat(0, rand.Next(minLength, maxLength)).Select(i => rand.Next(min, max));
}
Here is my implementation using MinHeap...
package merging;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
public class N_Way_Merge {
int No_of_files=0;
String[] listString;
int[] listIndex;
PrintWriter pw;
private String fileDir = "D:\\XMLParsing_Files\\Extracted_Data";
private File[] fileList;
private BufferedReader[] readers;
public static void main(String[] args) throws IOException {
N_Way_Merge nwm=new N_Way_Merge();
long start= System.currentTimeMillis();
try {
nwm.createFileList();
nwm.createReaders();
nwm.createMinHeap();
}
finally {
nwm.pw.flush();
nwm.pw.close();
for (BufferedReader readers : nwm.readers) {
readers.close();
}
}
long end = System.currentTimeMillis();
System.out.println("Files merged into a single file.\nTime taken: "+((end-start)/1000)+"secs");
}
public void createFileList() throws IOException {
//creates a list of sorted files present in a particular directory
File folder = new File(fileDir);
fileList = folder.listFiles();
No_of_files=fileList.length;
assign();
System.out.println("No. of files - "+ No_of_files);
}
public void assign() throws IOException
{
listString = new String[No_of_files];
listIndex = new int[No_of_files];
pw = new PrintWriter(new BufferedWriter(new FileWriter("D:\\XMLParsing_Files\\Final.txt", true)));
}
public void createReaders() throws IOException {
//creates array of BufferedReaders to read the files
readers = new BufferedReader[No_of_files];
for(int i=0;i<No_of_files;++i)
{
readers[i]=new BufferedReader(new FileReader(fileList[i]));
}
}
public void createMinHeap() throws IOException {
for(int i=0;i<No_of_files;i++)
{
listString[i]=readers[i].readLine();
listIndex[i]=i;
}
WriteToFile(listString,listIndex);
}
public void WriteToFile(String[] listString,int[] listIndex) throws IOException{
BuildHeap_forFirstTime(listString, listIndex);
while(!(listString[0].equals("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz")))
{
pw.println(listString[0]);
listString[0]=readers[listIndex[0]].readLine();
MinHeapify(listString,listIndex,0);
}
}
public void BuildHeap_forFirstTime(String[] listString,int[] listIndex){
for(int i=(No_of_files/2)-1;i>=0;--i)
MinHeapify(listString,listIndex,i);
}
public void MinHeapify(String[] listString,int[] listIndex,int index){
int left=index*2 + 1;
int right=left + 1;
int smallest=index;
int HeapSize=No_of_files;
if(left <= HeapSize-1 && listString[left]!=null && (listString[left].compareTo(listString[index])) < 0)
smallest = left;
if(right <= HeapSize-1 && listString[right]!=null && (listString[right].compareTo(listString[smallest])) < 0)
smallest=right;
if(smallest!=index)
{
String temp=listString[index];
listString[index]=listString[smallest];
listString[smallest]=temp;
listIndex[smallest]^=listIndex[index];
listIndex[index]^=listIndex[smallest];
listIndex[smallest]^=listIndex[index];
MinHeapify(listString,listIndex,smallest);
}
}
}
Java-реализация алгоритма min heap для объединения k отсортированных массивов:
public class MergeKSorted {
/**
* helper object to store min value of each array in a priority queue,
* the kth array and the index into kth array
*
*/
static class PQNode implements Comparable<PQNode>{
int value;
int kth = 0;
int indexKth = 0;
public PQNode(int value, int kth, int indexKth) {
this.value = value;
this.kth = kth;
this.indexKth = indexKth;
}
@Override
public int compareTo(PQNode o) {
if(o != null) {
return Integer.valueOf(value).compareTo(Integer.valueOf(o.value));
}
else return 0;
}
@Override
public String toString() {
return value+" "+kth+" "+indexKth;
}
}
public static void mergeKSorted(int[][] sortedArrays) {
int k = sortedArrays.length;
int resultCtr = 0;
int totalSize = 0;
PriorityQueue<PQNode> pq = new PriorityQueue<>();
for(int i=0; i<k; i++) {
int[] kthArray = sortedArrays[i];
totalSize+=kthArray.length;
if(kthArray.length > 0) {
PQNode temp = new PQNode(kthArray[0], i, 0);
pq.add(temp);
}
}
int[] result = new int[totalSize];
while(!pq.isEmpty()) {
PQNode temp = pq.poll();
int[] kthArray = sortedArrays[temp.kth];
result[resultCtr] = temp.value;
resultCtr++;
temp.indexKth++;
if(temp.indexKth < kthArray.length) {
temp = new PQNode(kthArray[temp.indexKth], temp.kth, temp.indexKth);
pq.add(temp);
}
}
print(result);
}
public static void print(int[] a) {
StringBuilder sb = new StringBuilder();
for(int v : a) {
sb.append(v).append(" ");
}
System.out.println(sb);
}
public static void main(String[] args) {
int[][] sortedA = {
{3,4,6,9},
{4,6,8,9,12},
{3,4,9},
{1,4,9}
};
mergeKSorted(sortedA);
}
}