Как объединить результаты нескольких процессов, когда я использую telegraf и statsd?
У меня есть код, который запрашивает и запрашивает в определенной очереди. Теперь мне нужно, чтобы оба процесса выполняли определенные работы соответственно. то, что я хочу сделать, - это то, что у меня будет еще один процесс, который также будет выполнять запрос одновременно. Я хочу получить среднее число сообщений, которые были получены в течение определенного времени, и отправить его в influenxdb. Как мне этого добиться? это как-то связано со statsd или telegraf? где?
statsd_client = statsd.StatsClient(host="localhost", port=8125,
prefix=None, maxudpsize=512, ipv6=False)
q = multiprocessing.Queue()
def queue_add_proc1():
print("process 1 Id :", os.getpid())
print("adding items to queue")
x = 0
upload_time = time.time()
enque_count=0
while x < 10000:
curr_time = time.time()
if curr_time - upload_time > 60:
statsd_client.incr('enque_count_everyMinute', enque_count)
statsd_client.incr('queue_size_enqueing', q.qsize())
print("metric sent")
enque_count = 0
upload_time = curr_time
q.put(x*2)
print("added to queue")
x =x+ 1
enque_count+=1
time.sleep(0.014)
def queue_pop_proc2():
print("Process 2 ID :",os.getpid())
print("popping values from queue")
upload_time = time.time()
deque_count = 0
while not q.empty():
curr_time = time.time()
if curr_time - upload_time > 60:
# upload dequed count
statsd_client.incr('deque_count_everyMinute', deque_count)
size1 = q.qsize()
print("metric sent")
deque_count = 0
upload_time = curr_time
print("dequeued")
deque_count += 1
time.sleep(0.03)
И я запускаю эти процессы в основном как обычно. Я попытался измениться, но это не имело смысла, это дает мне среднее значение каждого ряда.
[[aggregators.basicstats]]
## The period on which to flush & clear the aggregator.
period = "5s"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
## Configures which basic stats to push as fields
stats = ["mean"]