Revision a60088f6 py_logs_analizer.py

View differences:

py_logs_analizer.py
1 1
#!/usr/bin/python
2
from __future__ import division
2 3
import getopt,os,sys
3 4
import numpy as nm
4 5
from pandas import *
......
10 11
import process_manager as pmgr
11 12
from experiment import *
12 13
from peerstreamer_logs import *
14
from utilities import *
15

  
16

  
17
START_ANALYSIS_TIME = 60 #150
18
STOP_ANALYSIS_TIME = 120 #450
13 19

  
14 20
def rescale_log_time(logdata,offsetdata,server_addr):
21
	if (offsetdata is None) or 'addr' not in offsetdata.columns:
22
		return logdata
15 23
	if (len(set(offsetdata['addr']))) > 1:
16 24
		offsetdata = offsetdata[offsetdata['addr'] == server_addr]
17 25
	if len(logdata)>0:
......
27 35

  
28 36
			logdata['logTime'] = logdata['logTime'].apply(func)
29 37
		else:
30
			print ("[WARNING] time not rescaled for sliver ")
38
			warning(" time not rescaled for sliver ")
31 39

  
32 40
	return logdata
33 41

  
34 42

  
35 43
def _load_peer(folder, logfile, ref_addr):
	"""Build a Peer from one ``*.log.csv`` file in *folder*.

	Reads the peer's log with read_csv, tries to load the matching
	timespread (clock offset) file, and rescales the log timestamps
	against *ref_addr* via rescale_log_time.  A missing offset file is
	tolerated: rescale_log_time is given None and leaves times as-is.
	"""
	peer_data = read_csv(folder + "/" + logfile)
	try:
		offset_data = read_csv(timespread_filename(folder, hostname_from_filename(logfile)))
	except IOError:
		# no timespread file for this host: proceed without rescaling data
		offset_data = None
	return Peer(hostname_from_filename(logfile), is_source_from_filename(logfile),
			rescale_log_time(peer_data, offset_data, ref_addr))


def dataPopulate(folder, exp):
	"""Populate *exp* with sessions and peers parsed from *folder*.

	Three passes:
	  1. source log files create their sessions (creation=True);
	  2. remaining log files are attached as peers to existing sessions;
	  3. sessions are pruned when any peer lost more than 30% of its
	     ICMP echoes inside the analysis window.
	"""
	# create the sessions with the sources
	for logfile in os.listdir(folder):
		if logfile.endswith(".log.csv") and is_source_from_filename(logfile):
			print("Loading source file: " + logfile)
			session = exp.getSession(time_from_filename(logfile), creation=True)
			# the source rescales against its own host offsets
			session.addPeer(_load_peer(folder, logfile, hostname_from_filename(logfile)))

	# add the peers
	for logfile in os.listdir(folder):
		if logfile.endswith(".log.csv") and not is_source_from_filename(logfile):
			print("Loading generic peer file: " + logfile)
			session = exp.getSession(time_from_filename(logfile))
			if session is not None:
				session.addPeer(_load_peer(folder, logfile, session.getSource()))

	# prune deviating sessions
	# NOTE: iterate over a snapshot because removeSessionByTime mutates
	# exp.sessions; the original code modified the list while looping on it.
	for session in list(exp.sessions):
		for peer in session.peers:
			rtt_file = rtt_filename(folder, peer)
			if rtt_file is None:
				warning("peer " + peer.hostname + " has not ICMP file")
				continue
			ping_data = read_csv(rtt_file)
			ping_data = ping_data[(ping_data['unixtime'] >= (session.time + START_ANALYSIS_TIME)) &
					(ping_data['unixtime'] <= (session.time + STOP_ANALYSIS_TIME))]
			ping_sent = ping_data['sent'].sum()
			echo_received = ping_data['answers'].sum()
			if ping_sent == 0:
				warning("seems that peer " + peer.hostname + " has no ICMP data")
			elif (echo_received / ping_sent) < 0.7:
				# more than 30% echo loss: discard the whole session
				warning("REMOVED SESSION " + str(session.time))
				warning(" because of " + rtt_file)
				exp.removeSessionByTime(session.time)
				break  # session removed; its remaining peers need no check
96

  
97
def ICMPLossAnalyse(folder, exp, begin_sec, end_sec):
    """Write per-peer ICMP echo counters to ``<folder>/ICMPloss.exp``.

    For every peer of every session, sums the 'sent' and 'answers'
    columns of its RTT csv restricted to the window
    [session.time + begin_sec, session.time + end_sec] and emits one
    ``session,peer,echo_request,echo_response`` csv row.
    Peers without an RTT file are silently skipped.
    """
    # 'with' guarantees the output file is closed even if read_csv raises
    # (the original leaked the handle on any exception before close()).
    with open(folder + "/ICMPloss.exp", "w") as out_file:
        out_file.write("session,peer,echo_request,echo_response\n")
        for session in exp.sessions:
            for peer in session.peers:
                rtt_file = rtt_filename(folder, peer)
                if rtt_file is None:
                    continue
                ping_data = read_csv(rtt_file)
                window = ping_data[(ping_data['unixtime'] >= (session.time + begin_sec)) &
                                   (ping_data['unixtime'] <= (session.time + end_sec))]
                out_file.write(str(session.time) + "," +
                               str(purify_hostname(peer.hostname)) + "," +
                               str(window['sent'].sum()) + "," +
                               str(window['answers'].sum()) + "\n")
111

  
112
def rtt_filename(folder, peer):
	"""Return the path of *peer*'s RTT csv inside *folder*.

	Matches the first directory entry named ``<hostname>_rtt_*.csv``
	(in os.listdir order); returns None when no such file exists.
	"""
	prefix = peer.hostname + "_rtt_"
	candidates = (entry for entry in os.listdir(folder)
			if entry.startswith(prefix) and entry.endswith(".csv"))
	return next((folder + '/' + entry for entry in candidates), None)
116

  
51 117
def purify(delim, line):
	"""Return *delim* plus the text after the last *delim* in *line*.

	Any earlier delim-separated fragments are treated as incomplete
	records: a warning with their count is emitted and they are dropped.
	"""
	extra = line.count(delim) - 1
	if extra > 0:
		warning(str(extra) + " incomplete records.")
	_, _, tail = line.rpartition(delim)
	return delim + tail
55 121

  
56 122
def preproc_csv_file(csvfile):
......
132 198
				received=records[records['action'] == 'RECEIVED']
133 199
				minimum=(received['logTime']-received['chunkTimestamp']).min()
134 200
				if (minimum < -5000): 
135
					print "[WARNING] Timestamps seems to be not consistent!" # -5msec
201
					warning(" Timestamps seems to be not consistent!") # -5msec
136 202
					print "          Last minimum delay of sliver "+peer.hostname+" was: "+str(minimum/1000000)+" seconds"
137 203
				mean=(received['logTime']-received['chunkTimestamp']).mean()
138 204
				out_file.write("DELAY,"+str(session.time)+","+purify_hostname(peer.hostname)+","+str(mean)+"\n")
......
140 206
	out_file.close()
141 207

  
142 208
def sessionLossAnalyse(folder,session,begin_sec,end_sec):
143
	delta = (end_sec-begin_sec)/100 # time resolution in seconds
209
	delta = int((end_sec-begin_sec)/100) # time resolution in seconds
210
        if delta == 0:
211
            delta = 1
144 212
	if (session.getSource() != None):
145 213
		out_file = open(folder+"/"+str(session.time)+"_session_loss.exp","w")
146 214
		out_file.write("info_type,session_id,time,chunks,peer_hostname,losts\n")
......
203 271
		if "_rtt_" in logfile and logfile.endswith(".csv") and os.stat(folder+"/"+logfile).st_size > 0: 
204 272
			print "RTT analysing: "+logfile
205 273
			data = read_csv(folder + "/" + logfile)
206
			data = data[data['rtt'] != -1]
274
			data = data[data['rtt'] != -1].reindex(range(0,len(data)))
207 275
			if len(data)>0:
208
				sliver = purify_hostname(data['hostname'][0])
276
				sliver = purify_hostname(hostname_from_filename(logfile))#data['hostname'][0])
209 277
				avgs = data.groupby("addr").mean()
210 278
				maxs = data.groupby("addr").max()
211 279

  
......
227 295
def hopsAnalyse(folder,exp,begin_sec,end_sec):
228 296
	out_file = open(folder+"/packets_hops.exp","w")
229 297
	out_file.write("info_type,session_id,time,chunks,peer_hostname,hops_avg\n")
230
	delta = (end_sec-begin_sec)/100 # time resolution in seconds
298
	delta = int((end_sec-begin_sec)/100) # time resolution in seconds
299
        if delta == 0:
300
            delta = 1
231 301
	for session in exp.sessions:
232 302
		if (session.getSource() != None):
233 303
			for peer in session.peers:
......
262 332
					out_file.write("LOSS,"+str(session.time)+","+str(len(source_ids))+\
263 333
							","+purify_hostname(peer.hostname)+","+str(len(source_ids.difference(peer_ids)))+"\n")
264 334
		else:
265
			print "[WARNING] source not found"
335
			warning("source not found for session " + str(session.time))
266 336

  
267 337
	out_file.close()
268 338

  
......
281 351
	out_file.close()
282 352
	
283 353

  
284
def main(argv):
354
def analyze_main(argv):
285 355
	try:
286 356
		opts,args = getopt.getopt(argv,"hf:",["help","folder"])
287 357
	except getopt.GetoptError:
......
301 371
	pm = pmgr.ProcessManager()
302 372
	preproc_chunklog(folder)
303 373
	preproc_csv(folder,procman=pm)
304
	preproc_csv(folder)
305 374
	pm.joinAll()
306
#
375

  
307 376
	dataPopulate(folder,exp)
308 377
	pm.launchProcess(rttAnalyse,[folder,exp])
309
	pm.launchProcess(lossAnalyse,[folder,exp,150,450])
310
	pm.launchProcess(delayAnalyse,[folder,exp,150,450])
311
	pm.launchProcess(receptionAnalyse,[folder,exp,150,450])
312
	pm.launchProcess(hopsAnalyse,[folder,exp,150,450])
378
	pm.launchProcess(lossAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
379
	pm.launchProcess(ICMPLossAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
380
	pm.launchProcess(delayAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
381
	pm.launchProcess(receptionAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
382
	pm.launchProcess(hopsAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
313 383
	pm.joinAll()
314 384
	dumpHostnameMap(folder,exp)
315 385

  
......
317 387
		print "Session: "+str(session.time)+ " has "+str(len(session.peers))+" peers"
318 388

  
319 389
if __name__ == "__main__":
	# Script entry point: forward the CLI arguments (minus program name).
	analyze_main(sys.argv[1:])

Also available in: Unified diff