Самый быстрый подход к вставке больших коллекций данных в Cassandra в C#
Я немного озадачен самым быстрым способом добавления больших коллекций в базу данных кассандры. Я прочитал, что я не должен использовать пакетную вставку, потому что она создана для атомарности. Даже Кассандра дает мне информацию, чтобы использовать асинхронные записи для исполнения. Я использовал код для самой быстрой вставки без ключевого слова batch:
var cluster = Cluster.Builder()
.AddContactPoint(“127.0.0.1")
.Build();
var session = cluster.Connect();
//Save off the prepared statement you’re going to use
var statement = session.Prepare (“INSERT INTO tester.users (userID, firstName, lastName) VALUES (?,?,?)”);
var tasks = new List<Task>();
for (int i = 0; i < 1000; i++)
{
//please bind with whatever actually useful data you’re importing
var bind = statement.Bind (i, “John”, “Tester”);
var resultSetFuture = session.ExecuteAsync (bind);
tasks.Add (resultSetFuture);
}
Task.WaitAll(tasks.ToArray());
cluster.Shutdown();
от: https://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e
Но это все еще намного медленнее, чем пакетный вариант, который я использую. Мой текущий код выглядит так:
IList<Movie> moviesList = Movie.CreateMoviesCollectionForCassandra(collectionEntriesNumber);
var preparedStatements = new List<PreparedStatement>();
foreach (var statement in preparedStatements)
{
statement.SetConsistencyLevel(ConsistencyLevel.One);
}
var statementBinding = new BatchStatement();
statementBinding.SetBatchType(BatchType.Unlogged);
for (int i = 0; i < collectionEntriesNumber; i++)
{
preparedStatements.Add(Session.Prepare("INSERT INTO Movies (id, title, description, year, genres, rating, originallanguage, productioncountry, votingsnumber, director) VALUES (?,?,?,?,?,?,?,?,?,?)"));
}
for (int i = 0; i < collectionEntriesNumber; i++)
{
statementBinding.Add(preparedStatements[i].Bind(moviesList[i].Id, moviesList[i].Title,
moviesList[i].Description, moviesList[i].Year, moviesList[i].Genres, moviesList[i].Rating,
moviesList[i].OriginalLanguage, moviesList[i].ProductionCountry, moviesList[i].VotingsNumber,
new Director(moviesList[0].Director.Id, moviesList[i].Director.Firstname,
moviesList[i].Director.Lastname, moviesList[i].Director.Age)));
}
watch.Start();
Session.ExecuteAsync(statementBinding);
watch.Stop();
Это действительно работает намного быстрее, но я могу вставить только ~2500 подготовленных операторов, не более, и я хочу измерить время вставки около 100000 объектов.
Мой код правильный? Может быть, я просто должен увеличить порог вставки? Пожалуйста, объясните мне, как это сделать правильно.
1 ответ
Помните, что вы должны подготовить один раз и использовать то же самое PreparedStatement
привязать к разным параметрам.
Вы можете использовать пакеты небольшого размера, если вы нацеливаетесь на один и тот же раздел, если нет, то вы должны использовать индивидуальные запросы.
При использовании отдельных запросов вы можете планировать параллельное выполнение и ограничивать количество невыполненных запросов с помощью семафора.
Что-то вроде:
public async Task<long> Execute(
IStatement[] statements, int parallelism, int maxOutstandingRequests)
{
var semaphore = new SemaphoreSlim(maxOutstandingRequests);
var tasks = new Task<RowSet>[statements.Length];
var chunkSize = statements.Length / parallelism;
if (chunkSize == 0)
{
chunkSize = 1;
}
var statementLength = statements.Length;
var launchTasks = new Task[parallelism + 1];
var watch = new Stopwatch();
watch.Start();
for (var i = 0; i < parallelism + 1; i++)
{
var startIndex = i * chunkSize;
//start to launch in parallel
launchTasks[i] = Task.Run(async () =>
{
for (var j = 0; j < chunkSize; j++)
{
var index = startIndex + j;
if (index >= statementLength)
{
break;
}
await semaphore.WaitAsync();
var t = _session.ExecuteAsync(statements[index]);
tasks[index] = t;
var rs = await t;
semaphore.Release();
}
});
}
await Task.WhenAll(launchTasks);
await Task.WhenAll(tasks);
watch.Stop();
return watch.ElapsedMilliseconds;
}