Как объединить результаты нескольких процессов, когда я использую telegraf и statsd?

У меня есть код, который запрашивает и запрашивает в определенной очереди. Теперь мне нужно, чтобы оба процесса выполняли определенные работы соответственно. то, что я хочу сделать, - это то, что у меня будет еще один процесс, который также будет выполнять запрос одновременно. Я хочу получить среднее число сообщений, которые были получены в течение определенного времени, и отправить его в influenxdb. Как мне этого добиться? это как-то связано со statsd или telegraf? где?

statsd_client = statsd.StatsClient(host="localhost", port=8125, 
prefix=None, maxudpsize=512, ipv6=False)


q = multiprocessing.Queue()


def queue_add_proc1():
    print("process 1 Id :", os.getpid())
    print("adding items to queue")
    x = 0
    upload_time = time.time()

    enque_count=0

    while x < 10000:

        curr_time = time.time()
        if curr_time - upload_time > 60:
            statsd_client.incr('enque_count_everyMinute', enque_count)
            statsd_client.incr('queue_size_enqueing', q.qsize())
            print("metric sent")
            enque_count = 0
            upload_time = curr_time
        q.put(x*2)

        print("added to queue")
        x =x+ 1
        enque_count+=1
        time.sleep(0.014)



def queue_pop_proc2():
    print("Process 2 ID :",os.getpid())
    print("popping values from queue")
    upload_time = time.time()
    deque_count = 0
    while not q.empty():
        curr_time = time.time()
        if curr_time - upload_time > 60:
            # upload dequed count
            statsd_client.incr('deque_count_everyMinute', deque_count)
            size1 = q.qsize()
            print("metric sent")
            deque_count = 0
            upload_time = curr_time
        print("dequeued")
        deque_count += 1
        time.sleep(0.03)

И я запускаю эти процессы в основном как обычно. Я попытался измениться, но это не имело смысла, это дает мне среднее значение каждого ряда.

[[aggregators.basicstats]]
## The period on which to flush & clear the aggregator.
period = "5s"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false

## Configures which basic stats to push as fields
stats = ["mean"]

0 ответов

Другие вопросы по тегам