Python - передать переменную функции (обратного вызова) между функциями, выполняющимися в отдельных потоках

Я пытаюсь разработать скрипт Python 3.6, который использует модули pika и threading.

У меня есть проблема, которая, как мне кажется, вызвана тем, что я А) очень плохо знаком с Python и программированием в целом, и Б) не понимаю, как передавать переменные между функциями, когда они запускаются в отдельных потоках и уже передается параметр в скобках. в конце имени получающей функции.

Причина, по которой я так думаю, заключается в том, что, когда я не использую многопоточность, я могу передать переменную между функциями, просто вызвав имя принимающей функции и предоставив переменную для передачи, в скобках ниже приведен базовый пример:

def send_variable():
    body = "this is a text string"
    receive_variable(body)

def receive_variable(body):
    print(body)

Это при запуске печатает:

this is a text string

Ниже приведена рабочая версия кода, необходимого для работы с потоками - здесь используются прямые функции (без потоков), и я использую pika для получения сообщений из очереди (RabbitMQ) через функцию обратного вызова pika, затем я передаю тело сообщения, полученного в функции "обратного вызова" к "функции обработки":

import pika
...mq connection variables set here...


# defines username and password credentials as variables set at the top of this script
    credentials = pika.PlainCredentials(mq_user_name, mq_pass_word)

# defines mq server host, port and user credentials and creates a connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, port=mq_port, credentials=credentials))

# creates a channel connection instance using the above settings
    channel = connection.channel()

# defines the queue name to be used with the above channel connection instance
    channel.queue_declare(queue=mq_queue)


def callback(ch, method, properties, body):

# passes (body) to processing function
    body_processing(body)

# sets channel consume type, also sets queue name/message acknowledge settings based on variables set at top of script
    channel.basic_consume(callback, queue=mq_queue, no_ack=mq_no_ack)
# tells the callback function to start consuming
    channel.start_consuming()
# calls the callback function to start receiving messages from mq server
    callback()
# above deals with pika connection and the main callback function


def body_processing(body):
    ...code to send a pika message every time a 'body' message is received...

Это прекрасно работает, однако я хочу перевести это для запуска в сценарии, который использует многопоточность. Когда я делаю это, я должен предоставить параметр 'channel' для имени функции, которая выполняется в своем собственном потоке - когда я затем пытаюсь включить параметр 'body', чтобы 'processing_function' выглядела так, как показано ниже:

def processing_function(channel, body):

Я получаю сообщение об ошибке:

[function_name] is missing 1 positional argument: 'body'

Я знаю, что при использовании многопоточности требуется больше кода, и я включил ниже фактический код, который я использую для многопоточности, чтобы вы могли видеть, что я делаю:

...imports and mq variables and pika connection details are set here...

def get_heartbeats(channel):
channel.queue_declare(queue=queue1)
#print (' [*] Waiting for messages. To exit press CTRL+C')

    def callback(ch, method, properties, body):

        process_body(body)

        #print (" Received %s" % (body))

    channel.basic_consume(callback, queue=queue1, no_ack=no_ack)
    channel.start_consuming()


def process_body(channel, body):
    channel.queue_declare(queue=queue2)
    #print (' [*] Waiting for Tick messages. To exit press CTRL+C')

# sets the mq host which pika client will use to send a message to
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host))
# create a channel connection instance
    channel = connection.channel()
# declare a queue to be used by the channel connection instance
    channel.queue_declare(queue=order_send_queue)
# send a message via the above channel connection settings
    channel.basic_publish(exchange='', routing_key=send_queue, body='Test Message')
# send a message via the above channel settings
# close the channel connection instance
    connection.close()


def manager():

# Channel 1 Connection Details - =======================================================================================

    credentials = pika.PlainCredentials(mq_user_name, mq_password)
    connection1 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials))
    channel1 = connection1.channel()

# Channel 1 thread =====================================================================================================
    t1 = threading.Thread(target=get_heartbeats, args=(channel1,))
    t1.daemon = True
    threads.append(t1)
    # as this is thread 1 call to start threading is made at start threading section

# Channel 2 Connection Details - =======================================================================================

    credentials = pika.PlainCredentials(mq_user_name, mq_password)
    connection2 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials))
    channel2 = connection2.channel()

# Channel 2 thread ====================================================================================================
    t2 = threading.Thread(target=process_body, args=(channel2, body))
    t2.daemon = True
    threads.append(t2)
    t2.start()  # as this is thread 2 - we need to start the thread here

# Start threading
t1.start()  # start the first thread - other threads will self start as they call t1.start() in their code block
for t in threads: # for all the threads defined
    t.join()  # join defined threads

manager()  # run the manager module which starts threads that call each module

Это при запуске выдает ошибку

process_body() missing 1 required positional argument: (body)

и я не понимаю, почему это или как это исправить.

Спасибо, что нашли время, чтобы прочитать этот вопрос, и любая помощь или совет, который вы можете предоставить, очень ценится.

Пожалуйста, имейте в виду, что я новичок в Python и кодировании, поэтому, возможно, мне понадобится что-то объяснить, а не понимать более загадочные ответы.

Спасибо!

1 ответ

Решение

При дальнейшем рассмотрении этого и игре с кодом кажется, что если я отредактирую строки:

def process_body(channel, body):

читать

def process_body(body):

а также

t2 = threading.Thread(target=process_body, args=(channel2, body))

так что это гласит:

t2 = threading.Thread(target=process_body)

тогда код, кажется, работает как нужно - я также вижу несколько процессов сценария в htop, поэтому кажется, что многопоточность работает - я оставил обработку сценария на 24 часа + и не получил никаких ошибок...

Другие вопросы по тегам