def main():
    '''Consume messages from CONSUMER and print a summary of each one.

    Every message value is decoded as UTF-8 JSON and handled by topic:

    * 'platform-index': each list under data['body'] is wrapped into
      PlatformIndex objects; one header line is printed per body key,
      followed by the ``name`` of every item.
    * 'business-index': each list under data['body'] is wrapped into
      BusinessIndex objects; one header line is printed per body key,
      followed by the ``service_name`` of every item.
    * any other topic: treated as 'trace'; the whole payload is wrapped
      into a single Trace object and its ``pid`` is printed.

    Raises:
        AssertionError: if AVAILABLE_TOPICS is not a subset of the topics
            reported by CONSUMER.topics().
    '''
    # Check authorities. An explicit raise is used rather than an `assert`
    # statement so that the check still runs under `python -O`; the
    # exception type and message are unchanged for callers.
    if not (AVAILABLE_TOPICS <= CONSUMER.topics()):
        raise AssertionError('Please contact admin')

    # NOTE(review): presumably a sample submission issued once before the
    # consume loop starts -- confirm against the definition of submit().
    submit([['docker_003', 'container_cpu_used']])

    # `i` is the 1-based count of messages consumed so far.
    for i, message in enumerate(CONSUMER, 1):
        data = json.loads(message.value.decode('utf8'))

        if message.topic == 'platform-index':
            # data['body'].keys() is supposed to be
            # ['os_linux', 'db_oracle_11g', 'mw_redis', 'mw_activemq',
            #  'dcos_container', 'dcos_docker']
            # Wrap everything first so that a malformed item fails before
            # anything is printed for this message.
            parsed = {
                'timestamp': data['timestamp'],
                'body': {
                    stack: [PlatformIndex(item) for item in items]
                    for stack, items in data['body'].items()
                },
            }
            timestamp = parsed['timestamp']
            for stack, indices in parsed['body'].items():
                print(i, message.topic, timestamp, stack)
                # Each entry is a PlatformIndex instance.
                for index in indices:
                    print(index.name)

        elif message.topic == 'business-index':
            # data['body'].keys() is supposed to be ['esb', ]
            parsed = {
                'startTime': data['startTime'],
                'body': {
                    key: [BusinessIndex(item) for item in items]
                    for key, items in data['body'].items()
                },
            }
            timestamp = parsed['startTime']
            for key, indices in parsed['body'].items():
                print(i, message.topic, timestamp, key)
                # Each entry is a BusinessIndex instance.
                for index in indices:
                    print(index.service_name)

        else:  # message.topic == 'trace'
            # A trace message carries exactly one record: the whole payload.
            parsed = {
                'startTime': data['startTime'],
                'body': Trace(data),
            }
            timestamp = parsed['startTime']
            print(i, message.topic, timestamp, parsed['body'].pid)