Make sure only a single instance of a program is running
Есть ли у Pythonic способ запустить только один экземпляр программы?
Единственное разумное решение, которое я придумала, - это попытаться запустить его как сервер на каком-либо порту, а затем вторая программа, пытающаяся привязаться к тому же порту, не сможет. Но это не очень хорошая идея, может быть, есть что-то более легкое, чем это?
(Примите во внимание, что иногда ожидается сбой программы, т.е. segfault - поэтому такие вещи, как "файл блокировки" не будут работать)
Обновление: предлагаемые решения намного сложнее и менее зависимы, чем просто порт, занятый несуществующим сервером, поэтому мне придется пойти с этим.
25 ответов
Следующий код должен выполнять эту работу, он кроссплатформенный и работает на Python 2.4-3.2. Я проверил это на Windows, OS X и Linux.
from tendo import singleton
me = singleton.SingleInstance() # will sys.exit(-1) if other instance is running
Последняя версия кода доступна singleton.py. Пожалуйста, регистрируйте ошибки здесь.
Вы можете установить тендер одним из следующих способов:
easy_install tendo
pip install tendo
- вручную, получив его с http://pypi.python.org/pypi/tendo
Простое кроссплатформенное решение, найденное в другом вопросе от zgoda:
import fcntl, sys
pid_file = 'program.pid'
fp = open(pid_file, 'w')
try:
fcntl.lockf(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
# another instance is running
sys.exit(0)
Очень похоже на предложение С. Лотта, но с кодом.
Этот код специфичен для Linux. Он использует "абстрактные" доменные сокеты UNIX, но он прост и не оставляет устаревших файлов блокировки. Я предпочитаю это решению выше, потому что это не требует специально зарезервированного порта TCP.
try:
import socket
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
## Create an abstract socket, by prefixing it with null.
s.bind( '\0postconnect_gateway_notify_lock')
except socket.error as e:
error_code = e.args[0]
error_string = e.args[1]
print "Process already running (%d:%s ). Exiting" % ( error_code, error_string)
sys.exit (0)
Уникальная строка postconnect_gateway_notify_lock
может быть изменен, чтобы разрешить нескольким программам, которые требуют одного экземпляра.
Я не знаю, достаточно ли он питоничен, но в мире Java прослушивание определенного порта является довольно широко используемым решением, поскольку оно работает на всех основных платформах и не имеет проблем с аварийными программами.
Другое преимущество прослушивания порта заключается в том, что вы можете отправить команду работающему экземпляру. Например, когда пользователи запускают программу во второй раз, вы можете отправить запущенному экземпляру команду, чтобы сказать ему, чтобы открыть другое окно (например, именно это делает Firefox. Я не знаю, используют ли они порты TCP или именованные каналы или что-то вроде этого, хотя).
Никогда прежде не писал python, но это то, что я только что реализовал в mycheckpoint, чтобы предотвратить его запуск дважды или более crond:
import os
import sys
import fcntl
fh=0
def run_once():
global fh
fh=open(os.path.realpath(__file__),'r')
try:
fcntl.flock(fh,fcntl.LOCK_EX|fcntl.LOCK_NB)
except:
os._exit(0)
run_once()
Обнаружил предложение Славы-N после публикации в другом выпуске (http://stackru.com/questions/2959474). Эта функция вызывается как функция, блокирует файл исполняемых скриптов (не файл pid) и поддерживает блокировку до тех пор, пока скрипт не завершится (обычный или ошибка).
Используйте файл pid. У вас есть какое-то известное местоположение, "/path/to/pidfile", и при запуске вы делаете что-то вроде этого (частично псевдокод, потому что я готовлю кофе и не хочу много работать):
import os, os.path
pidfilePath = """/path/to/pidfile"""
if os.path.exists(pidfilePath):
pidfile = open(pidfilePath,"r")
pidString = pidfile.read()
if <pidString is equal to os.getpid()>:
# something is real weird
Sys.exit(BADCODE)
else:
<use ps or pidof to see if the process with pid pidString is still running>
if <process with pid == 'pidString' is still running>:
Sys.exit(ALREADAYRUNNING)
else:
# the previous server must have crashed
<log server had crashed>
<reopen pidfilePath for writing>
pidfile.write(os.getpid())
else:
<open pidfilePath for writing>
pidfile.write(os.getpid())
Другими словами, вы проверяете, существует ли pid-файл; если нет, напишите свой pid в этот файл. Если pid-файл существует, проверьте, является ли pid pid запущенного процесса; если так, то у вас запущен еще один живой процесс, так что просто выключите его. Если нет, то предыдущий процесс потерпел крах, поэтому зарегистрируйте его, а затем запишите свой собственный pid в файл вместо старого. Тогда продолжай.
Лучшее решение для этого на окнах - это использовать мьютексы, как предложено @zgoda.
import win32event
import win32api
from winerror import ERROR_ALREADY_EXISTS
mutex = win32event.CreateMutex(None, False, 'name')
last_error = win32api.GetLastError()
if last_error == ERROR_ALREADY_EXISTS:
print("App instance already running")
Некоторые ответы используют fctnl
(также входит в пакет тендера @sorin), который недоступен в Windows, и если вы попытаетесь заморозить свое приложение на python, используя такой пакет, как pyinstaller
который делает статический импорт, он выдает ошибку.
Кроме того, используя метод блокировки файла, создает read-only
проблема с файлами базы данных (испытал это с sqlite3
).
Вот мое возможное решение только для Windows. Поместите следующее в модуль, возможно, под названием "onlyone.py", или как угодно. Включите этот модуль непосредственно в ваш файл сценария __ main __ python.
import win32event, win32api, winerror, time, sys, os
main_path = os.path.abspath(sys.modules['__main__'].__file__).replace("\\", "/")
first = True
while True:
mutex = win32event.CreateMutex(None, False, main_path + "_{<paste YOUR GUID HERE>}")
if win32api.GetLastError() == 0:
break
win32api.CloseHandle(mutex)
if first:
print "Another instance of %s running, please wait for completion" % main_path
first = False
time.sleep(1)
объяснение
Код пытается создать мьютекс с именем, полученным из полного пути к сценарию. Мы используем косую черту, чтобы избежать путаницы с реальной файловой системой.
преимущества
- Никаких настроек или "магических" идентификаторов не требуется, используйте их в любом количестве различных сценариев.
- Вокруг не осталось устаревших файлов, мьютекс умирает вместе с вами.
- Печатает полезное сообщение при ожидании
Вы уже нашли ответ на аналогичный вопрос в другой ветке, поэтому для полноты изложения посмотрите, как добиться того же в Windows без имени mutex.
Для тех, кто использует wxPython для своих приложений, вы можете использовать функцию wx.SingleInstanceChecker
задокументировано здесь.
Я лично использую подкласс wx.App
который использует wx.SingleInstanceChecker
и возвращается False
от OnInit()
если существует уже существующий экземпляр приложения, выполняющийся следующим образом:
import wx
class SingleApp(wx.App):
"""
class that extends wx.App and only permits a single running instance.
"""
def OnInit(self):
"""
wx.App init function that returns False if the app is already running.
"""
self.name = "SingleApp-%s".format(wx.GetUserId())
self.instance = wx.SingleInstanceChecker(self.name)
if self.instance.IsAnotherRunning():
wx.MessageBox(
"An instance of the application is already running",
"Error",
wx.OK | wx.ICON_WARNING
)
return False
return True
Это простая замена для wx.App
это запрещает множественные случаи. Чтобы использовать это просто заменить wx.App
с SingleApp
в вашем коде так:
app = SingleApp(redirect=False)
frame = wx.Frame(None, wx.ID_ANY, "Hello World")
frame.Show(True)
app.MainLoop()
Это может сработать.
Попытайтесь создать файл PID в известном месте. Если у вас не получится, у кого-то заблокирован файл, все готово.
Когда вы закончите нормально, закройте и удалите файл PID, чтобы кто-то другой мог перезаписать его.
Вы можете обернуть вашу программу в сценарий оболочки, который удаляет файл PID, даже если ваша программа падает.
Вы также можете использовать PID-файл, чтобы убить программу, если она зависнет.
Поздний ответ, но для окон вы можете использовать:
from win32event import CreateMutex
from win32api import CloseHandle, GetLastError
from winerror import ERROR_ALREADY_EXISTS
import sys
class singleinstance:
""" Limits application to single instance """
def __init__(self):
self.mutexname = "testmutex_{D0E858DF-985E-4907-B7FB-8D732C3FC3B9}"
self.mutex = CreateMutex(None, False, self.mutexname)
self.lasterror = GetLastError()
def alreadyrunning(self):
return (self.lasterror == ERROR_ALREADY_EXISTS)
def __del__(self):
if self.mutex:
CloseHandle(self.mutex)
Применение
# do this at beginnig of your application
myapp = singleinstance()
# check is another instance of same program running
if myapp.alreadyrunning():
print ("Another instance of this program is already running")
sys.exit(1)
Основываясь на ответе Роберто Росарио, я придумал следующую функцию:
SOCKET = None
def run_single_instance(uniq_name):
try:
import socket
global SOCKET
SOCKET = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
## Create an abstract socket, by prefixing it with null.
# this relies on a feature only in linux, when current process quits, the
# socket will be deleted.
SOCKET.bind('\0' + uniq_name)
return True
except socket.error as e:
return False
Нам нужно определить глобальный SOCKET
vaiable, так как он будет собираться только после завершения всего процесса. Если мы объявим в функции локальную переменную, она выйдет из области видимости после выхода из функции, таким образом сокет будет удален.
Вся заслуга принадлежит Роберто Росарио, поскольку я только уточняю и уточняю его кодекс. И этот код будет работать только в Linux, как поясняет следующий цитируемый текст из https://troydhanson.github.io/network/Unix_domain_sockets.html:
В Linux есть особенность: если путь к сокету домена UNIX начинается с нулевого байта \0, его имя не отображается в файловой системе. Таким образом, он не будет конфликтовать с другими именами в файловой системе. Кроме того, когда сервер закрывает свой прослушивающий сокет домена UNIX в абстрактном пространстве имен, его файл удаляется; с обычными сокетами домена UNIX файл сохраняется после его закрытия сервером.
Вот пример кроссплатформенности, который я тестировал на Windows Server 2016 и Ubuntu 20.04 с использованием Python 3.7.9:
import os
class SingleInstanceChecker:
def __init__(self, id):
if isWin():
ensure_win32api()
self.mutexname = id
self.lock = win32event.CreateMutex(None, False, self.mutexname)
self.running = (win32api.GetLastError() == winerror.ERROR_ALREADY_EXISTS)
else:
ensure_fcntl()
self.lock = open(f"/tmp/isnstance_{id}.lock", 'wb')
try:
fcntl.lockf(self.lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.running = False
except IOError:
self.running = True
def already_running(self):
return self.running
def __del__(self):
if self.lock:
try:
if isWin():
win32api.CloseHandle(self.lock)
else:
os.close(self.lock)
except Exception as ex:
pass
# ---------------------------------------
# Utility Functions
# Dynamically load win32api on demand
# Install with: pip install pywin32
win32api=winerror=win32event=None
def ensure_win32api():
global win32api,winerror,win32event
if win32api is None:
import win32api
import winerror
import win32event
# Dynamically load fcntl on demand
# Install with: pip install fcntl
fcntl=None
def ensure_fcntl():
global fcntl
if fcntl is None:
import fcntl
def isWin():
return (os.name == 'nt')
# ---------------------------------------
Вот он в употреблении:
import time, sys
def main(argv):
_timeout = 10
print("main() called. sleeping for %s seconds" % _timeout)
time.sleep(_timeout)
print("DONE")
if __name__ == '__main__':
SCR_NAME = "my_script"
sic = SingleInstanceChecker(SCR_NAME)
if sic.already_running():
print("An instance of {} is already running.".format(SCR_NAME))
sys.exit(1)
else:
main(sys.argv[1:])
Использование файла блокировки является довольно распространенным подходом в Unix. В случае сбоя необходимо выполнить очистку вручную. Вы можете сохранить PID в файле и при запуске проверить, есть ли процесс с этим PID, переопределив файл блокировки, если нет. (Тем не менее, вам также нужна блокировка вокруг файла read-file-check-pid-rewrite-file). В пакете os вы найдете то, что вам нужно для получения и проверки pid. Обычный способ проверить, существует ли процесс с данным pid, - отправить ему нефатальный сигнал.
Другие альтернативы могут быть объединены с семафорами стаи или посикс.
Открытие сетевого сокета, как предложил saua, вероятно, будет самым простым и переносимым.
Я использую single_process
на моем gentoo;
pip install single_process
пример:
from single_process import single_process
@single_process
def main():
print 1
if __name__ == "__main__":
main()
Я публикую это как ответ, потому что я новый пользователь, и переполнение стека не позволит мне голосовать.
Решение Сорина Сбарнеа работает для меня под OS X, Linux и Windows, и я благодарен за это.
Тем не менее, tempfile.gettempdir() ведет себя одинаково в OS X и Windows, а другой - в некоторых / многие / все (?) * Никсах (игнорируя тот факт, что OS X также Unix!). Разница важна для этого кода.
OS X и Windows имеют пользовательские временные каталоги, поэтому созданный одним пользователем временный файл не виден другому пользователю. В отличие от этого, во многих версиях *nix (я тестировал Ubuntu 9, RHEL 5, OpenSolaris 2008 и FreeBSD 8) временный каталог /tmp для всех пользователей.
Это означает, что когда файл блокировки создается на многопользовательском компьютере, он создается в /tmp, и только пользователь, который создает файл блокировки в первый раз, сможет запустить приложение.
Возможным решением является встраивание текущего имени пользователя в имя файла блокировки.
Стоит отметить, что решение OP по захвату порта также будет плохо работать на многопользовательской машине.
Я столкнулся с этой проблемой на прошлой неделе, и хотя я нашел несколько хороших решений, я решил сделать очень простой и чистый пакет Python и загрузил его в PyPI. Он отличается от тендо тем, что может блокировать любое имя строкового ресурса. Хотя вы, конечно, можете заблокировать __file__
добиться того же эффекта.
Установить с помощью: pip install quicklock
Используя это чрезвычайно просто:
[nate@Nates-MacBook-Pro-3 ~/live] python
Python 2.7.6 (default, Sep 9 2014, 15:04:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from quicklock import singleton
>>> # Let's create a lock so that only one instance of a script will run
...
>>> singleton('hello world')
>>>
>>> # Let's try to do that again, this should fail
...
>>> singleton('hello world')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/nate/live/gallery/env/lib/python2.7/site-packages/quicklock/quicklock.py", line 47, in singleton
raise RuntimeError('Resource <{}> is currently locked by <Process {}: "{}">'.format(resource, other_process.pid, other_process.name()))
RuntimeError: Resource <hello world> is currently locked by <Process 24801: "python">
>>>
>>> # But if we quit this process, we release the lock automatically
...
>>> ^D
[nate@Nates-MacBook-Pro-3 ~/live] python
Python 2.7.6 (default, Sep 9 2014, 15:04:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from quicklock import singleton
>>> singleton('hello world')
>>>
>>> # No exception was thrown, we own 'hello world'!
Посмотрите: https://pypi.python.org/pypi/quicklock
Пример Linux
Этот метод основан на создании временного файла, автоматически удаляемого после закрытия приложения. при запуске программы мы проверяем наличие файла; если файл существует (есть ожидающее выполнение), программа закрывается; в противном случае он создает файл и продолжает выполнение программы.
from tempfile import *
import time
import os
import sys
f = NamedTemporaryFile( prefix='lock01_', delete=True) if not [f for f in os.listdir('/tmp') if f.find('lock01_')!=-1] else sys.exit()
YOUR CODE COMES HERE
Я продолжаю подозревать, что должно быть хорошее решение POSIXy, использующее группы процессов, без необходимости обращаться к файловой системе, но я не могу его точно зафиксировать. Что-то вроде:
При запуске ваш процесс отправляет команду kill -0 всем процессам в определенной группе. Если такие процессы существуют, они завершаются. Затем он присоединяется к группе. Никакие другие процессы не используют эту группу.
Тем не менее, это имеет условие гонки - все процессы могут делать это в одно и то же время, и все они в конечном итоге присоединяются к группе и работают одновременно. К тому времени, когда вы добавили какой-нибудь мьютекс, чтобы сделать его водонепроницаемым, вам больше не нужны группы процессов.
Это может быть приемлемо, если ваш процесс запускается только cron, раз в минуту или каждый час, но я немного нервничаю, что произойдет сбой именно в тот день, когда вы этого не хотите.
Я думаю, что это не очень хорошее решение в конце концов, разве кто-то может улучшить его?
В системе Linux можно также спроситьpgrep -a
для количества экземпляров сценарий находится в списке процессов (опция -a показывает полную строку командной строки). Например
import os
import sys
import subprocess
procOut = subprocess.check_output( "/bin/pgrep -u $UID -a python", shell=True,
executable="/bin/bash", universal_newlines=True)
if procOut.count( os.path.basename(__file__)) > 1 :
sys.exit( ("found another instance of >{}<, quitting."
).format( os.path.basename(__file__)))
Удалить -u $UID
если ограничение должно распространяться на всех пользователей. Отказ от ответственности: а) предполагается, что (базовое) имя сценария является уникальным, б) могут быть условия гонки.
Вот хороший пример для django с contextmanager и memcached:https://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html
Может использоваться для защиты одновременной работы на разных хостах. Может использоваться для управления несколькими задачами. Также можно изменить для простых скриптов Python.
Моя модификация приведенного выше кода находится здесь:
import time
from contextlib import contextmanager
from django.core.cache import cache
@contextmanager
def memcache_lock(lock_key, lock_value, lock_expire):
timeout_at = time.monotonic() + lock_expire - 3
# cache.add fails if the key already exists
status = cache.add(lock_key, lock_value, lock_expire)
try:
yield status
finally:
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
if time.monotonic() < timeout_at and status:
# don't release the lock if we exceeded the timeout
# to lessen the chance of releasing an expired lock owned by someone else
# also don't release the lock if we didn't acquire it
cache.delete(lock_key)
LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
def main():
lock_name, lock_value = "lock_1", "locked"
with memcache_lock(lock_name, lock_value, LOCK_EXPIRE) as acquired:
if acquired:
# single instance code here:
pass
if __name__ == "__main__":
main()
Вот кросс-платформенная реализация, создающая временный файл блокировки с помощью диспетчера контекста.
Может использоваться для управления несколькими задачами.
import os
from contextlib import contextmanager
from time import sleep
class ExceptionTaskInProgress(Exception):
pass
# Context manager for suppressing exceptions
class SuppressException:
def __init__(self):
pass
def __enter__(self):
return self
def __exit__(self, *exc):
return True
# Context manager for task
class TaskSingleInstance:
def __init__(self, task_name, lock_path):
self.task_name = task_name
self.lock_path = lock_path
self.lock_filename = os.path.join(self.lock_path, self.task_name + ".lock")
if os.path.exists(self.lock_filename):
raise ExceptionTaskInProgress("Resource already in use")
def __enter__(self):
self.fl = open(self.lock_filename, "w")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.fl.close()
os.unlink(self.lock_filename)
# Here the task is silently interrupted
# if it is already running on another instance.
def main1():
task_name = "task1"
tmp_filename_path = "."
with SuppressException():
with TaskSingleInstance(task_name, tmp_filename_path):
print("The task `{}` has started.".format(task_name))
# The single task instance code is here.
sleep(5)
print("The task `{}` has completed.".format(task_name))
# Here the task is interrupted with a message
# if it is already running in another instance.
def main2():
task_name = "task1"
tmp_filename_path = "."
try:
with TaskSingleInstance(task_name, tmp_filename_path):
print("The task `{}` has started.".format(task_name))
# The single task instance code is here.
sleep(5)
print("Task `{}` completed.".format(task_name))
except ExceptionTaskInProgress as ex:
print("The task `{}` is already running.".format(task_name))
if __name__ == "__main__":
main1()
main2()
Создайте файл с именем lockexclusive.py.
import os
import sys
import atexit
import hashlib
@atexit.register # clean up at exit
def cleanup():
try:
if config.lock_file:
config.lock_file.close()
if config.fname:
os.remove(config.fname)
except Exception:
pass
config = sys.modules[__name__] # this allows us to share variables with the main script
config.file = None
config.fname = None
config.lock_file = None
config.maxinstances = 1
def configure_lock(
maxinstances: int = 1,
message: str | None = None,
file: str | None = None,
) -> None:
"""
Configures a lock file for a given file path and maximum number of instances.
Args:
maxinstances (int, optional): The maximum number of instances allowed to access the file. Defaults to 1.
message (str, optional): The message to print if the maximum number of instances is reached. Defaults to None.
file (str, optional): The file path to configure the lock file for. Defaults to None.
Returns:
None
Raises:
None
"""
if not file: # if not file is passed, we get the calling filename from the frame
f = sys._getframe(1)
dct = f.f_globals
file = dct.get("__file__", "")
config.file = os.path.normpath(file)
config.maxinstances = int(maxinstances)
for inst in range(config.maxinstances):
try:
hash = hashlib.sha256((config.file + f"{inst}").encode("utf-8", "ignore")) # unique name to make sure other that it doesn't interfere with other py files using this function
config.fname = hash.digest().hex() + ".locfi"
tmpf = os.path.join(os.environ.get("TMP"), config.fname)
if os.path.exists(tmpf):
os.remove(tmpf)
config.lock_file = os.open(tmpf, os.O_CREAT | os.O_EXCL)
break
except Exception as fe:
if inst + 1 == config.maxinstances:
if message:
print(message)
try:
sys.exit(1)
finally:
os._exit(1) # just to make sure :)
else:
continue
Импортируйте его в свой скрипт:
import sys
from time import sleep
from lockexclusive import configure_lock
# it can be used like this:
# configure_lock(maxinstances=1, message="More than one instance running",file=sys.argv[0])
# or without the file argument:
configure_lock(maxinstances=1, message="More than one instance running")
sleep(100)
import sys,os
# start program
try: # (1)
os.unlink('lock') # (2)
fd=os.open("lock", os.O_CREAT|os.O_EXCL) # (3)
except:
try: fd=os.open("lock", os.O_CREAT|os.O_EXCL) # (4)
except:
print "Another Program running !.." # (5)
sys.exit()
# your program ...
# ...
# exit program
try: os.close(fd) # (6)
except: pass
try: os.unlink('lock')
except: pass
sys.exit()