Получить последние n строк файла с Python, аналогичного tail
Я пишу средство просмотра файла журнала для веб-приложения, и для этого я хочу разбить на страницы строки файла журнала. Элементы в файле являются строками, основанными на самом новом элементе внизу.
Так что мне нужно tail()
метод, который может читать n
линии снизу и поддерживает смещение. То, что я придумал, выглядит так:
def tail(f, n, offset=0):
"""Reads a n lines from f with an offset of offset lines."""
avg_line_length = 74
to_read = n + offset
while 1:
try:
f.seek(-(avg_line_length * to_read), 2)
except IOError:
# woops. apparently file is smaller than what we want
# to step back, go to the beginning instead
f.seek(0)
pos = f.tell()
lines = f.read().splitlines()
if len(lines) >= to_read or pos == 0:
return lines[-to_read:offset and -offset or None]
avg_line_length *= 1.3
Это разумный подход? Каков рекомендуемый способ привязки лог-файлов к смещению?
36 ответов
Код, который я в итоге использовал. Я думаю, что это пока лучшее
def tail(f, n, offset=None):
"""Reads a n lines from f with an offset of offset lines. The return
value is a tuple in the form ``(lines, has_more)`` where `has_more` is
an indicator that is `True` if there are more lines in the file.
"""
avg_line_length = 74
to_read = n + (offset or 0)
while 1:
try:
f.seek(-(avg_line_length * to_read), 2)
except IOError:
# woops. apparently file is smaller than what we want
# to step back, go to the beginning instead
f.seek(0)
pos = f.tell()
lines = f.read().splitlines()
if len(lines) >= to_read or pos == 0:
return lines[-to_read:offset and -offset or None], \
len(lines) > to_read or pos > 0
avg_line_length *= 1.3
Это может быть быстрее, чем у вас. Не делает никаких предположений о длине линии. Выполняет обратный просмотр файла по одному блоку за раз, пока не будет найдено правильное количество символов \ n.
def tail( f, lines=20 ):
total_lines_wanted = lines
BLOCK_SIZE = 1024
f.seek(0, 2)
block_end_byte = f.tell()
lines_to_go = total_lines_wanted
block_number = -1
blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
# from the end of the file
while lines_to_go > 0 and block_end_byte > 0:
if (block_end_byte - BLOCK_SIZE > 0):
# read the last block we haven't yet read
f.seek(block_number*BLOCK_SIZE, 2)
blocks.append(f.read(BLOCK_SIZE))
else:
# file too small, start from begining
f.seek(0,0)
# only read what was not read
blocks.append(f.read(block_end_byte))
lines_found = blocks[-1].count('\n')
lines_to_go -= lines_found
block_end_byte -= BLOCK_SIZE
block_number -= 1
all_read_text = ''.join(reversed(blocks))
return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])
Мне не нравятся хитрые предположения о длине линии, когда - с практической точки зрения - вы никогда не узнаете такие вещи.
Как правило, это позволит найти последние 20 строк на первом или втором проходе цикла. Если ваша 74-символьная вещь на самом деле точна, вы делаете размер блока 2048, и вы почти сразу же получите 20 строк.
Кроме того, я не сжигаю много мозговых калорий, пытаясь выровнять соответствие с физическими блоками ОС. Используя эти высокоуровневые пакеты ввода / вывода, я сомневаюсь, что вы увидите какие-либо последствия для производительности при попытке выравнивания по границам блоков ОС. Если вы используете низкоуровневый ввод-вывод, вы можете увидеть ускорение.
Предполагается Unix-подобная система на Python 2, которую вы можете сделать:
import os
def tail(f, n, offset=0):
stdin,stdout = os.popen2("tail -n "+n+offset+" "+f)
stdin.close()
lines = stdout.readlines(); stdout.close()
return lines[:,-offset]
Для Python 3 вы можете сделать:
import subprocess
def tail(f, n, offset=0):
proc = subprocess.Popen(['tail', '-n', n + offset, f], stdout=subprocess.PIPE)
lines = proc.stdout.readlines()
return lines[:, -offset]
Вот мой ответ. Чистый питон. Использование timeit кажется довольно быстрым. Хвост 100 строк файла журнала, который имеет 100000 строк:
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10)
0.0014600753784179688
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100)
0.00899195671081543
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=1000)
0.05842900276184082
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10000)
0.5394978523254395
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100000)
5.377126932144165
Вот код:
import os
def tail(f, lines=1, _buffer=4098):
"""Tail a file and get X lines from the end"""
# place holder for the lines found
lines_found = []
# block counter will be multiplied by buffer
# to get the block size from the end
block_counter = -1
# loop until we find X lines
while len(lines_found) < lines:
try:
f.seek(block_counter * _buffer, os.SEEK_END)
except IOError: # either file is too small, or too many lines requested
f.seek(0)
lines_found = f.readlines()
break
lines_found = f.readlines()
# we found enough lines, get out
# Removed this line because it was redundant the while will catch
# it, I left it for history
# if len(lines_found) > lines:
# break
# decrement the block counter to get the
# next X bytes
block_counter -= 1
return lines_found[-lines:]
Если чтение всего файла приемлемо, тогда используйте deque.
from collections import deque
deque(f, maxlen=n)
До 2.6 у deques не было опции maxlen, но это достаточно легко реализовать.
import itertools
def maxque(items, size):
items = iter(items)
q = deque(itertools.islice(items, size))
for item in items:
del q[0]
q.append(item)
return q
Если требуется прочитать файл с конца, используйте галопный поиск (он же экспоненциальный).
def tail(f, n):
assert n >= 0
pos, lines = n+1, []
while len(lines) <= n:
try:
f.seek(-pos, 2)
except IOError:
f.seek(0)
break
finally:
lines = list(f)
pos *= 2
return lines[-n:]
Ответ С.Лотта выше почти работает для меня, но в итоге дает мне частичные строки. Оказывается, это повреждает данные на границах блоков, потому что данные хранят прочитанные блоки в обратном порядке. Когда вызывается '.join (данные), блоки расположены в неправильном порядке. Это исправляет это.
def tail(f, window=20):
"""
Returns the last `window` lines of file `f` as a list.
f - a byte file-like object
"""
if window == 0:
return []
BUFSIZ = 1024
f.seek(0, 2)
bytes = f.tell()
size = window + 1
block = -1
data = []
while size > 0 and bytes > 0:
if bytes - BUFSIZ > 0:
# Seek back one whole BUFSIZ
f.seek(block * BUFSIZ, 2)
# read BUFFER
data.insert(0, f.read(BUFSIZ))
else:
# file too small, start from begining
f.seek(0,0)
# only read what was not read
data.insert(0, f.read(bytes))
linesFound = data[0].count('\n')
size -= linesFound
bytes -= BUFSIZ
block -= 1
return ''.join(data).splitlines()[-window:]
Простое и быстрое решение с помощью mmap:
import mmap
import os
def tail(filename, n):
"""Returns last n lines from the filename. No exception handling"""
size = os.path.getsize(filename)
with open(filename, "rb") as f:
# for Windows the mmap parameters are different
fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
try:
for i in xrange(size - 1, -1, -1):
if fm[i] == '\n':
n -= 1
if n == -1:
break
return fm[i + 1 if i else 0:].splitlines()
finally:
fm.close()
Самый простой способ - использовать deque
:
from collections import deque
def tail(filename, n=10):
with open(filename) as f:
return deque(f, n)
Обновите решение @papercrane до python3. Откройте файл с помощью open(filename, 'rb')
а также:
def tail(f, window=20):
"""Returns the last `window` lines of file `f` as a list.
"""
if window == 0:
return []
BUFSIZ = 1024
f.seek(0, 2)
remaining_bytes = f.tell()
size = window + 1
block = -1
data = []
while size > 0 and remaining_bytes > 0:
if remaining_bytes - BUFSIZ > 0:
# Seek back one whole BUFSIZ
f.seek(block * BUFSIZ, 2)
# read BUFFER
bunch = f.read(BUFSIZ)
else:
# file too small, start from beginning
f.seek(0, 0)
# only read what was not read
bunch = f.read(remaining_bytes)
bunch = bunch.decode('utf-8')
data.insert(0, bunch)
size -= bunch.count('\n')
remaining_bytes -= BUFSIZ
block -= 1
return ''.join(data).splitlines()[-window:]
Публикация ответа от моего комментария на аналогичный вопрос, где тот же метод использовался для изменения последней строки файла, а не просто для его получения.
Для файла значительного размера,mmap
это лучший способ сделать это. Для улучшения существующих mmap
Ответ: эта версия переносима между Windows и Linux и должна работать быстрее (хотя она не будет работать без каких-либо изменений на 32-битном Python с файлами в диапазоне ГБ, см. другой ответ для подсказок по обработке этого и для изменения к работа на Python 2).
import io # Gets consistent version of open for both Py2.7 and Py3.x
import itertools
import mmap
def skip_back_lines(mm, numlines, startidx):
'''Factored out to simplify handling of n and offset'''
for _ in itertools.repeat(None, numlines):
startidx = mm.rfind(b'\n', 0, startidx)
if startidx < 0:
break
return startidx
def tail(f, n, offset=0):
# Reopen file in binary mode
with io.open(f.name, 'rb') as binf, mmap.mmap(binf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
# len(mm) - 1 handles files ending w/newline by getting the prior line
startofline = skip_back_lines(mm, offset, len(mm) - 1)
if startofline < 0:
return [] # Offset lines consumed whole file, nothing to return
# If using a generator function (yield-ing, see below),
# this should be a plain return, no empty list
endoflines = startofline + 1 # Slice end to omit offset lines
# Find start of lines to capture (add 1 to move from newline to beginning of following line)
startofline = skip_back_lines(mm, n, startofline) + 1
# Passing True to splitlines makes it return the list of lines without
# removing the trailing newline (if any), so list mimics f.readlines()
return mm[startofline:endoflines].splitlines(True)
# If Windows style \r\n newlines need to be normalized to \n, and input
# is ASCII compatible, can normalize newlines with:
# return mm[startofline:endoflines].replace(os.linesep.encode('ascii'), b'\n').splitlines(True)
Предполагается, что число выровненных строк достаточно мало, и вы можете безопасно прочитать их все в память одновременно; Вы также можете сделать это функцией генератора и вручную прочитать строку за раз, заменив последнюю строку на:
mm.seek(startofline)
# Call mm.readline n times, or until EOF, whichever comes first
# Python 3.2 and earlier:
for line in itertools.islice(iter(mm.readline, b''), n):
yield line
# 3.3+:
yield from itertools.islice(iter(mm.readline, b''), n)
Наконец, это читается в двоичном режиме (необходимо использовать mmap
) так что дает str
линии (Py2) и bytes
линии (Py3); если ты хочешь unicode
(Py2) или str
(Py3), итеративный подход может быть настроен для декодирования для вас и / или исправления новых строк:
lines = itertools.islice(iter(mm.readline, b''), n)
if f.encoding: # Decode if the passed file was opened with a specific encoding
lines = (line.decode(f.encoding) for line in lines)
if 'b' not in f.mode: # Fix line breaks if passed file opened in text mode
lines = (line.replace(os.linesep, '\n') for line in lines)
# Python 3.2 and earlier:
for line in lines:
yield line
# 3.3+:
yield from lines
Примечание: я напечатал все это на машине, где у меня нет доступа к Python для тестирования. Пожалуйста, дайте мне знать, если я что-то опечатал; это было достаточно похоже на мой другой ответ, что я думаю, что это должно работать, но настройки (например, обработка offset
) может привести к тонким ошибкам. Пожалуйста, дайте мне знать в комментариях, если есть какие-либо ошибки.
Еще более чистая версия, совместимая с Python3, которая не вставляет, а добавляет и переворачивает:
def tail(f, window=1):
"""
Returns the last `window` lines of file `f` as a list of bytes.
"""
if window == 0:
return b''
BUFSIZE = 1024
f.seek(0, 2)
end = f.tell()
nlines = window + 1
data = []
while nlines > 0 and end > 0:
i = max(0, end - BUFSIZE)
nread = min(end, BUFSIZE)
f.seek(i)
chunk = f.read(nread)
data.append(chunk)
nlines -= chunk.count(b'\n')
end -= nread
return b'\n'.join(b''.join(reversed(data)).splitlines()[-window:])
используйте это так:
with open(path, 'rb') as f:
last_lines = tail(f, 3).decode('utf-8')
Просто:
with open("test.txt") as f:
data = f.readlines()
tail = data[-2:]
print(''.join(tail)
Я нашел Popen выше, чтобы быть лучшим решением. Это быстро и грязно, и это работает Для Python 2.6 на Unix-машине я использовал следующее
def GetLastNLines(self, n, fileName):
"""
Name: Get LastNLines
Description: Gets last n lines using Unix tail
Output: returns last n lines of a file
Keyword argument:
n -- number of last lines to return
filename -- Name of the file you need to tail into
"""
p=subprocess.Popen(['tail','-n',str(n),self.__fileName], stdout=subprocess.PIPE)
soutput,sinput=p.communicate()
return soutput
soutput будет содержать последние n строк кода. Чтобы перебрать soutput построчно, выполните:
for line in GetLastNLines(50,'myfile.log').split('\n'):
print line
Существует несколько существующих реализаций tail на pypi, которые вы можете установить с помощью pip:
- mtFileUtil
- multitail
- log4tailer
- ...
В зависимости от вашей ситуации, могут быть преимущества использования одного из этих существующих инструментов.
Есть очень полезный модуль, который может сделать это:
from file_read_backwards import FileReadBackwards
with FileReadBackwards("/tmp/file", encoding="utf-8") as frb:
# getting lines by lines starting from the last line up
for l in frb:
print(l)
Основано на топовом ответе С. Лотта (25 сентября 2008 г. в 21:43), но исправлено для небольших файлов.
def tail(the_file, lines_2find=20):
the_file.seek(0, 2) #go to end of file
bytes_in_file = the_file.tell()
lines_found, total_bytes_scanned = 0, 0
while lines_2find+1 > lines_found and bytes_in_file > total_bytes_scanned:
byte_block = min(1024, bytes_in_file-total_bytes_scanned)
the_file.seek(-(byte_block+total_bytes_scanned), 2)
total_bytes_scanned += byte_block
lines_found += the_file.read(1024).count('\n')
the_file.seek(-total_bytes_scanned, 2)
line_list = list(the_file.readlines())
return line_list[-lines_2find:]
#we read at least 21 line breaks from the bottom, block by block for speed
#21 to ensure we don't get a half line
Надеюсь, это полезно.
Вот довольно простая реализация:
with open('/etc/passwd', 'r') as f:
try:
f.seek(0,2)
s = ''
while s.count('\n') < 11:
cur = f.tell()
f.seek((cur - 10))
s = f.read(10) + s
f.seek((cur - 10))
print s
except Exception as e:
f.readlines()
Вы можете перейти к концу вашего файла с помощью f.seek(0, 2), а затем прочесть строки одну за другой со следующей заменой readline():
def readline_backwards(self, f):
backline = ''
last = ''
while not last == '\n':
backline = last + backline
if f.tell() <= 0:
return backline
f.seek(-1, 1)
last = f.read(1)
f.seek(-1, 1)
backline = last
last = ''
while not last == '\n':
backline = last + backline
if f.tell() <= 0:
return backline
f.seek(-1, 1)
last = f.read(1)
f.seek(-1, 1)
f.seek(1, 1)
return backline
У некоторых из этих решений есть проблемы, если файл не заканчивается на \n или обеспечивает чтение всей первой строки.
def tail(file, n=1, bs=1024):
f = open(file)
f.seek(-1,2)
l = 1-f.read(1).count('\n') # If file doesn't end in \n, count it anyway.
B = f.tell()
while n >= l and B > 0:
block = min(bs, B)
B -= block
f.seek(B, 0)
l += f.read(block).count('\n')
f.seek(B, 0)
l = min(l,n) # discard first (incomplete) line if l > n
lines = f.readlines()[-l:]
f.close()
return lines
Деки (2.6+)
Если вы знаете, что файл будет небольшим, вполне подойдет простая двухсторонняя очередь.
from collections import deque
def tail(f, n):
return deque(f, n)
Цитата с docs.python.org:
Если maxlen не указан или имеет значение None, деки могут вырасти до произвольной длины. В противном случае дек ограничен указанной максимальной длиной. После заполнения очереди ограниченной длины при добавлении новых элементов соответствующее количество элементов отбрасывается с противоположного конца. Деки ограниченной длины предоставляют функциональность, аналогичную хвостовому фильтру в Unix. Они также полезны для отслеживания транзакций и других пулов данных, где интерес представляют только самые последние действия.
Галопирующий поиск (2.7+)
Если размер файла не указан, рассмотрите возможность поиска файла с конца.
Галопирующий или экспоненциальный поиск минимизирует количество вызовов чтения, умножая количество байтов для поиска на два на каждой итерации.
Этот фрагмент хорошо обрабатывает крайние случаи, за исключением многобайтовых разделителей и файлов, открытых в текстовом режиме (пример, который может их обрабатывать, см. в разделе «Пограничные случаи») и сохраняет сегменты в эффективном по памяти пространстве.deque
пока не присоединитесь к ним непосредственно перед тем, как вернуть их как одиночныхbytes
, стараясь читать данные только один раз.
from collections import deque
from os import SEEK_CUR, SEEK_END
def tail(f, n, d = b'\n'):
u"Read `n` segments (lines) from the end of file `f`, separated by `d`."
a = deque()
o = 1
try:
# Seek to end of file, exclude first byte from check for newline.
f.seek(-1, SEEK_END)
s = f.read(1)
c = 0
# Read more segments until enough newline characters has been read.
while c < n:
n -= c # Subtract newline count from remaining.
a.appendleft(s) # Insert segment at the beginning.
f.seek(-o * 3, SEEK_CUR) # Seek past the read bytes, plus 2x that.
o *= 2 # Multiply step- and readsize by two.
s = f.read(o) # Read new segment from file.
c = s.count(d) # Count the number of newline characters.
except OSError:
# Reached beginning of file, read start of file > start of last segment.
p = max(0, f.tell() - o)
f.seek(0)
s = f.read(p)
c = s.count(d)
if c >= n:
# Strip data, up to the start of the first line, from the last segment.
i = s.rfind(d)
while i != -1 and n > 1:
i = s.rfind(d, None, i)
n -= 1
s = s[i+1:]
a.appendleft(s)
return b"".join(a)
Использование:
f.write(b'Third\nSecond\nLast'); f.seek(0)
assert readlast(f, 2, b'\n') == b"Second\nLast\n"
f.write(b'\n\n'); f.seek(0)
assert readlast(f, 1, b'\n') == b"\n"
f.write(b'X\n'); f.seek(0)
assert readlast(f, 1, b'\n') == b"X\n"
f.write(b''); f.seek(0)
assert readlast(f, 1, b'\n') == b""
Краевые случаи (2.7+)
Самый простой подход, часть чтения всего файла, состоит в том, чтобы пройти через данные с конца файла и проверить каждый прочитанный байт или блок байтов по значению/символу разделителя.
Это не так быстро, как функция галопирующего поиска, описанная выше, но ее гораздо проще написать, чем обрабатывать крайние случаи, такие как файлы в кодировке UTF-16/32 и файлы, в которых используются другие многобайтовые разделители строк.
Помимо этого, этот пример также может обрабатывать файлы, открытые в текстовом режиме (но вам все равно следует рассмотреть возможность повторного открытия их в байтовом режиме, поскольку его вызовы относительного поиска более эффективны).
def _tail__bytes(f, n, sep, size, step):
# Point cursor to the end of the file.
f.seek(0, SEEK_END)
# Halt when 'sep' occurs enough times.
while n > 0:
# Seek past the byte just read, or last byte if none has been read.
f.seek(-size-step, SEEK_CUR)
# Read one byte/char/block, then step again, until 'sep' occurs.
while f.read(size) != sep:
f.seek(-size-step, SEEK_CUR)
n -= 1
def _tail__text(f, n, sep, size, step):
# Text mode, same principle but without the use of relative offsets.
o = f.seek(0, SEEK_END)
o = f.seek(o-size-step)
while n > 0:
o = f.seek(o-step)
while f.read(step) != sep:
o = f.seek(o-step)
n -= 1
def tail(f, n, sep, fixed = False):
"""tail(f: io.BaseIO, n: int, sep: bytes, fixed: bool = False) -> bytes|str
Return the last `n` segments of file `f`, separated by `sep`.
Set `fixed` to True when parsing UTF-32 or UTF-16 encoded data (don't forget
to pass the correct delimiter) in files opened in byte mode.
"""
size = len(sep)
step = len(sep) if (fixed is True) else (fixed or 1)
if not size:
raise ValueError("Zero-length separator.")
try:
if 'b' in f.mode:
# Process file opened in byte mode.
_tail__bytes(f, n, sep, size, step)
else:
# Process file opened in text mode.
_tail__text(f, n, sep, size, step)
except (OSError, ValueError):
# Beginning of file reached.
f.seek(0, SEEK_SET)
return f.read()
Использование:
f.write("X\nY\nZ\n").encode('utf32'); f.seek(0)
assert tail(f, 1, "\n".encode('utf32')[4:], fixed = True) == b"Z\n"
f.write("X\nY\nZ\n").encode('utf16'); f.seek(0)
assert tail(f, 1, "\n".encode('utf16')[2:], fixed = True) == b"Z\n"
f.write(b'X<br>Y</br>'); f.seek(0)
assert readlast(f, 1, b'<br>') == b"Y</br>"
f.write("X\nY\n"); f.seek(0)
assert readlast(f, 1, "\n") == "Y\n"
Перед публикацией примеры были протестированы на файлах разной длины, пустых файлах, файлах разных размеров, состоящих только из символов новой строки, и т. д. Игнорирует конечный символ новой строки.
Другое решение
если ваш текстовый файл выглядит так: мышь змея кошка ящерица волк собака
вы можете отменить этот файл, просто используя индексирование массива в python '''
contents=[]
def tail(contents,n):
with open('file.txt') as file:
for i in file.readlines():
contents.append(i)
for i in contents[:n:-1]:
print(i)
tail(contents,-5)
результат: собака волк ящерица кошка
Основано на ответе Eyecue (10 июня 2010 в 21:28): этот класс добавляет методы head() и tail() к объекту файла.
class File(file):
def head(self, lines_2find=1):
self.seek(0) #Rewind file
return [self.next() for x in xrange(lines_2find)]
def tail(self, lines_2find=1):
self.seek(0, 2) #go to end of file
bytes_in_file = self.tell()
lines_found, total_bytes_scanned = 0, 0
while (lines_2find+1 > lines_found and
bytes_in_file > total_bytes_scanned):
byte_block = min(1024, bytes_in_file-total_bytes_scanned)
self.seek(-(byte_block+total_bytes_scanned), 2)
total_bytes_scanned += byte_block
lines_found += self.read(1024).count('\n')
self.seek(-total_bytes_scanned, 2)
line_list = list(self.readlines())
return line_list[-lines_2find:]
Использование:
f = File('path/to/file', 'r')
f.head(3)
f.tail(3)
Для повышения эффективности работы с очень большими файлами (что часто встречается в файлах журналов, где вы можете использовать tail), вы, как правило, хотите избегать чтения всего файла (даже если вы делаете это, не считывая весь файл сразу в память). Однако вы делаете нужно как-то отработать смещение в строках, а не в символах. Одной из возможностей является чтение в обратном порядке с помощью функции seek() char за char, но это очень медленно. Вместо этого лучше обрабатывать большими блоками.
У меня есть служебная функция, которую я написал некоторое время назад для чтения файлов назад, которую можно использовать здесь.
import os, itertools
def rblocks(f, blocksize=4096):
"""Read file as series of blocks from end of file to start.
The data itself is in normal order, only the order of the blocks is reversed.
ie. "hello world" -> ["ld","wor", "lo ", "hel"]
Note that the file must be opened in binary mode.
"""
if 'b' not in f.mode.lower():
raise Exception("File must be opened using binary mode.")
size = os.stat(f.name).st_size
fullblocks, lastblock = divmod(size, blocksize)
# The first(end of file) block will be short, since this leaves
# the rest aligned on a blocksize boundary. This may be more
# efficient than having the last (first in file) block be short
f.seek(-lastblock,2)
yield f.read(lastblock)
for i in range(fullblocks-1,-1, -1):
f.seek(i * blocksize)
yield f.read(blocksize)
def tail(f, nlines):
buf = ''
result = []
for block in rblocks(f):
buf = block + buf
lines = buf.splitlines()
# Return all lines except the first (since may be partial)
if lines:
result.extend(lines[1:]) # First line may not be complete
if(len(result) >= nlines):
return result[-nlines:]
buf = lines[0]
return ([buf]+result)[-nlines:]
f=open('file_to_tail.txt','rb')
for line in tail(f, 20):
print line
[Редактировать] Добавлена более конкретная версия (избегать необходимости реверса дважды)
Обновление для ответа A.Coady
Работает с python 3.
Это использует экспоненциальный поиск и буферизует толькоN
линии сзади и очень эффективен.
import time
import os
import sys
def tail(f, n):
assert n >= 0
pos, lines = n+1, []
# set file pointer to end
f.seek(0, os.SEEK_END)
isFileSmall = False
while len(lines) <= n:
try:
f.seek(f.tell() - pos, os.SEEK_SET)
except ValueError as e:
# lines greater than file seeking size
# seek to start
f.seek(0,os.SEEK_SET)
isFileSmall = True
except IOError:
print("Some problem reading/seeking the file")
sys.exit(-1)
finally:
lines = f.readlines()
if isFileSmall:
break
pos *= 2
print(lines)
return lines[-n:]
with open("stream_logs.txt") as f:
while(True):
time.sleep(0.5)
print(tail(f,2))
import time
attemps = 600
wait_sec = 5
fname = "YOUR_PATH"
with open(fname, "r") as f:
where = f.tell()
for i in range(attemps):
line = f.readline()
if not line:
time.sleep(wait_sec)
f.seek(where)
else:
print line, # already has newline
Мне пришлось прочитать определенное значение из последней строки файла, и наткнулся на этот поток. Вместо того, чтобы заново изобретать колесо в Python, я закончил крошечным сценарием оболочки, сохраненным как /usr/local/bin/get_last_netp:
#! /bin/bash
tail -n1 /home/leif/projects/transfer/export.log | awk {'print $14'}
И в программе Python:
from subprocess import check_output
last_netp = int(check_output("/usr/local/bin/get_last_netp"))
import itertools
fname = 'log.txt'
offset = 5
n = 10
with open(fname) as f:
n_last_lines = list(reversed([x for x in itertools.islice(f, None)][-(offset+1):-(offset+n+1):-1]))
Хорошо! У меня была аналогичная проблема, хотя мне требовалась ТОЛЬКО ПОСЛЕДНЯЯ СТРОКА, поэтому я придумал собственное решение.
def get_last_line(filepath):
try:
with open(filepath,'rb') as f:
f.seek(-1,os.SEEK_END)
text = [f.read(1)]
while text[-1] != '\n'.encode('utf-8') or len(text)==1:
f.seek(-2, os.SEEK_CUR)
text.append(f.read(1))
except Exception as e:
pass
return ''.join([t.decode('utf-8') for t in text[::-1]]).strip()
Эта функция возвращает последнюю строку в файле.
У меня есть файл журнала размером 1,27 ГБ, и на поиск последней строки потребовалось очень меньше времени (даже не полсекунды).
abc = "2018-06-16 04:45:18.68"
filename = "abc.txt"
with open(filename) as myFile:
for num, line in enumerate(myFile, 1):
if abc in line:
lastline = num
print "last occurance of work at file is in "+str(lastline)
This is my version of tailf
import sys, time, os
filename = 'path to file'
try:
with open(filename) as f:
size = os.path.getsize(filename)
if size < 1024:
s = size
else:
s = 999
f.seek(-s, 2)
l = f.read()
print l
while True:
line = f.readline()
if not line:
time.sleep(1)
continue
print line
except IOError:
pass