Неправильное понимание concurrentQueue, единственного потребителя, работающего из очереди в своем собственном потоке

У меня возникают проблемы при создании работающего SystemFileWatcher, который принимает созданное событие и сохраняет его в очереди для работы с отдельным потоком. Я прочитал бесчисленные темы здесь по этой проблеме, но я не могу разобраться в этой конкретной проблеме.

using System;
using System.IO;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections;
using System.Threading;

namespace FileSystemWatcherTest
{
    class Program
    {
        public static BlockingCollection<string> processCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());

    static void Main(string[] args)
    {
        string path = @"C:\test\";
        FileSystemWatcher watcher = new FileSystemWatcher();

        watcher.Path = path;
        watcher.EnableRaisingEvents = true;
        watcher.Filter = "*.*";

        watcher.Created += new FileSystemEventHandler(onCreated);
        Thread Consumer = new Thread(new ThreadStart(mover));
        Consumer.Start();


        while (true) ;//run infinite loop so program doesn't terminate untill we force it.
    }
    static void onCreated(object sender, FileSystemEventArgs e)
    {
        processCollection.Add(e.FullPath);     
    }

    static void mover()
    {
        string current;
        string processed = @"C:\test\processed\";
        while (true)
        {
            while (processCollection.IsCompleted)
            {
                Thread.Sleep(1000);
            }
            while (processCollection.TryTake(out current))
            {
                System.IO.File.Move(current, processed);
            }
        }
    }
}

}

Это то, что я хотел бы проверить. Я знаю, что это не работает. Я убедился, что FSW работает, когда я просто пишу в консоль, когда файл помещается в очередь. Моя проблема начинается примерно тогда, когда я пытаюсь запустить функцию перемещения в ее собственном потоке. Функция перемещения и onCreated не отображаются для связи, как только я начинаю работать с очередью.

Я ожидаю от этого кода запуска функции перемещения в своем собственном потоке и запуска ее вместе с SFW. Я ожидаю, что параллельная очередь, присоединенная к автоматическим обновлениям blockingcollection (я ставлю элемент в очередь через onCreated, грузчик видит, что теперь у него +1 к этой очереди. Двигатель берет один из очереди, onCreated видит это.) возможно, использую Thread.Sleep неправильно. У меня больше нет поддерживающей причины для использования blockingcollection (которую я сначала выбрал для обработки ожидания заполнения очереди и, в основном, для постоянной проверки очереди на предмет для обработки), и я готов изменить это на все, что может работать. Я видел использование блокировок, но из того, что я понимаю, это на самом деле не нужно из-за того, как concurrentQueue синхронизируется.

Конечная цель заключается в обработке большого количества небольших файлов, которые поступают в случайное время и могут варьироваться от 1 до нескольких сотен в любой момент времени. Эти файлы являются.EML.

Если это вообще возможно, я был бы очень признателен за объяснение того, что происходит, и что было бы предложение обойти эту проблему. Я прихожу смиренно и ожидаю, что мне скажут все, что я понимаю, неправильно!

редактировать: я тестирую это как консольное приложение, но впоследствии оно будет использоваться как сервис. Я добавил время (правда); перед onCreated(), чтобы сохранить работу FSW.

1 ответ

Решение

У вас есть несколько разных проблем в вашем примере кода:

  1. Вы неправильно используете File.Move() метод. Для обоих параметров необходимо указать полное имя файла. Вы передаете имя каталога в качестве второго параметра, что неверно.
  2. Вы осматриваете IsCompleted свойство коллекции, как будто это было бы полезно. Это всегда будет falseи так, что блок кода ничего не делает. Это приводит к следующей проблеме...
  3. Ваш поток работает в узком цикле, потребляя огромное количество процессорного времени. Это может или не может вызвать ошибки, но это может...FileSystemWatcher На самом деле не гарантируется, что он всегда сообщает об изменениях, и одна из причин, по которой он может этого не делать, это отсутствие достаточного процессорного времени для мониторинга файловой системы. Если вы будете голодать, потратив все процессорное время, вы можете обнаружить, что он просто не сообщает об изменениях. Обратите внимание, что эта проблема существует и в вашем основном потоке; он также работает в тесном цикле, потребляя огромное количество процессорного времени, ничего не делая. Таким образом, вы полностью занимаете два ядра вашей системы.
  4. Вы не в состоянии воспользоваться моделью исполнения производителя / потребителя, которая BlockingCollection предназначен для. Ваш рабочий поток должен перечислять перечисление, возвращаемое GetConsumingEnumerable(), с использованием CompleteAdding() способ сообщить этому потоку, что больше нет работы.

Вот версия вашего примера кода, которая исправляет вышеперечисленные ошибки, а также немного очищает пример, чтобы сделать его более автономным:

// The default backing collection for BlockingCollection<T>
// is ConcurrentQueue<T>. There's no need to specify that
// explicitly.
public static BlockingCollection<string> processCollection = new BlockingCollection<string>();

static void Main(string[] args)
{
    string testDirectory = Path.Combine(Environment.CurrentDirectory, "test");

    Console.WriteLine("Creating directory: \"{0}\"", testDirectory);
    Directory.CreateDirectory(testDirectory);

    FileSystemWatcher watcher = new FileSystemWatcher();

    watcher.Path = testDirectory;
    watcher.EnableRaisingEvents = true;
    watcher.Filter = "*.*";

    watcher.Created += new FileSystemEventHandler(onCreated);
    Thread Consumer = new Thread(new ParameterizedThreadStart(mover));
    Consumer.Start(testDirectory);

    string text;

    while ((text = Console.ReadLine()) != "")
    {
        string newFile = Path.Combine(testDirectory, text + ".txt");

        File.WriteAllText(newFile, "Test file");
    }

    processCollection.CompleteAdding();
}

static void onCreated(object sender, FileSystemEventArgs e)
{
    if (e.ChangeType == WatcherChangeTypes.Created)
    {
        processCollection.Add(e.FullPath);
    }
}

static void mover(object testDirectory)
{
    string processed = Path.Combine((string)testDirectory, "processed");

    Console.WriteLine("Creating directory: \"{0}\"", processed);

    Directory.CreateDirectory(processed);

    foreach (string current in processCollection.GetConsumingEnumerable())
    {
        // Ensure that the file is in fact a file and not something else.
        if (File.Exists(current))
        {
            System.IO.File.Move(current, Path.Combine(processed, Path.GetFileName(current)));
        }
    }
}
Другие вопросы по тегам