Revision c0eac366

View differences:

lib/experiment.py
30 30
		#print "hostname for addr "+addr+": "+str(hostname)
31 31
		return hostname
32 32

  
33
	def getPeer(self,hostname):
34
		for session in self.sessions:
35
				peer = session.getPeer(hostname)
36
				if peer:
37
						return peer
38
		return None
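
The new Experiment.getPeer walks the sessions in order and returns the first peer whose hostname matches (or None); this is how the neighbourhood CSVs get attached to peers in dataPopulate below, so if the same hostname appears in several sessions only that first match receives the data. A minimal usage sketch (the hostname and DataFrame are hypothetical):

peer = exp.getPeer("some-host")           # first match across exp.sessions, or None
if peer:
    peer.setNeighbours(neigh_dataframe)   # DataFrame read from that host's .neighlog.csv
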
39

  
33 40
class Session:
34 41
	def __init__(self,time):
35 42
		self.time=int(time)
......
66 73
		self.hostname = hostname
67 74
		self.is_source = source
68 75
		self.published = dataframe
76
		self.neighbours = None
69 77
		received = dataframe[dataframe['action'] == 'RECEIVED']
70 78
		sent = dataframe[dataframe['action'] == 'SENT']
71 79
		if len(received) > 0:
72
			self.address = ''.join(received['receiver'][0])
80
			i = received.first_valid_index()
81
			self.address = ''.join(received['receiver'][i])
73 82
		else:
74 83
			if len(sent) > 0:
75
				self.address = ''.join(sent['sender'][0])
84
				i = sent.first_valid_index()
85
				self.address = ''.join(sent['sender'][i])
76 86
			else:
77 87
				self.address = 'unknown'
78 88

  
89
	def setNeighbours(self,neighs):
90
		self.neighbours = neighs
91

  
79 92
	def published_interval(self,begin_time,end_time):
80 93
		return self.published[(self.published['logTime'] > begin_time) & \
81 94
				      						 (self.published['logTime'] < end_time)]
......
84 97
		return self.published[(self.published['logTime'] > (begin_time*1000000)) & \
85 98
				      						 (self.published['logTime'] < (end_time*1000000))]
86 99

  
100
	def neighbours_interval_sec(self,begin_time,end_time):
101
		return self.neighbours[(self.neighbours['logTime'] > (begin_time*1000000)) & \
102
			(self.neighbours['logTime'] < (end_time*1000000))]
py_logs_analizer.py
6 6
from random import randrange
7 7
import scipy.interpolate
8 8
import shutil as sh
9
from collections import Counter
9 10

  
10 11
sys.path.insert(0,'lib')
11 12
import process_manager as pmgr
......
14 15
from utilities import *
15 16

  
16 17

  
17
START_ANALYSIS_TIME = 60 #150
18
STOP_ANALYSIS_TIME = 120 #450
18
START_ANALYSIS_TIME = 150
19
STOP_ANALYSIS_TIME = 450
19 20

  
20 21
def rescale_log_time(logdata,offsetdata,server_addr):
21
	if (offsetdata is None) or 'addr' not in offsetdata.columns:
22
		return logdata
23
	if (len(set(offsetdata['addr']))) > 1:
24
		offsetdata = offsetdata[offsetdata['addr'] == server_addr]
25
	if len(logdata)>0:
26
		offsetdata = offsetdata.set_index('unixtime')
27
		offsetdata = offsetdata[offsetdata['offset'] != -1]
28
		future = pow(10,20)
22
  if (offsetdata is None) or 'addr' not in offsetdata.columns:
23
    return logdata
24
  if (len(set(offsetdata['addr']))) > 1:
25
    offsetdata = offsetdata[offsetdata['addr'] == server_addr]
26
  if len(logdata)>0:
27
    offsetdata = offsetdata.set_index('unixtime')
28
    offsetdata = offsetdata[offsetdata['offset'] != -1]
29
    future = pow(10,20)
29 30

  
30
		if (len(offsetdata) > 0):
31
			x = [0]+list(offsetdata.index)+[future]
32
			y = [i*1000000 for i in ([0]+list(offsetdata['offset'])+[0])] # list of offset to apply to each time in x
33
			y = [sum(i) for i in zip(x,y)] # correct value of time for each given time in x
34
			func = scipy.interpolate.interp1d(x,y)
31
    if (len(offsetdata) > 0):
32
      x = [0]+list(offsetdata.index)+[future]
33
      y = [i*1000000 for i in ([0]+list(offsetdata['offset'])+[0])] # list of offset to apply to each time in x
34
      y = [sum(i) for i in zip(x,y)] # correct value of time for each given time in x
35
      func = scipy.interpolate.interp1d(x,y)
35 36

  
36
			logdata['logTime'] = logdata['logTime'].apply(func)
37
		else:
38
			warning(" time not rescaled for sliver ")
37
      logdata['logTime'] = logdata['logTime'].apply(func)
38
    else:
39
      warning(" time not rescaled for sliver ")
39 40

  
40
	return logdata
41
  return logdata
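
For reference, the rescaling above amounts to a piecewise-linear clock correction: each offset sample (converted to microseconds) is added to its own position on the time axis, zero-offset anchors are added at 0 and at a far-future sentinel, and scipy.interpolate.interp1d then shifts every logTime. A minimal stand-alone sketch with made-up numbers:

import scipy.interpolate

times   = [1000000, 2000000]          # made-up positions of the offset samples (same clock as logTime)
offsets = [0.5, -0.2]                 # made-up clock offsets, in seconds, at those positions
future  = pow(10, 20)

x = [0] + times + [future]                        # sample positions, padded with the two anchors
y = [i * 1000000 for i in [0] + offsets + [0]]    # offsets converted to microseconds
y = [sum(i) for i in zip(x, y)]                   # corrected value at each sample position
func = scipy.interpolate.interp1d(x, y)

print func(1500000)                   # a raw logTime mapped to its corrected value
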
41 42

  
42 43

  
43 44
def dataPopulate(folder,exp):
44
	# create the sessions with the sources
45
	for logfile in os.listdir(folder):
46
			if logfile.endswith(".log.csv") and is_source_from_filename(logfile):
47
				print "Loading source file: "+logfile
48
				session = exp.getSession(time_from_filename(logfile),creation=True)
49
				peer_data = read_csv(folder+"/"+logfile)
50
				try:
51
					offset_data = read_csv(timespread_filename(folder,hostname_from_filename(logfile)))
52
				except IOError:
53
					offset_data = None
54

  
55
				peer = Peer(hostname_from_filename(logfile),is_source_from_filename(logfile), \
56
						rescale_log_time(peer_data,offset_data,hostname_from_filename(logfile)))
57
				session.addPeer(peer)
45
  # create the sessions with the sources
46
  for logfile in os.listdir(folder):
47
      if logfile.endswith(".chunklog.csv") and is_source_from_filename(logfile):
48
        print "Loading source file: "+logfile
49
        session = exp.getSession(time_from_filename(logfile),creation=True)
50
        peer_data = read_csv(folder+"/"+logfile)
51
        try:
52
          offset_data = read_csv(timespread_filename(folder,hostname_from_filename(logfile)))
53
        except IOError:
54
          offset_data = None
55

  
56
        peer = Peer(hostname_from_filename(logfile),is_source_from_filename(logfile), \
57
            rescale_log_time(peer_data,offset_data,hostname_from_filename(logfile)))
58
        session.addPeer(peer)
58 59

  
59 60
# add the peers
60
	for logfile in os.listdir(folder):
61
		if logfile.endswith(".log.csv") and not is_source_from_filename(logfile):
62
			print "Loading generic peer file: "+logfile
63
			session = exp.getSession(time_from_filename(logfile))
64
                        if session is not None:
65
                            source = session.getSource()
66
                            peer_data = read_csv(folder+"/"+logfile)
67
                            try:
68
                                    offset_data = read_csv(timespread_filename(folder,hostname_from_filename(logfile)))
69
                            except IOError:
70
                                    offset_data = None
71

  
72
                            peer = Peer(hostname_from_filename(logfile),is_source_from_filename(logfile), \
73
                                            rescale_log_time(peer_data,offset_data,source))
74
                            session.addPeer(peer)
61
  for logfile in os.listdir(folder):
62
    if logfile.endswith(".chunklog.csv") and not is_source_from_filename(logfile):
63
      print "Loading generic peer file: "+logfile
64
      session = exp.getSession(time_from_filename(logfile))
65
      if session is not None:
66
          source = session.getSource()
67
          peer_data = read_csv(folder+"/"+logfile)
68
          try:
69
                  offset_data = read_csv(timespread_filename(folder,hostname_from_filename(logfile)))
70
          except IOError:
71
                  offset_data = None
72

  
73
          peer = Peer(hostname_from_filename(logfile),is_source_from_filename(logfile), \
74
                          rescale_log_time(peer_data,offset_data,source))
75
          session.addPeer(peer)
76
# add neighbourhood info
77
  for logfile in os.listdir(folder):
78
    if logfile.endswith(".neighlog.csv"):
79
      print "Loading peer file: "+logfile
80
      hostname = hostname_from_filename(logfile)
81
      peer = exp.getPeer(hostname)
82
      if peer:
83
        peer.setNeighbours(read_csv(folder+"/"+logfile))
84
      else:
85
        print "WARNING: peer " + hostname + " not found"
75 86

  
76 87
# prune deviating sessions
77
	for session in exp.sessions:
78
		for peer in session.peers:
79
                    rtt_file = rtt_filename(folder,peer)
80
                    if rtt_file is not None:
81
			ping_data = read_csv(rtt_file)
82
			ping_data = ping_data[ (ping_data['unixtime'] >= (session.time + START_ANALYSIS_TIME)) & \
83
					(ping_data['unixtime'] <= (session.time + STOP_ANALYSIS_TIME))]
84
			ping_sent = ping_data['sent'].sum()
85
			echo_received = ping_data['answers'].sum()
86
                        if ping_sent==0:
87
                            warning("seems that peer "+peer.hostname+" has no ICMP data")
88
			if ping_sent!=0 and ((echo_received/ping_sent) < 0.7):
89
				exp.removeSessionByTime(session.time)
90
                                print ping_sent
91
                                print echo_received
92
				warning( "REMOVED SESSION "+ str(session.time))
93
                                warning (" because of "+rtt_file)
94
                    else:
95
                        warning("peer "+peer.hostname+" has not ICMP file")
88
  for session in exp.sessions:
89
    for peer in session.peers:
90
      rtt_file = rtt_filename(folder,peer)
91
      if rtt_file is not None:
92
        ping_data = read_csv(rtt_file)
93
        ping_data = ping_data[ (ping_data['unixtime'] >= (session.time + START_ANALYSIS_TIME)) & \
94
            (ping_data['unixtime'] <= (session.time + STOP_ANALYSIS_TIME))]
95
        ping_sent = ping_data['sent'].sum()
96
        echo_received = ping_data['answers'].sum()
97
        if ping_sent==0:
98
          warning("seems that peer "+peer.hostname+" has no ICMP data")
99
        if ping_sent!=0 and ((echo_received/ping_sent) < 0.7):
100
          exp.removeSessionByTime(session.time)
101
          print ping_sent
102
          print echo_received
103
          warning( "REMOVED SESSION "+ str(session.time))
104
          warning (" because of "+rtt_file)
105
        else:
106
          warning("peer "+peer.hostname+" has not ICMP file")
96 107

  
97 108
def ICMPLossAnalyse(folder,exp,begin_sec,end_sec):
98 109
    out_file = open(folder+"/ICMPloss.exp","w")
......
110 121
    out_file.close()
111 122

  
112 123
def rtt_filename(folder,peer):
113
	for logfile in os.listdir(folder):
114
		if logfile.startswith(peer.hostname+"_rtt_") and logfile.endswith(".csv"):
115
			return folder+'/'+logfile
124
  for logfile in os.listdir(folder):
125
    if logfile.startswith(peer.hostname+"_rtt_") and logfile.endswith(".csv"):
126
      return folder+'/'+logfile
116 127

  
117 128
def purify(delim,line):
118
	if(line.count(delim)>1):
119
		warning( str(line.count(delim)-1) +" incomplete records.")
120
	return delim+line.split(delim)[-1]
129
  if(line.count(delim)>1):
130
    warning( str(line.count(delim)-1) +" incomplete records.")
131
  return delim+line.split(delim)[-1]
121 132

  
122 133
def preproc_csv_file(csvfile):
123
	tmpfile="/tmp/tmpcsv"+str(randrange(99999999999999999))
124
	csvfile = os.path.abspath(csvfile)
125
	infile = open(csvfile,"r")
126
	outfile = open(tmpfile,"w")
127
	field_n = 0
128
	modified = False
129
	for in_line in infile.xreadlines():
130
		if len(in_line.split(',')) != field_n:
131
			if field_n == 0:
132
				field_n = len(in_line.split(','))
133
				outfile.write(in_line)
134
			else:
135
#				print "[WARNING] Non conformant line:\n\t"+in_line
136
				modified = True
137
		else:
138
			outfile.write(in_line)
139
	infile.close()
140
	outfile.close()
141
	if modified:
142
		sh.move(tmpfile,csvfile)
143
#		os.renames(tmpfile,csvfile)
134
  tmpfile="/tmp/tmpcsv"+str(randrange(99999999999999999))
135
  csvfile = os.path.abspath(csvfile)
136
  infile = open(csvfile,"r")
137
  outfile = open(tmpfile,"w")
138
  field_n = 0
139
  modified = False
140
  for in_line in infile.xreadlines():
141
    if len(in_line.split(',')) != field_n:
142
      if field_n == 0:
143
        field_n = len(in_line.split(','))
144
        outfile.write(in_line)
145
      else:
146
#       print "[WARNING] Non conformant line:\n\t"+in_line
147
        modified = True
148
    else:
149
      outfile.write(in_line)
150
  infile.close()
151
  outfile.close()
152
  if modified:
153
    sh.move(tmpfile,csvfile)
154
  os.remove(tmpfile)
155
#   os.renames(tmpfile,csvfile)
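
Note on the cleanup above: when modified is true, sh.move() has already consumed tmpfile, so the unconditional os.remove(tmpfile) that follows raises OSError exactly for the files that were rewritten. A sketch of one way to keep the cleanup only for the untouched case (assuming that is the intent):

if modified:
    sh.move(tmpfile, csvfile)   # the temp file is gone after the move
else:
    os.remove(tmpfile)          # only an unused temp copy is left behind
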
144 156

  
145 157
def preproc_csv(folder,procman=None):
146
	for csvfile in os.listdir(folder):
147
		if csvfile.endswith(".csv"):
148
			print "Preprocessing CSV file: "+csvfile
149
			if procman:
150
				procman.launchProcess(preproc_csv_file,[folder+"/"+csvfile])
151
			else:
152
				preproc_csv_file(folder+"/"+csvfile)
158
  for csvfile in os.listdir(folder):
159
    if csvfile.endswith(".csv"):
160
      print "Preprocessing CSV file: "+csvfile
161
      if procman:
162
        procman.launchProcess(preproc_csv_file,[folder+"/"+csvfile])
163
      else:
164
        preproc_csv_file(folder+"/"+csvfile)
153 165

  
154 166
def preproc_chunklog(folder):
155
	for logfile in os.listdir(folder):
156
		if logfile.endswith(".log") and not os.path.isfile(folder+"/"+logfile+".csv"):
157
			print "Preprocessing file: "+logfile
158
			infile = open(folder+"/"+logfile,"r")
159
			csvfile = open(folder+"/"+logfile+".csv","w")
160
			csvfile.write("CHUNK_LOG,logTime,sender"+\
161
					",receiver,chunkID,size[bytes],chunkTimestamp,hopcount,action\n")
162
			for in_line in infile.xreadlines():
163
				if "CHUNK_LOG" in in_line[:10]:
164
					csvfile.write(purify("[CHUNK_LOG]",in_line))
165
			infile.close()
166
			csvfile.close()
167
  for logfile in os.listdir(folder):
168
    if logfile.endswith(".log") and not os.path.isfile(folder+"/"+logfile+".csv"):
169
      print "Preprocessing file: "+logfile
170
      infile = open(folder+"/"+logfile,"r")
171
      csvfile = open(folder+"/"+logfile+".chunklog.csv","w")
172
      csvfile.write("CHUNK_LOG,logTime,sender"+\
173
          ",receiver,chunkID,size[bytes],chunkTimestamp,hopcount,action\n")
174
      for in_line in infile.xreadlines():
175
        if "CHUNK_LOG" in in_line[:10]:
176
          csvfile.write(purify("[CHUNK_LOG]",in_line))
177
      infile.close()
178
      csvfile.close()
179

  
180
def preproc_neighlog(folder):
181
  for logfile in os.listdir(folder):
182
    if logfile.endswith(".log") and not os.path.isfile(folder+"/"+logfile+".csv"):
183
      print "Preprocessing file: "+logfile
184
      infile = open(folder+"/"+logfile,"r")
185
      csvfile = open(folder+"/"+logfile+".neighlog.csv","w")
186
      csvfile.write("NEIGH_LOG,logTime,logger"+\
187
          ",peer,neigh_size\n")
188
      for in_line in infile.xreadlines():
189
        if "NEIGHBOURHOOD" in in_line[:15]:
190
          csvfile.write(purify("[NEIGHBOURHOOD]",in_line))
191
      infile.close()
192
      csvfile.close()
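
Both preprocessors still guard on os.path.isfile(folder+"/"+logfile+".csv"), while the outputs are now named <logfile>.chunklog.csv and <logfile>.neighlog.csv, so the "already converted" check never sees the files they actually write. If skipping converted logs is still wanted, the guard would have to follow the new names, e.g. (sketch for the neighbourhood case):

out_name = folder + "/" + logfile + ".neighlog.csv"
if logfile.endswith(".log") and not os.path.isfile(out_name):
    # ... convert as above ...
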
167 193

  
168 194
def delaySessionAnalyse(folder,session,begin_sec,end_sec):
169
	if (session.getSource() != None):
170
		out_file = open(folder+"/"+str(session.time)+"_session_delay.exp","w")
171
		out_file.write("info_type,session_id,peer_hostname,delay,hops\n")
172

  
173
		for peer in session.peers:
174
			logs = peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
175
			received = logs[logs['action'] == "RECEIVED"]
176

  
177
			for i in received.index:
178
				row = received.ix[i]
179
				out_file.write("DELAY,"+str(session.time)+","\
180
						+purify_hostname(peer.hostname)+","\
181
						+str(row['logTime'] - row['chunkTimestamp'])+","\
182
							+str(row['hopcount'])+"\n")
183

  
184
#			for delay in (received['logTime']-received['chunkTimestamp']):
185
#				out_file.write("DELAY,"+str(session.time)+","+purify_hostname(peer.hostname)+","+str(delay)+"\n")
186
				
187
		out_file.close()
195
  if (session.getSource() != None):
196
    out_file = open(folder+"/"+str(session.time)+"_session_delay.exp","w")
197
    out_file.write("info_type,session_id,peer_hostname,delay,hops\n")
198

  
199
    for peer in session.peers:
200
      logs = peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
201
      received = logs[logs['action'] == "RECEIVED"]
202

  
203
      for i in received.index:
204
        row = received.ix[i]
205
        out_file.write("DELAY,"+str(session.time)+","\
206
            +purify_hostname(peer.hostname)+","\
207
            +str(row['logTime'] - row['chunkTimestamp'])+","\
208
              +str(row['hopcount'])+"\n")
209

  
210
#     for delay in (received['logTime']-received['chunkTimestamp']):
211
#       out_file.write("DELAY,"+str(session.time)+","+purify_hostname(peer.hostname)+","+str(delay)+"\n")
212
        
213
    out_file.close()
188 214

  
189 215

  
190 216
def delayAnalyse(folder,exp,begin_sec,end_sec):
191
	out_file = open(folder+"/packets_delay.exp","w")
192
	out_file.write("info_type,session_id,peer_hostname,avg_delay\n")
193
	for session in exp.sessions:
194
		delaySessionAnalyse(folder,session,begin_sec,end_sec)
195
		for peer in session.peers:
196
			if not peer.is_source:
197
				records=peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
198
				received=records[records['action'] == 'RECEIVED']
199
				minimum=(received['logTime']-received['chunkTimestamp']).min()
200
				if (minimum < -5000): 
201
					warning(" Timestamps seems to be not consistent!") # -5msec
202
					print "          Last minimum delay of sliver "+peer.hostname+" was: "+str(minimum/1000000)+" seconds"
203
				mean=(received['logTime']-received['chunkTimestamp']).mean()
204
				out_file.write("DELAY,"+str(session.time)+","+purify_hostname(peer.hostname)+","+str(mean)+"\n")
205

  
206
	out_file.close()
217
  out_file = open(folder+"/packets_delay.exp","w")
218
  out_file.write("info_type,session_id,peer_hostname,avg_delay\n")
219
  for session in exp.sessions:
220
    delaySessionAnalyse(folder,session,begin_sec,end_sec)
221
    for peer in session.peers:
222
      if not peer.is_source:
223
        records=peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
224
        received=records[records['action'] == 'RECEIVED']
225
        minimum=(received['logTime']-received['chunkTimestamp']).min()
226
        if (minimum < -5000): 
227
          warning(" Timestamps seems to be not consistent!") # -5msec
228
          print "          Last minimum delay of sliver "+peer.hostname+" was: "+str(minimum/1000000)+" seconds"
229
        mean=(received['logTime']-received['chunkTimestamp']).mean()
230
        out_file.write("DELAY,"+str(session.time)+","+purify_hostname(peer.hostname)+","+str(mean)+"\n")
231

  
232
  out_file.close()
207 233

  
208 234
def sessionLossAnalyse(folder,session,begin_sec,end_sec):
209
	delta = int((end_sec-begin_sec)/100) # time resolution in seconds
210
        if delta == 0:
211
            delta = 1
212
	if (session.getSource() != None):
213
		out_file = open(folder+"/"+str(session.time)+"_session_loss.exp","w")
214
		out_file.write("info_type,session_id,time,chunks,peer_hostname,losts\n")
215
		source_chunks = session.getSource().published_interval(\
216
				(session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
217

  
218
		for peer in session.peers:
219
			old_chunks = set()
220
			peer_chunks_ids = set(peer.published['chunkID'])
221
			for instant in range(session.time+begin_sec,session.time+end_sec,delta):
222
				delta_chunks = set(source_chunks[source_chunks['logTime'] <= (instant*1000000)]['chunkID']).difference(old_chunks)
223
				n_source_chunks = len(delta_chunks)
224
				n_lost_chunks = len(delta_chunks.difference(peer_chunks_ids))
225
				out_file.write("LOSS,"+str(session.time)+","+str(instant)+","+\
226
						str(n_source_chunks)+","+purify_hostname(peer.hostname)+","+str(n_lost_chunks)+"\n")
227
				old_chunks = old_chunks.union(delta_chunks)
228
				
229
		out_file.close()
235
  delta = int((end_sec-begin_sec)/100) # time resolution in seconds
236
  if delta == 0:
237
    delta = 1
238
  if (session.getSource() != None):
239
    out_file = open(folder+"/"+str(session.time)+"_session_loss.exp","w")
240
    out_file.write("info_type,session_id,time,chunks,peer_hostname,losts\n")
241
    source_chunks = session.getSource().published_interval(\
242
        (session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
243

  
244
    for peer in session.peers:
245
      old_chunks = set()
246
      peer_chunks_ids = set(peer.published['chunkID'])
247
      for instant in range(session.time+begin_sec,session.time+end_sec,delta):
248
        delta_chunks = set(source_chunks[source_chunks['logTime'] <= (instant*1000000)]['chunkID']).difference(old_chunks)
249
        n_source_chunks = len(delta_chunks)
250
        n_lost_chunks = len(delta_chunks.difference(peer_chunks_ids))
251
        out_file.write("LOSS,"+str(session.time)+","+str(instant)+","+\
252
            str(n_source_chunks)+","+purify_hostname(peer.hostname)+","+str(n_lost_chunks)+"\n")
253
        old_chunks = old_chunks.union(delta_chunks)
254
        
255
    out_file.close()
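
Each window of the loss analysis takes the chunks the source has published up to that instant, removes those already counted in earlier windows, and checks how many of the remainder never appear anywhere in the peer's log. The bookkeeping, with toy chunk IDs:

source_up_to_now = set([1, 2, 3, 4])    # chunk IDs published by the source so far
old_chunks       = set([1, 2])          # already counted in earlier windows
peer_chunks_ids  = set([1, 2, 4])       # every chunk this peer ever logged

delta_chunks  = source_up_to_now.difference(old_chunks)        # set([3, 4])
n_lost_chunks = len(delta_chunks.difference(peer_chunks_ids))  # 1, chunk 3 never arrived
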
230 256

  
231 257
def hostnames2key(names):
232
	return '-'.join(names)
258
  return '-'.join(names)
233 259

  
234 260
def key2hostnames(key):
235
	return key.split('-')
261
  return key.split('-')
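
An edge key is just the two endpoint addresses joined with '-', so it round-trips through key2hostnames (addresses are hypothetical):

key = hostnames2key(["10.0.0.1:6000", "10.0.0.2:6000"])   # "10.0.0.1:6000-10.0.0.2:6000"
print key2hostnames(key)                                   # ['10.0.0.1:6000', '10.0.0.2:6000']
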
236 262

  
237 263
def receptionAnalyse(folder,exp,begin_sec,end_sec):
238
	out_file = open(folder+"/edges.exp","w")
239
	out_file.write("info_type,session_id,peer_sender,peer_receiver,weight\n")
240

  
241
	for session in exp.sessions:
242
		edges = {}
243
		for peer in session.peers:
244
			records=peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
245
			records=records[records['action'] == "RECEIVED"]
246
			sender=records['sender']
247
			receiver=records['receiver']
248
			for rec in sender.iteritems(): #range(0,sender.size):
249
				i = rec[0]
250
				if hostnames2key([sender[i],receiver[i]]) in edges.keys():
251
					edges[hostnames2key([sender[i],receiver[i]])] += 1
252
				else:
253
					edges[hostnames2key([sender[i],receiver[i]])] = 1
254
		for edge in edges.keys():
255
#			print edge
256
			host1 = exp.addr2hostname(key2hostnames(edge)[0])
257
			host2 = exp.addr2hostname(key2hostnames(edge)[1])
258
#			print str(host1) + ' ' + str(host2)
259
			out_file.write("EDGE,"+str(session.time)+","+ \
260
					purify_hostname(host1)+","+ \
261
					purify_hostname(host2)+","+ \
262
					str(edges[edge])+"\n")
263

  
264
	out_file.close()
264
  out_file = open(folder+"/edges.exp","w")
265
  out_file.write("info_type,session_id,peer_sender,peer_receiver,weight\n")
266

  
267
  for session in exp.sessions:
268
    edges = {}
269
    for peer in session.peers:
270
      records=peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
271
      records=records[records['action'] == "RECEIVED"]
272
      sender=records['sender']
273
      receiver=records['receiver']
274
      for rec in sender.iteritems(): #range(0,sender.size):
275
        i = rec[0]
276
        if hostnames2key([sender[i],receiver[i]]) in edges.keys():
277
          edges[hostnames2key([sender[i],receiver[i]])] += 1
278
        else:
279
          edges[hostnames2key([sender[i],receiver[i]])] = 1
280
    for edge in edges.keys():
281
#     print edge
282
      host1 = exp.addr2hostname(key2hostnames(edge)[0])
283
      host2 = exp.addr2hostname(key2hostnames(edge)[1])
284
#     print str(host1) + ' ' + str(host2)
285
      out_file.write("EDGE,"+str(session.time)+","+ \
286
          purify_hostname(host1)+","+ \
287
          purify_hostname(host2)+","+ \
288
          str(edges[edge])+"\n")
289

  
290
  out_file.close()
265 291

  
266 292
def rttAnalyse(folder,exp):
267
	out_file = open(folder + "/slivers_rtts.exp","w")
268
	out_file.write("RTT_TYPE,SRC,DST,MSEC,MDEV\n")
269

  
270
	for logfile in os.listdir(folder):
271
		if "_rtt_" in logfile and logfile.endswith(".csv") and os.stat(folder+"/"+logfile).st_size > 0: 
272
			print "RTT analysing: "+logfile
273
			data = read_csv(folder + "/" + logfile)
274
			data = data[data['rtt'] != -1].reindex(range(0,len(data)))
275
			if len(data)>0:
276
				sliver = purify_hostname(hostname_from_filename(logfile))#data['hostname'][0])
277
				avgs = data.groupby("addr").mean()
278
				maxs = data.groupby("addr").max()
279

  
280
				for i in range(0,len(avgs)):
281
					dst = purify_hostname(exp.addr2hostname(avgs.ix[i].name,without_port=True))
282
					rtt = avgs.ix[i]['rtt']
283
					mdev = avgs.ix[i]['mdev']
284
					out_file.write("RTT_AVG,"+sliver+","+dst+","+str(rtt)+","+str(mdev)+"\n")
285

  
286
				for i in range(0,len(maxs)):
287
					dst = purify_hostname(exp.addr2hostname(avgs.ix[i].name,without_port=True))
288
					rtt = maxs.ix[i]['rtt']
289
					mdev = maxs.ix[i]['mdev']
290
					out_file.write("RTT_MAX,"+sliver+","+dst+","+str(rtt)+","+str(mdev)+"\n")
291

  
292
	out_file.close()
293
  out_file = open(folder + "/slivers_rtts.exp","w")
294
  out_file.write("RTT_TYPE,SRC,DST,MSEC,MDEV\n")
295

  
296
  for logfile in os.listdir(folder):
297
    if "_rtt_" in logfile and logfile.endswith(".csv") and os.stat(folder+"/"+logfile).st_size > 0: 
298
      print "RTT analysing: "+logfile
299
      data = read_csv(folder + "/" + logfile)
300
      data = data[data['rtt'] != -1].reindex(range(0,len(data)))
301
      if len(data)>0:
302
        sliver = purify_hostname(hostname_from_filename(logfile))#data['hostname'][0])
303
        avgs = data.groupby("addr").mean()
304
        maxs = data.groupby("addr").max()
305

  
306
        for i in range(0,len(avgs)):
307
          dst = purify_hostname(exp.addr2hostname(avgs.ix[i].name,without_port=True))
308
          rtt = avgs.ix[i]['rtt']
309
          mdev = avgs.ix[i]['mdev']
310
          out_file.write("RTT_AVG,"+sliver+","+dst+","+str(rtt)+","+str(mdev)+"\n")
311

  
312
        for i in range(0,len(maxs)):
313
          dst = purify_hostname(exp.addr2hostname(avgs.ix[i].name,without_port=True))
314
          rtt = maxs.ix[i]['rtt']
315
          mdev = maxs.ix[i]['mdev']
316
          out_file.write("RTT_MAX,"+sliver+","+dst+","+str(rtt)+","+str(mdev)+"\n")
317

  
318
  out_file.close()
293 319

  
294 320

  
295 321
def hopsAnalyse(folder,exp,begin_sec,end_sec):
296
	out_file = open(folder+"/packets_hops.exp","w")
297
	out_file.write("info_type,session_id,time,chunks,peer_hostname,hops_avg\n")
298
	delta = int((end_sec-begin_sec)/100) # time resolution in seconds
299
        if delta == 0:
300
            delta = 1
301
	for session in exp.sessions:
302
		if (session.getSource() != None):
303
			for peer in session.peers:
304
				for instant in range(session.time+begin_sec,session.time+end_sec,delta):
305
					delta_hops = peer.published_interval_sec(instant,instant+delta)['hopcount']
306
					out_file.write("HOP,"+str(session.time)+","+str(instant+delta)+","+str(delta_hops.count())+","+\
307
						peer.hostname+","+str(delta_hops.mean())+"\n")
308

  
309
		else:
310
			warning ("No source for session "+str(session.time))
311

  
312
	out_file.close()
322
  out_file = open(folder+"/packets_hops.exp","w")
323
  out_file.write("info_type,session_id,time,chunks,peer_hostname,hops_avg\n")
324
  delta = int((end_sec-begin_sec)/100) # time resolution in seconds
325
  if delta == 0:
326
    delta = 1
327
  for session in exp.sessions:
328
    if (session.getSource() != None):
329
      for peer in session.peers:
330
        for instant in range(session.time+begin_sec,session.time+end_sec,delta):
331
          delta_hops = peer.published_interval_sec(instant,instant+delta)['hopcount']
332
          out_file.write("HOP,"+str(session.time)+","+str(instant+delta)+","+str(delta_hops.count())+","+\
333
            peer.hostname+","+str(delta_hops.mean())+"\n")
334

  
335
    else:
336
      warning ("No source for session "+str(session.time))
337

  
338
  out_file.close()
339

  
340
def seekPathFile(folder,session_id):
341
  paths = {}
342
  for logfile in os.listdir(folder):
343
      if logfile.endswith(".paths"):
344
        time = logfile.split('_')[-1]
345
        time = int(time.split('.')[0])
346
        if time <= session_id:
347
          paths[time] = logfile
348
  if len(paths.keys()) > 0:
349
    return folder+'/'+paths[min(paths.keys())]
350
  else:
351
    return None
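
seekPathFile expects shortest-path snapshots named <something>_<unixtime>.paths, keeps those taken no later than the session start, and returns the oldest of them (min over the collected keys). The time parsing, on a hypothetical name:

logfile = "shortest_1397210000.paths"                 # hypothetical snapshot name
time = int(logfile.split('_')[-1].split('.')[0])      # -> 1397210000
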
352

  
353
def triel(n,x,y):                                                                   
354
  i = min(x,y)                                                                
355
  j = max(x,y)                                                                
356
  k= int(j-1+float((2*n-3)*i-i**2)/2)                                         
357
  return k  
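
triel maps an unordered pair of node indices (both below n) onto one slot of the strictly upper-triangular edge enumeration, so each undirected edge gets exactly one key. A quick check of the formula (n is arbitrary, uses the triel above):

n = 5
pairs = [(i, j) for i in range(n) for j in range(i + 1, n)]
print [triel(n, i, j) for (i, j) in pairs]   # [0, 1, 2, ..., 9]: one distinct slot per edge
print triel(n, 3, 1) == triel(n, 1, 3)       # True: endpoint order does not matter
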
358

  
359
def spath_id(nodes_num,path): 
360
  a = {}    
361
  for i in range(0,len(path)-1):                                                      
362
    a[triel(nodes_num,path[i],path[i+1])] = 1 # redundant, we only need initialization                                                                      
363
  return a    
364

  
365
def loadShortestPaths(paths_file):
366
  names = {}
367
  edges = {}
368

  
369
  for line in  open(paths_file,'r'):
370
    line = line.strip()
371
    for ipa in line.split(','):
372
      if ipa not in names.keys():
373
        names[ipa] = len(names.keys())
374

  
375
  n = len(names.keys())
376
  for line in  open(paths_file,'r'):
377
    line = line.strip()
378
    elems = [names[x] for x in line.split(',')]
379
    src = elems[0]
380
    dst = elems[-1]
381
    edges[triel(n,src,dst)] = spath_id(n,elems)
382

  
383
  return edges,names
384

  
385
def logs2edges(names,logs):
386
  edges = []
387
  num = len(names.keys())
388
  for i in logs.index:
389
    row = logs.ix[i]
390
    src = names[row['logger'].split(':')[0]]
391
    dst = names[row['peer'].split(':')[0]]
392
    edges.append(triel(num,src,dst))
393
  return edges
394

  
395
def add_spath_id(a1,a2):                                                        
396
      return dict(Counter(a1)+Counter(a2))
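
spath_id marks which triangular edge slots a shortest path crosses, and add_spath_id accumulates those marks with collections.Counter, so after folding in every observed overlay edge each underlay link carries the number of paths using it; the L2 norm of those counts is the NEIGHIMPACT value written below. A tiny example with made-up slot numbers:

a1 = {3: 1, 7: 1}            # one path crossing slots 3 and 7
a2 = {7: 1, 9: 1}            # another path crossing slots 7 and 9
print add_spath_id(a1, a2)   # slot 7 counted twice: {3: 1, 7: 2, 9: 1} (key order may vary)
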
397

  
398
def neighAnalyse(folder,exp,begin_sec,end_sec):
399
  time_sensibility = 10
400
  out_file = open(folder+"/network_impact.exp","w")
401
  out_file.write("info_type,session_id,time,netimpact\n")
402

  
403
  for session in exp.sessions:
404
    paths_file = seekPathFile(folder,session.time)
405
    if paths_file:
406
      paths,names = loadShortestPaths(paths_file)
407
      for instant in range(begin_sec,end_sec,time_sensibility): 
408
        edges = []
409
        for peer in session.peers:
410
          logs = peer.neighbours_interval_sec(session.time + instant,session.time + instant+time_sensibility)
411
          if len(logs) > 0:
412
            maxtime = max(logs['logTime'])
413
            logs = logs[logs['logTime'] == maxtime]
414
            edges = edges +  logs2edges(names,logs)
415
          else:
416
            print "WARNING: no neighbourhood data for interval " + str(session.time+begin_sec) + "-" + str(session.time+end_sec)
417
        
418
        sum_edges = {}
419
        for e in edges:
420
          sum_edges = add_spath_id(sum_edges,paths[e])
421

  
422
        out_file.write("NEIGHIMPACT" +','+ str(session.time)+','+str(instant)+','+str(nm.linalg.norm(sum_edges.values(),ord=2))+'\n')
423
    else:
424
      print "WARNING: shortest paths file not found for session "+str(session.time)
425
  out_file.close()
313 426

  
314 427
def lossAnalyse(folder,exp,begin_sec,end_sec):
315
	out_file = open(folder+"/packets_loss.exp","w")
316
	out_file.write("info_type,session_id,chunks,peer_hostname,losts\n")
317

  
318
	for session in exp.sessions:
319
		sessionLossAnalyse(folder,session,begin_sec,end_sec)
320
		print "Session: "+str(session.time)
321
		if (session.getSource() != None):
322
			source_ids = set(session.getSource().published_interval(\
323
					(session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)\
324
					['chunkID'])
325

  
326
			for peer in session.peers:
327
				if not peer.is_source:
328
					peer_ids = set(peer.published['chunkID'])#_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)['chunkID'])
329
					print "Peer "+peer.hostname+" lost "+\
330
							str(len(source_ids.difference(peer_ids)))+\
331
							"/"+str(len(source_ids))+" packets" 
332
					out_file.write("LOSS,"+str(session.time)+","+str(len(source_ids))+\
333
							","+purify_hostname(peer.hostname)+","+str(len(source_ids.difference(peer_ids)))+"\n")
334
		else:
335
			warning("source not found for session " + str(session.time))
336

  
337
	out_file.close()
338

  
339

  
340
def	dumpHostnameMap(folder,exp):
341
	hostnames = {}
342
	for session in exp.sessions:
343
		for peer in session.peers:
344
			hostnames[peer.hostname] = peer.address
345

  
346
	out_file = open(folder+"/hostnames2address.exp","w")
347
	out_file.write("info_type,hostname,address\n")
348
	for key in hostnames.keys():
349
		out_file.write("MAPPING,"+key+","+hostnames[key]+"\n")
350

  
351
	out_file.close()
352
	
428
  out_file = open(folder+"/packets_loss.exp","w")
429
  out_file.write("info_type,session_id,chunks,peer_hostname,losts\n")
430

  
431
  for session in exp.sessions:
432
    sessionLossAnalyse(folder,session,begin_sec,end_sec)
433
    print "Session: "+str(session.time)
434
    if (session.getSource() != None):
435
      source_ids = set(session.getSource().published_interval(\
436
          (session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)\
437
          ['chunkID'])
438

  
439
      for peer in session.peers:
440
        if not peer.is_source:
441
          peer_ids = set(peer.published['chunkID'])#_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)['chunkID'])
442
          print "Peer "+peer.hostname+" lost "+\
443
              str(len(source_ids.difference(peer_ids)))+\
444
              "/"+str(len(source_ids))+" packets" 
445
          out_file.write("LOSS,"+str(session.time)+","+str(len(source_ids))+\
446
              ","+purify_hostname(peer.hostname)+","+str(len(source_ids.difference(peer_ids)))+"\n")
447
    else:
448
      warning("source not found for session " + str(session.time))
449

  
450
  out_file.close()
451

  
452

  
453
def dumpHostnameMap(folder,exp):
454
  hostnames = {}
455
  for session in exp.sessions:
456
    for peer in session.peers:
457
      hostnames[peer.hostname] = peer.address
458

  
459
  out_file = open(folder+"/hostnames2address.exp","w")
460
  out_file.write("info_type,hostname,address\n")
461
  for key in hostnames.keys():
462
    out_file.write("MAPPING,"+key+","+hostnames[key]+"\n")
463

  
464
  out_file.close()
465
  
353 466

  
354 467
def analyze_main(argv):
355
	try:
356
		opts,args = getopt.getopt(argv,"hf:",["help","folder"])
357
	except getopt.GetoptError:
358
		sys.exit(2)
359
	for opt,arg in opts:
360
		if opt in ("-h","--help"):
361
			sys.exit()
362
		elif opt in ("-f","--folder"):
363
			folder = arg
364
	
365
	try:
366
		folder
367
	except NameError:
368
		sys.exit()
369
	print "folder is " + folder
370
	exp = Experiment()
371
	pm = pmgr.ProcessManager()
372
	preproc_chunklog(folder)
373
	preproc_csv(folder,procman=pm)
374
	pm.joinAll()
375

  
376
	dataPopulate(folder,exp)
377
	pm.launchProcess(rttAnalyse,[folder,exp])
378
	pm.launchProcess(lossAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
379
	pm.launchProcess(ICMPLossAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
380
	pm.launchProcess(delayAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
381
	pm.launchProcess(receptionAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
382
	pm.launchProcess(hopsAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
383
	pm.joinAll()
384
	dumpHostnameMap(folder,exp)
385

  
386
	for session in exp.sessions:
387
		print "Session: "+str(session.time)+ " has "+str(len(session.peers))+" peers"
468
  try:
469
    opts,args = getopt.getopt(argv,"hf:",["help","folder"])
470
  except getopt.GetoptError:
471
    sys.exit(2)
472
  for opt,arg in opts:
473
    if opt in ("-h","--help"):
474
      sys.exit()
475
    elif opt in ("-f","--folder"):
476
      folder = arg
477
  
478
  try:
479
    folder
480
  except NameError:
481
    sys.exit()
482
  print "folder is " + folder
483
  exp = Experiment()
484
  pm = pmgr.ProcessManager()
485
  preproc_chunklog(folder)
486
  preproc_neighlog(folder)
487
  preproc_csv(folder,procman=pm)
488
  pm.joinAll()
489

  
490
  dataPopulate(folder,exp)
491
  #pm.launchProcess(rttAnalyse,[folder,exp])
492
  pm.launchProcess(lossAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
493
  pm.launchProcess(neighAnalyse,[folder,exp,0,600])
494
  #pm.launchProcess(ICMPLossAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
495
  #pm.launchProcess(delayAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
496
  #pm.launchProcess(receptionAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
497
  #pm.launchProcess(hopsAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
498
  pm.joinAll()
499
  dumpHostnameMap(folder,exp)
500

  
501
  for session in exp.sessions:
502
    print "Session: "+str(session.time)+ " has "+str(len(session.peers))+" peers"
388 503

  
389 504
if __name__ == "__main__":
390
	analyze_main(sys.argv[1:])	
505
  analyze_main(sys.argv[1:])  
py_logs_correlated_visualizer.py
245 245
	simple_bar_graph(groups,accuracy,ylabel='receiving ratio')#,title='Chunks received ratio')
246 246
	save_figure(folder,'overall_loss')
247 247

  
248
def corr_neigh_impact_evolution(folder,groups):
249
  fig = plt.figure()
250
  for g in groups:
251
    filename = folder+'/'+g+'/network_impact.exp'
252
    if os.path.exists(filename):
253
      data = read_csv(filename).drop(['info_type','session_id'],1)
254
      data = data.groupby('time').median() # WARNING : with multiple sessions this value does no longer make sense
255
      plt.plot(data.index,data['netimpact'],label=g)
256
  
257
  plt.legend(loc='best')
258
  set_plot_labels(fig,'neighbourhood_evolution','time (s)','network impact')
259
  save_figure(folder,'neighbourhood_evolution')
260

  
261
def corr_neigh_impact_over_tag(folder,groups):
262
  fig = plt.figure()
263
  plot_data = {}
264
  for g in groups:
265
    if len(g.split('-')) > 1: 
266
      groupkey = g.split('-')[0]
267
      tag = g.split('-')[1]
268
      if groupkey not in plot_data.keys():
269
        plot_data[groupkey] = {}
270

  
271
      filename = folder+'/'+g+'/network_impact.exp'
272
      if os.path.exists(filename):
273
        data = read_csv(filename).drop(['info_type','session_id'],1)
274
        tag = int(''.join([c for c in list(tag) if c.isdigit()]))
275
        plot_data[groupkey][tag] = data['netimpact'].mean()
276
  
277
  if len(plot_data) > 0:
278
    for g in plot_data.keys():
279
      s = Series(plot_data[g])
280
      plt.plot(s.index,s,label=g)
281
    
282
    plt.legend(loc='best')
283
    set_plot_labels(fig,'','tag','network impact')
284
    save_figure(folder,'corr_netimpact')
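
corr_neigh_impact_over_tag assumes group directories named <group>-<tag> where the tag part carries digits; only the digits survive and become the x coordinate. With a hypothetical directory name:

g = "mesh-n50"                                              # hypothetical <group>-<tag> directory
groupkey, tag = g.split('-')[0], g.split('-')[1]
tag = int(''.join([c for c in list(tag) if c.isdigit()]))
print groupkey, tag                                         # mesh 50
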
285

  
248 286
def corr_visual_loss_active_peers(folder,groups):
249 287
	files = []
250 288
	accuracy = []
......
271 309
	mpl.rcParams.update({'font.size': 16})
272 310
	corr_visual_loss(folder,groups)
273 311
	corr_visual_loss_active_peers(folder,groups)
274
	corr_visual_hops(folder,groups)
275
	corr_visual_delay(folder,groups)
276
	delayVisualize2(folder,groups)
312
	corr_neigh_impact_evolution(folder,groups)
313
	corr_neigh_impact_over_tag(folder,groups)
314
	#corr_visual_hops(folder,groups)
315
	#corr_visual_delay(folder,groups)
316
	#delayVisualize2(folder,groups)
277 317

  
278 318
	if not save:
279 319
		plt.show()
py_logs_splitter.py
8 8
import shutil as sh
9 9

  
10 10
SESSION_TOLERANCE=20
11
SESSION_INDEPENDENT_KEYWORDS=['rtt','timespread']
11
SESSION_INDEPENDENT_KEYWORDS=['TestResults','shortest','rtt','timespread','NetLoad','PSTestNET']
12 12

  
13 13
def name_splitter(folder):
14 14
# returns a dictionary of type: [timestamp] -> group_name
......
16 16
	for logfile in os.listdir(folder):
17 17
		if is_source_from_filename(logfile):
18 18
			source_info = logfile.split('.')[0].split('_')
19
			if len(source_info) > 4:
20
				session_groups[time_from_filename(logfile)] = source_info[3]
19
			if len(source_info) > 2:
20
				session_groups[time_from_filename(logfile)] = source_info[2]
21 21
			else:
22 22
				session_groups[time_from_filename(logfile)] = 'standard'
23 23
	return session_groups
......
31 31
				(int(time) > (int(keytime) - SESSION_TOLERANCE)):
32 32
			group = session_groups[keytime]
33 33

  
34
	if (logfile.split('.')[0].split('_')[3]) in session_groups.values():
35
		group = logfile.split('.')[0].split('_')[3]
34
	if (logfile.split('.')[0].split('_')[2]) in session_groups.values():
35
		group = logfile.split('.')[0].split('_')[2]
36 36
	else:
37 37
		group = None
38 38
	return group
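
One fragility in the lookup above: logfile.split('.')[0].split('_')[2] assumes at least three underscore-separated fields in the basename (name_splitter guards this with len(source_info) > 2, this function does not), so an unexpected filename raises IndexError before group can fall back to None. A defensive sketch, if such names can occur:

parts = logfile.split('.')[0].split('_')
if len(parts) > 2 and parts[2] in session_groups.values():
	group = parts[2]
else:
	group = None
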
......
62 62
		if os.path.isfile(folder+"/"+logfile):
63 63
			logtime = time_from_filename(logfile) 
64 64

  
65

  
66 65
			if session_independent(logfile):
67 66
				multiple_dst_copy(folder,logfile,[folder +'/'+ v for v in set(session_groups.values())])
68 67
			else:
