Statistics
| Branch: | Revision:

## peerstreamer-logs-analyzer / py_logs_analizer.py @ 1a481d7b

#!/usr/bin/python
"""PeerStreamer log analyser.

Reads per-peer chunk logs (``*.log`` -> ``*.log.csv``) and RTT / clock-offset
CSVs from an experiment folder, rebuilds the experiment sessions, and writes a
set of ``*.exp`` CSV summaries (delay, loss, hops, reception edges, RTTs,
hostname map) into the same folder.

Usage: py_logs_analizer.py -f <experiment_folder>
"""
import getopt, os, sys
import numpy as nm
from pandas import *
from random import randrange
import scipy.interpolate
import shutil as sh

sys.path.insert(0, 'lib')
import process_manager as pmgr
from experiment import *
from peerstreamer_logs import *


def rescale_log_time(logdata, offsetdata, server_addr):
    """Rescale the 'logTime' column of logdata to the server clock.

    offsetdata rows carry (unixtime, addr, offset) clock-offset samples;
    only the samples measured against server_addr are used.  A piecewise
    linear interpolation maps every log timestamp onto corrected time.
    Returns logdata (modified in place).
    """
    # Keep only the offsets measured against the source/server.
    if len(set(offsetdata['addr'])) > 1:
        offsetdata = offsetdata[offsetdata['addr'] == server_addr]
    if len(logdata) > 0:
        offsetdata = offsetdata.set_index('unixtime')
        offsetdata = offsetdata[offsetdata['offset'] != -1]  # -1 marks a failed measurement
        future = pow(10, 20)  # sentinel far beyond any real timestamp
        if len(offsetdata) > 0:
            # Anchor points: epoch 0 and the far future get a zero offset so
            # interp1d covers the whole logTime range.
            x = [0] + list(offsetdata.index) + [future]
            # Offsets are converted to microseconds (logTime unit — TODO confirm).
            y = [i * 1000000 for i in ([0] + list(offsetdata['offset']) + [0])]
            y = [sum(i) for i in zip(x, y)]  # corrected time for each sample time in x
            func = scipy.interpolate.interp1d(x, y)
            logdata['logTime'] = logdata['logTime'].apply(func)
        else:
            print("[WARNING] time not rescaled for sliver ")
    return logdata


def dataPopulate(folder, exp):
    """Load every *.log.csv in folder into exp, one Peer per file.

    Each peer's timestamps are rescaled against the session source clock.
    """
    # NOTE(review): if source_hostname(folder) is falsy, 'source' stays
    # unbound and the loop below raises on the first log file — the original
    # had the same latent flaw; confirm the folder always has a source.
    if source_hostname(folder):
        source = source_addr(folder)
    for logfile in os.listdir(folder):
        if logfile.endswith(".log.csv"):
            print("Loading file: " + logfile)
            session = exp.getSession(time_from_filename(logfile))
            peer_data = read_csv(folder + "/" + logfile)
            offset_data = read_csv(timespread_filename(folder, hostname_from_filename(logfile)))
            peer = Peer(hostname_from_filename(logfile),
                        is_source_from_filename(logfile),
                        rescale_log_time(peer_data, offset_data, source))
            session.addPeer(peer)


def purify(delim, line):
    """Return the last delim-prefixed record of line.

    Log writers may interleave output, leaving several records on one line;
    only the final (complete) one is kept.
    """
    if line.count(delim) > 1:
        print("[WARNING] " + str(line.count(delim) - 1) + " incomplete records.")
    return delim + line.split(delim)[-1]


def preproc_csv_file(csvfile):
    """Drop malformed rows from csvfile in place.

    The first line fixes the expected field count; any later line with a
    different count is discarded.  The file is rewritten only when at least
    one line was dropped.
    """
    tmpfile = "/tmp/tmpcsv" + str(randrange(99999999999999999))
    csvfile = os.path.abspath(csvfile)
    infile = open(csvfile, "r")
    outfile = open(tmpfile, "w")
    field_n = 0
    modified = False
    for in_line in infile:
        if len(in_line.split(',')) != field_n:
            if field_n == 0:
                # First line: establishes the schema width.
                field_n = len(in_line.split(','))
                outfile.write(in_line)
            else:
                modified = True  # non-conformant line, silently dropped
        else:
            outfile.write(in_line)
    infile.close()
    outfile.close()
    if modified:
        sh.move(tmpfile, csvfile)
    else:
        os.remove(tmpfile)  # fix: don't leak the temp file when nothing changed


def preproc_csv(folder, procman=None):
    """Sanitise every *.csv in folder, optionally through a process manager."""
    for csvfile in os.listdir(folder):
        if csvfile.endswith(".csv"):
            print("Preprocessing CSV file: " + csvfile)
            if procman:
                procman.launchProcess(preproc_csv_file, [folder + "/" + csvfile])
            else:
                preproc_csv_file(folder + "/" + csvfile)


def preproc_chunklog(folder):
    """Extract CHUNK_LOG records from every *.log into a sibling *.log.csv."""
    for logfile in os.listdir(folder):
        if logfile.endswith(".log") and not os.path.isfile(folder + "/" + logfile + ".csv"):
            print("Preprocessing file: " + logfile)
            infile = open(folder + "/" + logfile, "r")
            csvfile = open(folder + "/" + logfile + ".csv", "w")
            csvfile.write("CHUNK_LOG,logTime,sender" +
                          ",receiver,chunkID,size[bytes],chunkTimestamp,hopcount,action\n")
            for in_line in infile:
                if "CHUNK_LOG" in in_line[:10]:
                    csvfile.write(purify("[CHUNK_LOG]", in_line))
            infile.close()
            csvfile.close()


def delaySessionAnalyse(folder, session, begin_sec, end_sec):
    """Write per-chunk delay records for one session to <time>_session_delay.exp."""
    if session.getSource() is not None:
        out_file = open(folder + "/" + str(session.time) + "_session_delay.exp", "w")
        out_file.write("info_type,session_id,peer_hostname,delay,hops\n")
        for peer in session.peers:
            logs = peer.published_interval((session.time + begin_sec) * 1000000,
                                           (session.time + end_sec) * 1000000)
            received = logs[logs['action'] == "RECEIVED"]
            for i in received.index:
                row = received.loc[i]  # label-based (was deprecated .ix)
                out_file.write("DELAY," + str(session.time) + "," +
                               purify_hostname(peer.hostname) + "," +
                               str(row['logTime'] - row['chunkTimestamp']) + "," +
                               str(row['hopcount']) + "\n")
        out_file.close()


def delayAnalyse(folder, exp, begin_sec, end_sec):
    """Write the average reception delay per peer/session to packets_delay.exp."""
    out_file = open(folder + "/packets_delay.exp", "w")
    out_file.write("info_type,session_id,peer_hostname,avg_delay\n")
    for session in exp.sessions:
        delaySessionAnalyse(folder, session, begin_sec, end_sec)
        for peer in session.peers:
            if not peer.is_source:
                records = peer.published_interval((session.time + begin_sec) * 1000000,
                                                  (session.time + end_sec) * 1000000)
                received = records[records['action'] == 'RECEIVED']
                minimum = (received['logTime'] - received['chunkTimestamp']).min()
                if minimum < -5000:  # -5 msec
                    print("[WARNING] Timestamps seems to be not consistent!")
                    print(" Last minimum delay of sliver " + peer.hostname +
                          " was: " + str(minimum / 1000000) + " seconds")
                mean = (received['logTime'] - received['chunkTimestamp']).mean()
                out_file.write("DELAY," + str(session.time) + "," +
                               purify_hostname(peer.hostname) + "," + str(mean) + "\n")
    out_file.close()


def sessionLossAnalyse(folder, session, begin_sec, end_sec):
    """Write chunk-loss over time for one session to <time>_session_loss.exp."""
    # Integer division keeps 'delta' usable as a range() step on Python 3 too;
    # max(1, ...) guards against a zero step for short windows.
    delta = max(1, (end_sec - begin_sec) // 100)  # time resolution in seconds
    if session.getSource() is not None:
        out_file = open(folder + "/" + str(session.time) + "_session_loss.exp", "w")
        out_file.write("info_type,session_id,time,chunks,peer_hostname,losts\n")
        source_chunks = session.getSource().published_interval(
            (session.time + begin_sec) * 1000000, (session.time + end_sec) * 1000000)
        for peer in session.peers:
            old_chunks = set()
            peer_chunks_ids = set(peer.published['chunkID'])
            for instant in range(session.time + begin_sec, session.time + end_sec, delta):
                # Chunks the source published in this time slice only.
                delta_chunks = set(source_chunks[source_chunks['logTime'] <= (instant * 1000000)]
                                   ['chunkID']).difference(old_chunks)
                n_source_chunks = len(delta_chunks)
                n_lost_chunks = len(delta_chunks.difference(peer_chunks_ids))
                out_file.write("LOSS," + str(session.time) + "," + str(instant) + "," +
                               str(n_source_chunks) + "," + purify_hostname(peer.hostname) +
                               "," + str(n_lost_chunks) + "\n")
                old_chunks = old_chunks.union(delta_chunks)
        out_file.close()


def hostnames2key(names):
    """Join a [sender, receiver] hostname pair into a single edge key."""
    return '-'.join(names)


def key2hostnames(key):
    """Split an edge key back into its hostname list."""
    return key.split('-')


def receptionAnalyse(folder, exp, begin_sec, end_sec):
    """Count received chunks per (sender, receiver) pair and write edges.exp."""
    out_file = open(folder + "/edges.exp", "w")
    out_file.write("info_type,session_id,peer_sender,peer_receiver,weight\n")
    for session in exp.sessions:
        edges = {}
        for peer in session.peers:
            records = peer.published_interval((session.time + begin_sec) * 1000000,
                                              (session.time + end_sec) * 1000000)
            records = records[records['action'] == "RECEIVED"]
            sender = records['sender']
            receiver = records['receiver']
            for i in sender.index:  # iterate labels (was iteritems, removed in pandas 2)
                key = hostnames2key([sender[i], receiver[i]])
                edges[key] = edges.get(key, 0) + 1
        for edge in edges.keys():
            host1 = exp.addr2hostname(key2hostnames(edge)[0])
            host2 = exp.addr2hostname(key2hostnames(edge)[1])
            out_file.write("EDGE," + str(session.time) + "," +
                           purify_hostname(host1) + "," +
                           purify_hostname(host2) + "," +
                           str(edges[edge]) + "\n")
    out_file.close()


def rttAnalyse(folder, exp):
    """Aggregate *_rtt_*.csv files into per-destination AVG/MAX rows in slivers_rtts.exp."""
    out_file = open(folder + "/slivers_rtts.exp", "w")
    out_file.write("RTT_TYPE,SRC,DST,MSEC,MDEV\n")
    for logfile in os.listdir(folder):
        if "_rtt_" in logfile and logfile.endswith(".csv") and \
                os.stat(folder + "/" + logfile).st_size > 0:
            print("RTT analysing: " + logfile)
            data = read_csv(folder + "/" + logfile)
            data = data[data['rtt'] != -1]  # -1 marks a failed ping
            if len(data) > 0:
                sliver = purify_hostname(data['hostname'][0])
                avgs = data.groupby("addr").mean()
                maxs = data.groupby("addr").max()
                for i in range(0, len(avgs)):
                    # positional access (was deprecated .ix)
                    dst = purify_hostname(exp.addr2hostname(avgs.iloc[i].name, without_port=True))
                    rtt = avgs.iloc[i]['rtt']
                    mdev = avgs.iloc[i]['mdev']
                    out_file.write("RTT_AVG," + sliver + "," + dst + "," +
                                   str(rtt) + "," + str(mdev) + "\n")
                for i in range(0, len(maxs)):
                    # fix: destination taken from maxs (original read avgs here;
                    # same groupby index, so output is unchanged)
                    dst = purify_hostname(exp.addr2hostname(maxs.iloc[i].name, without_port=True))
                    rtt = maxs.iloc[i]['rtt']
                    mdev = maxs.iloc[i]['mdev']
                    out_file.write("RTT_MAX," + sliver + "," + dst + "," +
                                   str(rtt) + "," + str(mdev) + "\n")
    out_file.close()


def hopsAnalyse(folder, exp, begin_sec, end_sec):
    """Write the mean chunk hopcount per peer over time to packets_hops.exp."""
    out_file = open(folder + "/packets_hops.exp", "w")
    out_file.write("info_type,session_id,time,chunks,peer_hostname,hops_avg\n")
    delta = max(1, (end_sec - begin_sec) // 100)  # time resolution in seconds
    for session in exp.sessions:
        if session.getSource() is not None:
            for peer in session.peers:
                for instant in range(session.time + begin_sec, session.time + end_sec, delta):
                    delta_hops = peer.published_interval_sec(instant, instant + delta)['hopcount']
                    out_file.write("HOP," + str(session.time) + "," + str(instant + delta) +
                                   "," + str(delta_hops.count()) + "," +
                                   peer.hostname + "," + str(delta_hops.mean()) + "\n")
        else:
            # fix: original called undefined name warning() -> NameError
            print("[WARNING] No source for session " + str(session.time))
    out_file.close()


def lossAnalyse(folder, exp, begin_sec, end_sec):
    """Write total chunk loss per peer/session to packets_loss.exp."""
    out_file = open(folder + "/packets_loss.exp", "w")
    out_file.write("info_type,session_id,chunks,peer_hostname,losts\n")
    for session in exp.sessions:
        sessionLossAnalyse(folder, session, begin_sec, end_sec)
        print("Session: " + str(session.time))
        if session.getSource() is not None:
            source_ids = set(session.getSource().published_interval(
                (session.time + begin_sec) * 1000000,
                (session.time + end_sec) * 1000000)['chunkID'])
            for peer in session.peers:
                if not peer.is_source:
                    # Loss is computed against everything the peer ever
                    # published, not just the analysis window.
                    peer_ids = set(peer.published['chunkID'])
                    print("Peer " + peer.hostname + " lost " +
                          str(len(source_ids.difference(peer_ids))) +
                          "/" + str(len(source_ids)) + " packets")
                    out_file.write("LOSS," + str(session.time) + "," + str(len(source_ids)) +
                                   "," + purify_hostname(peer.hostname) + "," +
                                   str(len(source_ids.difference(peer_ids))) + "\n")
        else:
            print("[WARNING] source not found")
    out_file.close()


def dumpHostnameMap(folder, exp):
    """Write the hostname -> address mapping of every peer to hostnames2address.exp."""
    hostnames = {}
    for session in exp.sessions:
        for peer in session.peers:
            hostnames[peer.hostname] = peer.address
    out_file = open(folder + "/hostnames2address.exp", "w")
    out_file.write("info_type,hostname,address\n")
    for key in hostnames.keys():
        out_file.write("MAPPING," + key + "," + hostnames[key] + "\n")
    out_file.close()


def main(argv):
    """Parse -f/--folder, preprocess the logs, then run all analyses in parallel."""
    folder = None
    try:
        opts, args = getopt.getopt(argv, "hf:", ["help", "folder"])
    except getopt.GetoptError:
        sys.exit(2)
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            sys.exit()
        elif opt in ("-f", "--folder"):
            folder = arg
    if folder is None:
        sys.exit()

    print("folder is " + folder)
    exp = Experiment()
    pm = pmgr.ProcessManager()
    preproc_chunklog(folder)
    # fix: the original also called preproc_csv(folder) serially right after
    # the parallel launch, racing the child processes on the same files; the
    # single parallel pass followed by joinAll() is sufficient.
    preproc_csv(folder, procman=pm)
    pm.joinAll()

    dataPopulate(folder, exp)
    pm.launchProcess(rttAnalyse, [folder, exp])
    pm.launchProcess(lossAnalyse, [folder, exp, 150, 450])
    pm.launchProcess(delayAnalyse, [folder, exp, 150, 450])
    pm.launchProcess(receptionAnalyse, [folder, exp, 150, 450])
    pm.launchProcess(hopsAnalyse, [folder, exp, 150, 450])
    pm.joinAll()
    dumpHostnameMap(folder, exp)

    for session in exp.sessions:
        print("Session: " + str(session.time) +
              " has " + str(len(session.peers)) + " peers")


if __name__ == "__main__":
    main(sys.argv[1:])