Как посмотреть файл на предмет изменений?

У меня есть файл журнала, записываемый другим процессом, который я хочу наблюдать за изменениями. Каждый раз, когда происходит изменение, я хотел бы прочитать новые данные, чтобы выполнить их обработку.

Какой лучший способ сделать это? Я надеялся, что из библиотеки PyWin32 будет какая-то зацепка. Я нашел win32file.FindNextChangeNotification функция, но не знаю, как попросить его посмотреть конкретный файл.

Если кто-нибудь сделал что-то подобное, я был бы очень рад услышать, как...

[Редактировать] Я должен был упомянуть, что я был за решение, которое не требует опроса.

[Править] Проклятия! Кажется, это не работает на подключенном сетевом диске. Я предполагаю, что Windows не "слышит" никаких обновлений файла, как это происходит на локальном диске.

31 ответ

Решение

Вы уже просмотрели документацию, доступную по адресу http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html? Если вам нужно, чтобы он работал только под Windows, 2-й пример выглядит именно так, как вам нужно (если вы поменяете путь к каталогу с тем файлом, который вы хотите просмотреть).

В противном случае опрос, вероятно, будет единственным действительно независимым от платформы вариантом.

Примечание: я не пробовал ни одно из этих решений.

Вы пытались использовать Watchdog?

Библиотека Python API и утилиты оболочки для мониторинга событий файловой системы.

Мониторинг каталогов стал проще с

  • Кроссплатформенный API.
  • Инструмент оболочки для запуска команд в ответ на изменения каталога.

Начните быстро с простого примера в Quickstart...

Если опрос достаточно хорош для вас, я просто посмотрю, не изменится ли статистика файла "измененное время". Чтобы прочитать это:

os.stat(filename).st_mtime

(Также обратите внимание, что решение событий собственных изменений Windows не работает при любых обстоятельствах, например, на сетевых дисках.)

import os

class Monkey(object):
    def __init__(self):
        self._cached_stamp = 0
        self.filename = '/path/to/file'

    def ook(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...

Если вы хотите мультиплатформенное решение, тогда проверьте QFileSystemWatcher. Вот пример кода (не санированный):

from PyQt4 import QtCore

@QtCore.pyqtSlot(str)
def directory_changed(path):
    print('Directory Changed!!!')

@QtCore.pyqtSlot(str)
def file_changed(path):
    print('File Changed!!!')

fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3'])

fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed)
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed)

Он не должен работать на Windows (может быть, с Cygwin?), Но для пользователя Unix, вы должны использовать системный вызов "fcntl". Вот пример в Python. Это в основном тот же код, если вам нужно написать его на C (те же имена функций)

import time
import fcntl
import os
import signal

FNAME = "/HOME/TOTO/FILETOWATCH"

def handler(signum, frame):
    print "File %s modified" % (FNAME,)

signal.signal(signal.SIGIO, handler)
fd = os.open(FNAME,  os.O_RDONLY)
fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
fcntl.fcntl(fd, fcntl.F_NOTIFY,
            fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)

while True:
    time.sleep(10000)

Проверьте pyinotify.

inotify заменяет dnotify (из более раннего ответа) в новых linux и позволяет осуществлять мониторинг на уровне файлов, а не на уровне каталогов.

Для просмотра отдельного файла с опросом и минимальными зависимостями, вот полностью выделенный пример, основанный на ответе Дистана (выше):

import os
import sys 
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self.filename = watch_file
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...
            print('File changed')
            if self.call_func_on_change is not None:
                self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop        
    def watch(self):
        while self.running: 
            try: 
                # Look for changes
                time.sleep(self.refresh_delay_secs) 
                self.look() 
            except KeyboardInterrupt: 
                print('\nDone') 
                break 
            except FileNotFoundError:
                # Action on file not found
                pass
            except: 
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)

watch_file = 'my_file.txt'

# watcher = Watcher(watch_file)  # simple
watcher = Watcher(watch_file, custom_action, text='yes, changed')  # also call custom action function
watcher.watch()  # start the watch going

Ну, после небольшого взлома сценария Тима Голдена, у меня есть следующее, которое, кажется, работает довольно хорошо:

import os

import win32file
import win32con

path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt

def ProcessNewData( newData ):
    print "Text added: %s"%newData

# Set up the bits we'll need for output
ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
  path_to_watch,
  FILE_LIST_DIRECTORY,
  win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
  None,
  win32con.OPEN_EXISTING,
  win32con.FILE_FLAG_BACKUP_SEMANTICS,
  None
)

# Open the file we're interested in
a = open(file_to_watch, "r")

# Throw away any exising log data
a.read()

# Wait for new data and call ProcessNewData for each new chunk that's written
while 1:
  # Wait for a change to occur
  results = win32file.ReadDirectoryChangesW (
    hDir,
    1024,
    False,
    win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
    None,
    None
  )

  # For each change, check to see if it's updating the file we're interested in
  for action, file in results:
    full_filename = os.path.join (path_to_watch, file)
    #print file, ACTIONS.get (action, "Unknown")
    if file == file_to_watch:
        newText = a.read()
        if newText != "":
            ProcessNewData( newText )

Вероятно, это можно сделать с дополнительной проверкой ошибок, но для простого просмотра файла журнала и некоторой обработки его перед выпуском на экран это работает хорошо.

Спасибо всем за ваш вклад - отличные вещи!

Это еще одна модификация скрипта Тима Голдана, которая работает на Linux и добавляет простой наблюдатель для модификации файла с использованием dict (file=>time).

использование: whatName.py path_to_dir_to_watch

#!/usr/bin/env python

import os, sys, time

def files_to_timestamp(path):
    files = [os.path.join(path, f) for f in os.listdir(path)]
    return dict ([(f, os.path.getmtime(f)) for f in files])

if __name__ == "__main__":

    path_to_watch = sys.argv[1]
    print "Watching ", path_to_watch

    before = files_to_timestamp(path_to_watch)

    while 1:
        time.sleep (2)
        after = files_to_timestamp(path_to_watch)

        added = [f for f in after.keys() if not f in before.keys()]
        removed = [f for f in before.keys() if not f in after.keys()]
        modified = []

        for f in before.keys():
            if not f in removed:
                if os.path.getmtime(f) != before.get(f):
                    modified.append(f)

        if added: print "Added: ", ", ".join(added)
        if removed: print "Removed: ", ", ".join(removed)
        if modified: print "Modified ", ", ".join(modified)

        before = after

Проверьте мой ответ на аналогичный вопрос. Вы можете попробовать тот же цикл в Python. Эта страница предлагает:

import time

while 1:
    where = file.tell()
    line = file.readline()
    if not line:
        time.sleep(1)
        file.seek(where)
    else:
        print line, # already has newline

Также смотрите вопрос tail() в файле с Python.

Самое простое решение для меня - использовать инструмент watchdo от watchdog

Из https://pypi.python.org/pypi/watchdog меня теперь есть процесс, который ищет файлы sql в каталоге и выполняет их при необходимости.

watchmedo shell-command \
--patterns="*.sql" \
--recursive \
--command='~/Desktop/load_files_into_mysql_database.sh' \
.

Вот упрощенная версия кода Кендера, которая, кажется, выполняет ту же самую хитрость и не импортирует весь файл:

# Check file for new data.

import time

f = open(r'c:\temp\test.txt', 'r')

while True:

    line = f.readline()
    if not line:
        time.sleep(1)
        print 'Nothing New'
    else:
        print 'Call Function: ', line

Ну, так как вы используете Python, вы можете просто открыть файл и продолжить чтение строк из него.

f = open('file.log')

Если прочитанная строка не пуста, вы обрабатываете ее.

line = f.readline()
if line:
    // Do what you want with the line

Вы можете пропустить, что это нормально, чтобы продолжать звонить readline в EOF. В этом случае он просто будет возвращать пустую строку. И когда что-то добавляется к файлу журнала, чтение продолжится с того места, где оно остановилось, как вам нужно.

Если вы ищете решение, которое использует события или определенную библиотеку, укажите это в своем вопросе. В противном случае, я думаю, что это решение просто отлично.

Как вы можете видеть в статье Тима Голдена, на которую указывает Horst Gutmann, WIN32 относительно сложен и следит за каталогами, а не за одним файлом.

Я хотел бы предложить вам взглянуть на IronPython, который является реализацией Python .NET. С IronPython вы можете использовать все функциональные возможности .NET, включая

System.IO.FileSystemWatcher

Который обрабатывает отдельные файлы с простым интерфейсом событий.

Кажется, никто не опубликовал fswatch. Это кроссплатформенный наблюдатель файловой системы. Просто установите его, запустите и следуйте инструкциям.

Я использовал его с программами Python и Golang, и он просто работает.

Это пример проверки файла на наличие изменений. Тот, который не может быть лучшим способом сделать это, но это, безусловно, короткий путь.

Удобный инструмент для перезапуска приложения, когда были внесены изменения в исходный код. Я сделал это во время игры с pygame, чтобы видеть эффекты сразу после сохранения файла.

При использовании в pygame убедитесь, что материал в цикле while размещен в вашем игровом цикле, также как и в обновлении. В противном случае ваше приложение застрянет в бесконечном цикле, и вы не увидите обновления вашей игры.

file_size_stored = os.stat('neuron.py').st_size

  while True:
    try:
      file_size_current = os.stat('neuron.py').st_size
      if file_size_stored != file_size_current:
        restart_program()
    except: 
      pass

В случае, если вы хотели перезапустить код, который я нашел в Интернете. Вот. (Не относится к вопросу, хотя это может пригодиться)

def restart_program(): #restart application
    python = sys.executable
    os.execl(python, python, * sys.argv)

Получайте удовольствие, заставляя электроны делать то, что вы от них хотите.

Просто чтобы поместить это там, поскольку никто не упомянул об этом: в стандартной библиотеке есть модуль Python с именем filecmpкоторый имеет эту функцию, которая сравнивает два файла.

Просто убедитесь, что вы не делаете from filecmp import cmpчтобы не затмевать встроенную функцию в Python 2.x. Однако в Python 3.x это нормально, так как такого встроенного cmp()функционировать больше.

В любом случае, вот как выглядит его использование:

      import filecmp
filecmp.cmp(path_to_file_1, path_to_file_2, shallow=True)

Аргумент мелкой по умолчанию имеет значение True . Если значение аргумента равно True, то сравниваются только метаданные файлов; однако, если значение аргумента равно False, то сравнивается содержимое файлов.

Может эта информация будет кому-то полезна.

ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001

class myThread (threading.Thread):
    def __init__(self, threadID, fileName, directory, origin):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.fileName = fileName
        self.daemon = True
        self.dir = directory
        self.originalFile = origin
    def run(self):
        startMonitor(self.fileName, self.dir, self.originalFile)

def startMonitor(fileMonitoring,dirPath,originalFile):
    hDir = win32file.CreateFile (
        dirPath,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None
    )
    # Wait for new data and call ProcessNewData for each new chunk that's
    # written
    while 1:
        # Wait for a change to occur
        results = win32file.ReadDirectoryChangesW (
            hDir,
            1024,
            False,
            win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
            None,
            None
        )
        # For each change, check to see if it's updating the file we're
        # interested in
        for action, file_M in results:
            full_filename = os.path.join (dirPath, file_M)
            #print file, ACTIONS.get (action, "Unknown")
            if len(full_filename) == len(fileMonitoring) and action == 3:
                #copy to main file
                ...

Поскольку он установлен глобально, мой любимый подход - использовать nodemon. Если ваш исходный код находится в src, и ваша точка входа src/app.py, тогда это так просто:

nodemon -w 'src/**' -e py,html --exec python src/app.py

... где -e py,html позволяет вам контролировать, какие типы файлов отслеживать изменения.

watchfiles (https://github.com/samuelcolvin/watchfiles) — это Python API и CLI, использующие библиотеку Notify (https://github.com/notify-rs/notify), написанную на Rust.

Реализация ржавчины в настоящее время (2022-10-09) поддерживает:

  • Linux/Android: инотифай
  • macOS: FSEvents или kqueue, см. функции
  • Windows: ReadDirectoryChangesW
  • FreeBSD/NetBSD/OpenBSD/DragonflyBSD: kqueue
  • Все платформы: опрос

Бинарные файлы доступны на PyPI (https://pypi.org/project/watchfiles/) и conda-forge ( https://github.com/conda-forge/watchfiles-feedstock ).

Вот пример, предназначенный для просмотра входных файлов, которые записывают не более одной строки в секунду, но обычно намного меньше. Цель состоит в добавлении последней строки (самой последней записи) в указанный выходной файл. Я скопировал это из одного из моих проектов и просто удалил все ненужные строки. Вам нужно будет заполнить или изменить недостающие символы.

from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread
from ui_main_window import Ui_MainWindow   # Qt Creator gen'd 

class MainWindow(QMainWindow, Ui_MainWindow):
    def __init__(self, parent=None):
        QMainWindow.__init__(self, parent)
        Ui_MainWindow.__init__(self)
        self._fileWatcher = QFileSystemWatcher()
        self._fileWatcher.fileChanged.connect(self.fileChanged)

    def fileChanged(self, filepath):
        QThread.msleep(300)    # Reqd on some machines, give chance for write to complete
        # ^^ About to test this, may need more sophisticated solution
        with open(filepath) as file:
            lastLine = list(file)[-1]
        destPath = self._filemap[filepath]['dest file']
        with open(destPath, 'a') as out_file:               # a= append
            out_file.writelines([lastLine])

Конечно, охватывающий класс QMainWindow не является строго обязательным, т.е. Вы можете использовать QFileSystemWatcher в одиночку.

Связанное решение @4Oh4 плавное изменение списка файлов для просмотра;

import os
import sys
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_files, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self._cached_stamp_files = {}
        self.filenames = watch_files
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        for file in self.filenames:
            stamp = os.stat(file).st_mtime
            if not file in self._cached_stamp_files:
                self._cached_stamp_files[file] = 0
            if stamp != self._cached_stamp_files[file]:
                self._cached_stamp_files[file] = stamp
                # File has changed, so do something...
                file_to_read = open(file, 'r')
                value = file_to_read.read()
                print("value from file", value)
                file_to_read.seek(0)
                if self.call_func_on_change is not None:
                    self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop
    def watch(self):
        while self.running:
            try:
                # Look for changes
                time.sleep(self.refresh_delay_secs)
                self.look()
            except KeyboardInterrupt:
                print('\nDone')
                break
            except FileNotFoundError:
                # Action on file not found
                pass
            except Exception as e:
                print(e)
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)
    # pass

watch_files = ['/Users/mexekanez/my_file.txt', '/Users/mexekanez/my_file1.txt']

# watcher = Watcher(watch_file)  # simple



if __name__ == "__main__":
    watcher = Watcher(watch_files, custom_action, text='yes, changed')  # also call custom action function
    watcher.watch()  # start the watch going
      import inotify.adapters
from datetime import datetime


LOG_FILE='/var/log/mysql/server_audit.log'


def main():
    start_time = datetime.now()
    while True:
        i = inotify.adapters.Inotify()
        i.add_watch(LOG_FILE)
        for event in i.event_gen(yield_nones=False):
            break
        del i

        with open(LOG_FILE, 'r') as f:
            for line in f:
                entry = line.split(',')
                entry_time = datetime.strptime(entry[0],
                                               '%Y%m%d %H:%M:%S')
                if entry_time > start_time:
                    start_time = entry_time
                    print(entry)


if __name__ == '__main__':
    main()

У меня есть решение, нет необходимости в сторонней библиотеке!

Вот код:

      path_to_watch = "your/path"
print('Your folder path is"',path,'"')
before = dict ([(f, None) for f in os.listdir (path_to_watch)])
while 1:
       after = dict ([(f, None) for f in os.listdir (path_to_watch)])
       added = [f for f in after if not f in before]
       if added:
                print("Added: ", ", ".join (added))
                break
        else:
             before = after

Я отредактировал код, исходный код доступен по адресу http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html

Если вы используете код из приведенной выше ссылки, обратите внимание, что исходный код был создан на python 2x, поэтому вам нужно преобразовать его в python 3.

Вы можете использовать цикл For с диапазоном (anynumber), если хотите добавить определенное количество файлов, а затем разорвать цикл. Примечание:

Всякий раз, когда вы добавляете какой-либо файл в путь, он печатает текст и прерывает, и если файлы не добавлены, он продолжит работу.

Я понял, что вы хотите постоянно проверять, записаны ли какие-то данные в определенный файл, вот решение:

      import os
filename="myfile.txt" # Add full path if file is located in another directory
initial_filesize=os.path.getsize(filename) # Getting size of the file for comparition 
while 1:
    final_filesize=os.path.getsize(filename)
    if final_filsize<intial_filesize or final_filesize>initial_filesize:
        print("change in file")
        break
        # You execute your code here
    else:
        pass

Приведенный выше код работает следующим образом:

  1. Сначала мы воспользовались OS moduleдля взаимодействия с системой или ОС.
  2. Прежде чем войти в цикл, мы получили исходный размер файла и назвали его как initial_filesize.
  3. Теперь мы использовали While loop, в нем мы постоянно получаем размер файла файла, и если были внесены изменения, он попадает в if statement.

Итак, в основном, мы только что использовали размер файла, чтобы проверить, были ли внесены изменения в файл, довольно просто, и этот метод может работать практически для любого типа файла.

Лучшее и простое решение - использовать пигтейл: https://pypi.python.org/pypi/pygtail

from pygtail import Pygtail

while True:
    for line in Pygtail("some.log"):
        sys.stdout.write(line)

Вы также можете использовать простую библиотеку под названием repyt, вот пример:

repyt ./app.py

Если вы используете Windows, создайте этот файл POLL.CMD

@echo off
:top
xcopy /m /y %1 %2 | find /v "File(s) copied"
timeout /T 1 > nul
goto :top

затем вы можете ввести "poll dir1dir2", и он скопирует все файлы из dir1 в dir2 и будет проверять наличие обновлений один раз в секунду.

"Найти" не обязательно, чтобы консоль была менее шумной.

Это не рекурсивно. Возможно, вы могли бы сделать его рекурсивным, используя /e в xcopy.

Самым простым решением было бы получить два экземпляра одного и того же файла через интервал и сравнить их. Вы можете попробовать что-то вроде этого

          while True:
        # Capturing the two instances models.py after certain interval of time
        print("Looking for changes in " + app_name.capitalize() + " models.py\nPress 'CTRL + C' to stop the program")
        with open(app_name.capitalize() + '/filename', 'r+') as app_models_file:
            filename_content = app_models_file.read()
        time.sleep(5)
        with open(app_name.capitalize() + '/filename', 'r+') as app_models_file_1:
            filename_content_1 = app_models_file_1.read()
        # Comparing models.py after certain interval of time
        if filename_content == filename_content_1:
            pass
        else:
            print("You made a change in " + app_name.capitalize() + " filename.\n")
            cmd = str(input("Do something with the file?(y/n):"))
            if cmd == 'y':
                # Do Something
            elif cmd == 'n':
                # pass or do something
            else:
                print("Invalid input")

Я не знаю какой-либо конкретной функции Windows. Вы можете попытаться получить MD5-хэш файла каждую секунду / минуту / час (зависит от того, насколько быстро он вам нужен) и сравнить его с последним хеш-кодом. Когда он отличается, вы знаете, что файл был изменен, и вы читаете самые новые строки.

Другие вопросы по тегам