Доступ к пулу соединений MySQL из многопроцессорной обработки Python

Я пытаюсь настроить пул соединений MySQL, и мои рабочие процессы получают доступ к уже установленному пулу вместо того, чтобы каждый раз устанавливать новое соединение.

Я в замешательстве, если я должен передать курсор базы данных для каждого процесса, или если есть какой-то другой способ сделать это? Разве MySql.connector не должен делать пул автоматически? Когда я проверяю свои файлы журнала, многие, многие соединения открываются и закрываются... по одному для каждого процесса.

Мой код выглядит примерно так:

PATH = "/tmp"

class DB(object):
  def __init__(self):
    connected = False
    while not connected:
      try:
        cnxpool = mysql.connector.pooling.MySQLConnectionPool(pool_name = "pool1",
                                                          **config.dbconfig)
        self.__cnx = cnxpool.get_connection()
      except mysql.connector.errors.PoolError:
        print("Sleeping.. (Pool Error)")
        sleep(5)
      except mysql.connector.errors.DatabaseError:
        print("Sleeping.. (Database Error)")
        sleep(5)

    self.__cur = self.__cnx.cursor(cursor_class=MySQLCursorDict)

  def execute(self, query):
    return self.__cur.execute(query)

def isValidFile(self, name):
  return True

def readfile(self, fname):
  d = DB()
  d.execute("""INSERT INTO users (first_name) VALUES ('michael')""")

def main():
  queue = multiprocessing.Queue()
  pool = multiprocessing.Pool(None, init, [queue])
  for dirpath, dirnames, filenames in os.walk(PATH):

    full_path_fnames = map(lambda fn: os.path.join(dirpath, fn),
                           filenames)
    full_path_fnames = filter(is_valid_file, full_path_fnames)
    pool.map(readFile, full_path_fnames)

if __name__ == '__main__':
  sys.exit(main())

4 ответа

Во-первых, вы создаете отдельный пул соединений для каждого экземпляра вашего DB учебный класс. Пулы с одинаковыми именами не делают их одинаковыми

Из документации:

Не является ошибкой для нескольких пулов иметь одинаковое имя. Приложение, которое должно различать пулы по их pool_name свойство должно создавать каждый пул с отдельным именем.

Кроме того, совместное использование соединения с базой данных (или пула соединений) между различными процессами было бы плохой идеей (и я очень сомневаюсь, что оно даже будет работать правильно), поэтому каждый процесс, использующий свои собственные соединения, на самом деле является тем, к чему вы должны стремиться.

Вы можете просто инициализировать пул в вашем init инициализатор в качестве глобальной переменной и использовать его вместо этого.
Очень простой пример:

from multiprocessing import Pool
from mysql.connector.pooling import MySQLConnectionPool
from mysql.connector import connect
import os

pool = None

def init():
    global pool
    print("PID %d: initializing pool..." % os.getpid())
    pool = MySQLConnectionPool(...)

def do_work(q):
    con = pool.get_connection()
    print("PID %d: using connection %s" % (os.getpid(), con))
    c = con.cursor()
    c.execute(q)
    res = c.fetchall()
    con.close()
    return res

def main():
    p = Pool(initializer=init)
    for res in p.map(do_work, ['select * from test']*8):
        print(res)
    p.close()
    p.join()

if __name__ == '__main__':
    main()

Или просто используйте простое соединение вместо пула соединений, так как в любом случае одновременно будет активным только одно соединение в каждом процессе.
Количество одновременно используемых соединений неявно ограничено размером multiprocessing.Pool,

#!/usr/bin/python
# -*- coding: utf-8 -*-
import time
import mysql.connector.pooling


dbconfig = {
    "host":"127.0.0.1",
    "port":"3306",
    "user":"root",
    "password":"123456",
    "database":"test",
}


class MySQLPool(object):
    """
    create a pool when connect mysql, which will decrease the time spent in 
    request connection, create connection and close connection.
    """
    def __init__(self, host="172.0.0.1", port="3306", user="root",
                 password="123456", database="test", pool_name="mypool",
                 pool_size=3):
        res = {}
        self._host = host
        self._port = port
        self._user = user
        self._password = password
        self._database = database

        res["host"] = self._host
        res["port"] = self._port
        res["user"] = self._user
        res["password"] = self._password
        res["database"] = self._database
        self.dbconfig = res
        self.pool = self.create_pool(pool_name=pool_name, pool_size=pool_size)

    def create_pool(self, pool_name="mypool", pool_size=3):
        """
        Create a connection pool, after created, the request of connecting 
        MySQL could get a connection from this pool instead of request to 
        create a connection.
        :param pool_name: the name of pool, default is "mypool"
        :param pool_size: the size of pool, default is 3
        :return: connection pool
        """
        pool = mysql.connector.pooling.MySQLConnectionPool(
            pool_name=pool_name,
            pool_size=pool_size,
            pool_reset_session=True,
            **self.dbconfig)
        return pool

    def close(self, conn, cursor):
        """
        A method used to close connection of mysql.
        :param conn: 
        :param cursor: 
        :return: 
        """
        cursor.close()
        conn.close()

    def execute(self, sql, args=None, commit=False):
        """
        Execute a sql, it could be with args and with out args. The usage is 
        similar with execute() function in module pymysql.
        :param sql: sql clause
        :param args: args need by sql clause
        :param commit: whether to commit
        :return: if commit, return None, else, return result
        """
        # get connection form connection pool instead of create one.
        conn = self.pool.get_connection()
        cursor = conn.cursor()
        if args:
            cursor.execute(sql, args)
        else:
            cursor.execute(sql)
        if commit is True:
            conn.commit()
            self.close(conn, cursor)
            return None
        else:
            res = cursor.fetchall()
            self.close(conn, cursor)
            return res

    def executemany(self, sql, args, commit=False):
        """
        Execute with many args. Similar with executemany() function in pymysql.
        args should be a sequence.
        :param sql: sql clause
        :param args: args
        :param commit: commit or not.
        :return: if commit, return None, else, return result
        """
        # get connection form connection pool instead of create one.
        conn = self.pool.get_connection()
        cursor = conn.cursor()
        cursor.executemany(sql, args)
        if commit is True:
            conn.commit()
            self.close(conn, cursor)
            return None
        else:
            res = cursor.fetchall()
            self.close(conn, cursor)
            return res


if __name__ == "__main__":
    mysql_pool = MySQLPool(**dbconfig)
    sql = "select * from store WHERE create_time < '2017-06-02'"
    p = Pool()
    for i in range(5):
        p.apply_async(mysql_pool.execute, args=(sql,))

Приведенный выше код создает пул соединений в начале и получает от него соединения в execute()После того, как пул соединений был создан, работа заключается в том, чтобы сохранить его, поскольку пул создается только один раз, и это сэкономит время для запроса соединения каждый раз, когда вы хотите подключиться к MySQL. Надеюсь, поможет!

Могут возникнуть проблемы с синхронизацией, если вы собираетесь повторно использовать экземпляры, поддерживаемые пулом, но просто MySQLConnectionPoolэкземпляр между рабочими процессами и использование соединений, полученных путем вызова метода get_connection()было бы хорошо, потому что для каждого будет создан отдельный сокет MySQLConnectionпример.

      import multiprocessing
from mysql.connector import pooling

def f(cnxpool: pooling.MySQLConnectionPool) -> None:
    # Dedicate connection instance for each worker process.
    cnx = cnxpool.get_connection()
    ...

if __name__ == '__main__':
    cnxpool = pooling.MySQLConnectionPool(
        pool_name='pool',
        pool_size=2,
    )
    p0 = multiprocessing.Process(target=f, args=(cnxpool,))
    p1 = multiprocessing.Process(target=f, args=(cnxpool,))
    p0.start()
    p1.start()

Вы создали несколько экземпляров объекта БД. В mysql.connector.pooling.py pool_name является только атрибутом, позволяющим вам определить, какой это пул. В пуле mysql нет сопоставлений.

Итак, вы создаете несколько экземпляров БД в def readfile()тогда у вас будет несколько пулов соединений.

Синглтон полезен в этом случае.

(Я потратил несколько часов, чтобы выяснить это. В среде Tornado каждый http get создает новый обработчик, который приводит к созданию нового соединения.)

Другие вопросы по тегам