peerstreamer-logs-analyzer / py_logs_analizer.py @ c0eac366

#!/usr/bin/python
from __future__ import division
import getopt,os,sys
import numpy as nm
from pandas import *
from random import randrange
import scipy.interpolate
import shutil as sh
from collections import Counter

sys.path.insert(0,'lib')
import process_manager as pmgr
from experiment import *
from peerstreamer_logs import *
from utilities import *

# Analysis window, in seconds after the session start time.
START_ANALYSIS_TIME = 150
STOP_ANALYSIS_TIME = 450

def rescale_log_time(logdata,offsetdata,server_addr):
  if (offsetdata is None) or 'addr' not in offsetdata.columns:
    return logdata
  if len(set(offsetdata['addr'])) > 1:
    offsetdata = offsetdata[offsetdata['addr'] == server_addr]
  if len(logdata) > 0:
    offsetdata = offsetdata.set_index('unixtime')
    offsetdata = offsetdata[offsetdata['offset'] != -1]
    future = pow(10,20) # far-future anchor so every log time falls inside the interpolation range

    if len(offsetdata) > 0:
      x = [0]+list(offsetdata.index)+[future]
      y = [i*1000000 for i in ([0]+list(offsetdata['offset'])+[0])] # offset (usec) to apply at each time in x
      y = [sum(i) for i in zip(x,y)] # corrected value of time for each given time in x
      func = scipy.interpolate.interp1d(x,y)

      logdata['logTime'] = logdata['logTime'].apply(func)
    else:
      warning(" time not rescaled for sliver ")

  return logdata

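# A minimal sketch of the rescaling above (hypothetical numbers, assuming the
# offset samples share the time base of 'logTime'): with measured offsets of
# 2s at t=100 and 4s at t=200, the anchor points become
#   x = [0, 100, 200, 10**20]
#   y = [0, 100 + 2*10**6, 200 + 4*10**6, 10**20]
# and scipy.interpolate.interp1d(x, y) maps every raw logTime onto a
# linearly interpolated, offset-corrected time.
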
def dataPopulate(folder,exp):
  # create the sessions from the source peers
  for logfile in os.listdir(folder):
    if logfile.endswith(".chunklog.csv") and is_source_from_filename(logfile):
      print "Loading source file: "+logfile
      session = exp.getSession(time_from_filename(logfile),creation=True)
      peer_data = read_csv(folder+"/"+logfile)
      try:
        offset_data = read_csv(timespread_filename(folder,hostname_from_filename(logfile)))
      except IOError:
        offset_data = None

      peer = Peer(hostname_from_filename(logfile),is_source_from_filename(logfile), \
          rescale_log_time(peer_data,offset_data,hostname_from_filename(logfile)))
      session.addPeer(peer)

  # add the generic peers
  for logfile in os.listdir(folder):
    if logfile.endswith(".chunklog.csv") and not is_source_from_filename(logfile):
      print "Loading generic peer file: "+logfile
      session = exp.getSession(time_from_filename(logfile))
      if session is not None:
        source = session.getSource()
        peer_data = read_csv(folder+"/"+logfile)
        try:
          offset_data = read_csv(timespread_filename(folder,hostname_from_filename(logfile)))
        except IOError:
          offset_data = None

        peer = Peer(hostname_from_filename(logfile),is_source_from_filename(logfile), \
            rescale_log_time(peer_data,offset_data,source))
        session.addPeer(peer)

  # add neighbourhood info
  for logfile in os.listdir(folder):
    if logfile.endswith(".neighlog.csv"):
      print "Loading peer file: "+logfile
      hostname = hostname_from_filename(logfile)
      peer = exp.getPeer(hostname)
      if peer:
        peer.setNeighbours(read_csv(folder+"/"+logfile))
      else:
        print "WARNING: peer " + hostname + " not found"

  # prune sessions whose peers suffered heavy ICMP loss
  for session in exp.sessions:
    for peer in session.peers:
      rtt_file = rtt_filename(folder,peer)
      if rtt_file is not None:
        ping_data = read_csv(rtt_file)
        ping_data = ping_data[ (ping_data['unixtime'] >= (session.time + START_ANALYSIS_TIME)) & \
            (ping_data['unixtime'] <= (session.time + STOP_ANALYSIS_TIME))]
        ping_sent = ping_data['sent'].sum()
        echo_received = ping_data['answers'].sum()
        if ping_sent == 0:
          warning("seems that peer "+peer.hostname+" has no ICMP data")
        if ping_sent != 0 and ((echo_received/ping_sent) < 0.7):
          exp.removeSessionByTime(session.time)
          print ping_sent
          print echo_received
          warning("REMOVED SESSION "+str(session.time))
          warning(" because of "+rtt_file)
      else:
        warning("peer "+peer.hostname+" has no ICMP file")

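# Note on the ordering above: sessions are created only while loading source
# logs (creation=True); peer logs can then only attach to an existing
# session, and sessions whose peers show heavy ICMP loss are pruned last.
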
def ICMPLossAnalyse(folder,exp,begin_sec,end_sec):
  out_file = open(folder+"/ICMPloss.exp","w")
  out_file.write("session,peer,echo_request,echo_response\n")
  for session in exp.sessions:
    for peer in session.peers:
      rtt_file = rtt_filename(folder,peer)
      if rtt_file is not None:
        ping_data = read_csv(rtt_file)
        ping_data = ping_data[ (ping_data['unixtime'] >= (session.time + begin_sec)) & \
            (ping_data['unixtime'] <= (session.time + end_sec))]
        ping_sent = ping_data['sent'].sum()
        echo_received = ping_data['answers'].sum()
        out_file.write(str(session.time)+","+str(purify_hostname(peer.hostname))+","+str(ping_sent)+","+str(echo_received)+"\n")
  out_file.close()

def rtt_filename(folder,peer):
  # implicitly returns None when no RTT file matches the peer
  for logfile in os.listdir(folder):
    if logfile.startswith(peer.hostname+"_rtt_") and logfile.endswith(".csv"):
      return folder+'/'+logfile

def purify(delim,line):
  if line.count(delim) > 1:
    warning( str(line.count(delim)-1) +" incomplete records.")
  return delim+line.split(delim)[-1] # keep only the last (complete) record on the line

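# Example (hypothetical line): when two records end up glued on one line,
#   purify("[CHUNK_LOG]", "...garbage[CHUNK_LOG],13,A,B,...")
# keeps only the last, complete record: "[CHUNK_LOG],13,A,B,...".
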
def preproc_csv_file(csvfile):
  tmpfile = "/tmp/tmpcsv"+str(randrange(99999999999999999))
  csvfile = os.path.abspath(csvfile)
  infile = open(csvfile,"r")
  outfile = open(tmpfile,"w")
  field_n = 0
  modified = False
  for in_line in infile.xreadlines():
    if len(in_line.split(',')) != field_n:
      if field_n == 0: # the first line sets the expected number of fields
        field_n = len(in_line.split(','))
        outfile.write(in_line)
      else: # drop non-conformant lines
        modified = True
    else:
      outfile.write(in_line)
  infile.close()
  outfile.close()
  if modified:
    sh.move(tmpfile,csvfile)
  else:
    os.remove(tmpfile)

def preproc_csv(folder,procman=None):
  for csvfile in os.listdir(folder):
    if csvfile.endswith(".csv"):
      print "Preprocessing CSV file: "+csvfile
      if procman:
        procman.launchProcess(preproc_csv_file,[folder+"/"+csvfile])
      else:
        preproc_csv_file(folder+"/"+csvfile)

def preproc_chunklog(folder):
  for logfile in os.listdir(folder):
    if logfile.endswith(".log") and not os.path.isfile(folder+"/"+logfile+".chunklog.csv"):
      print "Preprocessing file: "+logfile
      infile = open(folder+"/"+logfile,"r")
      csvfile = open(folder+"/"+logfile+".chunklog.csv","w")
      csvfile.write("CHUNK_LOG,logTime,sender"+\
          ",receiver,chunkID,size[bytes],chunkTimestamp,hopcount,action\n")
      for in_line in infile.xreadlines():
        if "CHUNK_LOG" in in_line[:10]:
          csvfile.write(purify("[CHUNK_LOG]",in_line))
      infile.close()
      csvfile.close()

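# The raw log lines are assumed to look like (field values hypothetical):
#   [CHUNK_LOG],1397901234567890,10.0.0.1:6666,10.0.0.2:6666,42,1316,1397901234560000,2,RECEIVED
# matching the CSV header written above; only the delimiter and the header
# come from this script, the sample values are illustrative.
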
def preproc_neighlog(folder):
  for logfile in os.listdir(folder):
    if logfile.endswith(".log") and not os.path.isfile(folder+"/"+logfile+".neighlog.csv"):
      print "Preprocessing file: "+logfile
      infile = open(folder+"/"+logfile,"r")
      csvfile = open(folder+"/"+logfile+".neighlog.csv","w")
      csvfile.write("NEIGH_LOG,logTime,logger"+\
          ",peer,neigh_size\n")
      for in_line in infile.xreadlines():
        if "NEIGHBOURHOOD" in in_line[:15]:
          csvfile.write(purify("[NEIGHBOURHOOD]",in_line))
      infile.close()
      csvfile.close()

def delaySessionAnalyse(folder,session,begin_sec,end_sec):
  if (session.getSource() != None):
    out_file = open(folder+"/"+str(session.time)+"_session_delay.exp","w")
    out_file.write("info_type,session_id,peer_hostname,delay,hops\n")

    for peer in session.peers:
      logs = peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
      received = logs[logs['action'] == "RECEIVED"]

      for i in received.index:
        row = received.ix[i]
        out_file.write("DELAY,"+str(session.time)+","\
            +purify_hostname(peer.hostname)+","\
            +str(row['logTime'] - row['chunkTimestamp'])+","\
            +str(row['hopcount'])+"\n")

    out_file.close()

def delayAnalyse(folder,exp,begin_sec,end_sec):
  out_file = open(folder+"/packets_delay.exp","w")
  out_file.write("info_type,session_id,peer_hostname,avg_delay\n")
  for session in exp.sessions:
    delaySessionAnalyse(folder,session,begin_sec,end_sec)
    for peer in session.peers:
      if not peer.is_source:
        records = peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
        received = records[records['action'] == 'RECEIVED']
        minimum = (received['logTime']-received['chunkTimestamp']).min()
        if (minimum < -5000): # -5 msec
          warning(" Timestamps seem to be inconsistent!")
          print "          Minimum delay of sliver "+peer.hostname+" was: "+str(minimum/1000000)+" seconds"
        mean = (received['logTime']-received['chunkTimestamp']).mean()
        out_file.write("DELAY,"+str(session.time)+","+purify_hostname(peer.hostname)+","+str(mean)+"\n")

  out_file.close()

def sessionLossAnalyse(folder,session,begin_sec,end_sec):
  delta = int((end_sec-begin_sec)/100) # time resolution in seconds
  if delta == 0:
    delta = 1
  if (session.getSource() != None):
    out_file = open(folder+"/"+str(session.time)+"_session_loss.exp","w")
    out_file.write("info_type,session_id,time,chunks,peer_hostname,losts\n")
    source_chunks = session.getSource().published_interval(\
        (session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)

    for peer in session.peers:
      old_chunks = set()
      peer_chunks_ids = set(peer.published['chunkID'])
      for instant in range(session.time+begin_sec,session.time+end_sec,delta):
        # chunks the source published up to this instant, not yet accounted for
        delta_chunks = set(source_chunks[source_chunks['logTime'] <= (instant*1000000)]['chunkID']).difference(old_chunks)
        n_source_chunks = len(delta_chunks)
        n_lost_chunks = len(delta_chunks.difference(peer_chunks_ids))
        out_file.write("LOSS,"+str(session.time)+","+str(instant)+","+\
            str(n_source_chunks)+","+purify_hostname(peer.hostname)+","+str(n_lost_chunks)+"\n")
        old_chunks = old_chunks.union(delta_chunks)

    out_file.close()

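# Sketch of the computation above: at each sampling instant the chunk IDs the
# source has published so far (minus those already counted) form
# delta_chunks; the ones that never appear anywhere in the peer's log are
# reported as lost for that time slot.
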
def hostnames2key(names):
  return '-'.join(names)

def key2hostnames(key):
  return key.split('-')

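# Round-trip example: hostnames2key(['10.0.0.1','10.0.0.2']) yields
# '10.0.0.1-10.0.0.2' and key2hostnames() splits it back; this assumes
# hostnames/addresses never contain '-' themselves.
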
def receptionAnalyse(folder,exp,begin_sec,end_sec):
  out_file = open(folder+"/edges.exp","w")
  out_file.write("info_type,session_id,peer_sender,peer_receiver,weight\n")

  for session in exp.sessions:
    edges = {}
    for peer in session.peers:
      records = peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
      records = records[records['action'] == "RECEIVED"]
      sender = records['sender']
      receiver = records['receiver']
      for i in sender.index:
        key = hostnames2key([sender[i],receiver[i]])
        edges[key] = edges.get(key,0) + 1 # count chunks received over each edge
    for edge in edges.keys():
      host1 = exp.addr2hostname(key2hostnames(edge)[0])
      host2 = exp.addr2hostname(key2hostnames(edge)[1])
      out_file.write("EDGE,"+str(session.time)+","+ \
          purify_hostname(host1)+","+ \
          purify_hostname(host2)+","+ \
          str(edges[edge])+"\n")

  out_file.close()

def rttAnalyse(folder,exp):
  out_file = open(folder + "/slivers_rtts.exp","w")
  out_file.write("RTT_TYPE,SRC,DST,MSEC,MDEV\n")

  for logfile in os.listdir(folder):
    if "_rtt_" in logfile and logfile.endswith(".csv") and os.stat(folder+"/"+logfile).st_size > 0:
      print "RTT analysing: "+logfile
      data = read_csv(folder + "/" + logfile)
      data = data[data['rtt'] != -1].reindex(range(0,len(data)))
      if len(data) > 0:
        sliver = purify_hostname(hostname_from_filename(logfile))
        avgs = data.groupby("addr").mean()
        maxs = data.groupby("addr").max()

        for i in range(0,len(avgs)):
          dst = purify_hostname(exp.addr2hostname(avgs.ix[i].name,without_port=True))
          rtt = avgs.ix[i]['rtt']
          mdev = avgs.ix[i]['mdev']
          out_file.write("RTT_AVG,"+sliver+","+dst+","+str(rtt)+","+str(mdev)+"\n")

        for i in range(0,len(maxs)):
          dst = purify_hostname(exp.addr2hostname(maxs.ix[i].name,without_port=True))
          rtt = maxs.ix[i]['rtt']
          mdev = maxs.ix[i]['mdev']
          out_file.write("RTT_MAX,"+sliver+","+dst+","+str(rtt)+","+str(mdev)+"\n")

  out_file.close()

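# rttAnalyse relies on pandas groupby aggregation; e.g. with hypothetical rows
#   addr,rtt,mdev
#   10.0.0.5,12.1,0.2
#   10.0.0.5,10.1,0.3
# data.groupby("addr").mean() yields one aggregate row per destination:
#   10.0.0.5 -> rtt 11.1, mdev 0.25
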
def hopsAnalyse(folder,exp,begin_sec,end_sec):
  out_file = open(folder+"/packets_hops.exp","w")
  out_file.write("info_type,session_id,time,chunks,peer_hostname,hops_avg\n")
  delta = int((end_sec-begin_sec)/100) # time resolution in seconds
  if delta == 0:
    delta = 1
  for session in exp.sessions:
    if (session.getSource() != None):
      for peer in session.peers:
        for instant in range(session.time+begin_sec,session.time+end_sec,delta):
          delta_hops = peer.published_interval_sec(instant,instant+delta)['hopcount']
          out_file.write("HOP,"+str(session.time)+","+str(instant+delta)+","+str(delta_hops.count())+","+\
              peer.hostname+","+str(delta_hops.mean())+"\n")
    else:
      warning("No source for session "+str(session.time))

  out_file.close()

def seekPathFile(folder,session_id):
  # pick the earliest ".paths" snapshot taken no later than the session start
  paths = {}
  for logfile in os.listdir(folder):
    if logfile.endswith(".paths"):
      time = logfile.split('_')[-1]
      time = int(time.split('.')[0])
      if time <= session_id:
        paths[time] = logfile
  if len(paths.keys()) > 0:
    return folder+'/'+paths[min(paths.keys())]
  else:
    return None

def triel(n,x,y):
  # map the unordered pair (x,y) of n node indexes to a flat index in the
  # strictly upper-triangular part of an n x n matrix
  i = min(x,y)
  j = max(x,y)
  k = int(j-1+float((2*n-3)*i-i**2)/2)
  return k

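# Sketch of the triangular indexing implemented by triel() (values worked out
# by hand for n=4): the six unordered pairs map to distinct slots
#   triel(4,0,1)==0  triel(4,0,2)==1  triel(4,0,3)==2
#   triel(4,1,2)==3  triel(4,1,3)==4  triel(4,2,3)==5
# independently of argument order, since i=min(x,y) and j=max(x,y).
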
def spath_id(nodes_num,path):
  # represent a path as the set of its edges (edge index -> 1)
  a = {}
  for i in range(0,len(path)-1):
    a[triel(nodes_num,path[i],path[i+1])] = 1
  return a

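# The ".paths" files parsed below are assumed to hold one precomputed
# shortest path per line as a comma-separated address list, e.g.
# (hypothetical): "10.0.0.1,10.0.0.5,10.0.0.9" for the route
# 10.0.0.1 -> 10.0.0.5 -> 10.0.0.9.
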
def loadShortestPaths(paths_file):
  names = {}
  edges = {}

  # first pass: assign an integer id to every address seen in the file
  for line in open(paths_file,'r'):
    line = line.strip()
    for ipa in line.split(','):
      if ipa not in names.keys():
        names[ipa] = len(names.keys())

  # second pass: for every (src,dst) pair store the set of edges on its path
  n = len(names.keys())
  for line in open(paths_file,'r'):
    line = line.strip()
    elems = [names[x] for x in line.split(',')]
    src = elems[0]
    dst = elems[-1]
    edges[triel(n,src,dst)] = spath_id(n,elems)

  return edges,names

def logs2edges(names,logs):
  # translate neighbourhood log rows into overlay edge indexes
  edges = []
  num = len(names.keys())
  for i in logs.index:
    row = logs.ix[i]
    src = names[row['logger'].split(':')[0]]
    dst = names[row['peer'].split(':')[0]]
    edges.append(triel(num,src,dst))
  return edges

def add_spath_id(a1,a2):
  # element-wise sum of two edge-usage dictionaries
  return dict(Counter(a1)+Counter(a2))

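# add_spath_id() sums per-edge usage counts via collections.Counter, e.g.:
#   add_spath_id({0: 1, 3: 2}, {3: 1, 5: 1}) == {0: 1, 3: 3, 5: 1}
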
def neighAnalyse(folder,exp,begin_sec,end_sec):
  time_sensibility = 10
  out_file = open(folder+"/network_impact.exp","w")
  out_file.write("info_type,session_id,time,netimpact\n")

  for session in exp.sessions:
    paths_file = seekPathFile(folder,session.time)
    if paths_file:
      paths,names = loadShortestPaths(paths_file)
      for instant in range(begin_sec,end_sec,time_sensibility):
        edges = []
        for peer in session.peers:
          logs = peer.neighbours_interval_sec(session.time + instant,session.time + instant+time_sensibility)
          if len(logs) > 0:
            # keep only the most recent neighbourhood snapshot in the interval
            maxtime = max(logs['logTime'])
            logs = logs[logs['logTime'] == maxtime]
            edges = edges + logs2edges(names,logs)
          else:
            print "WARNING: no neighbourhood data for interval " + str(session.time+instant) + "-" + str(session.time+instant+time_sensibility)

        # accumulate, per underlay edge, how many overlay links traverse it
        sum_edges = {}
        for e in edges:
          sum_edges = add_spath_id(sum_edges,paths[e])

        out_file.write("NEIGHIMPACT" +','+ str(session.time)+','+str(instant)+','+str(nm.linalg.norm(sum_edges.values(),ord=2))+'\n')
    else:
      print "WARNING: shortest paths file not found for session "+str(session.time)
  out_file.close()

def lossAnalyse(folder,exp,begin_sec,end_sec):
  out_file = open(folder+"/packets_loss.exp","w")
  out_file.write("info_type,session_id,chunks,peer_hostname,losts\n")

  for session in exp.sessions:
    sessionLossAnalyse(folder,session,begin_sec,end_sec)
    print "Session: "+str(session.time)
    if (session.getSource() != None):
      source_ids = set(session.getSource().published_interval(\
          (session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)\
          ['chunkID'])

      for peer in session.peers:
        if not peer.is_source:
          peer_ids = set(peer.published['chunkID'])
          print "Peer "+peer.hostname+" lost "+\
              str(len(source_ids.difference(peer_ids)))+\
              "/"+str(len(source_ids))+" packets"
          out_file.write("LOSS,"+str(session.time)+","+str(len(source_ids))+\
              ","+purify_hostname(peer.hostname)+","+str(len(source_ids.difference(peer_ids)))+"\n")
    else:
      warning("source not found for session " + str(session.time))

  out_file.close()

def dumpHostnameMap(folder,exp):
  hostnames = {}
  for session in exp.sessions:
    for peer in session.peers:
      hostnames[peer.hostname] = peer.address

  out_file = open(folder+"/hostnames2address.exp","w")
  out_file.write("info_type,hostname,address\n")
  for key in hostnames.keys():
    out_file.write("MAPPING,"+key+","+hostnames[key]+"\n")

  out_file.close()

def analyze_main(argv):
  try:
    opts,args = getopt.getopt(argv,"hf:",["help","folder="]) # the long option needs '=' to accept a value
  except getopt.GetoptError:
    print "Usage: py_logs_analizer.py -f <logs_folder>"
    sys.exit(2)
  folder = None
  for opt,arg in opts:
    if opt in ("-h","--help"):
      print "Usage: py_logs_analizer.py -f <logs_folder>"
      sys.exit()
    elif opt in ("-f","--folder"):
      folder = arg

  if folder is None:
    sys.exit()
  print "folder is " + folder
  exp = Experiment()
  pm = pmgr.ProcessManager()
  preproc_chunklog(folder)
  preproc_neighlog(folder)
  preproc_csv(folder,procman=pm)
  pm.joinAll()

  dataPopulate(folder,exp)
  #pm.launchProcess(rttAnalyse,[folder,exp])
  pm.launchProcess(lossAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
  pm.launchProcess(neighAnalyse,[folder,exp,0,600])
  #pm.launchProcess(ICMPLossAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
  #pm.launchProcess(delayAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
  #pm.launchProcess(receptionAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
  #pm.launchProcess(hopsAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
  pm.joinAll()
  dumpHostnameMap(folder,exp)

  for session in exp.sessions:
    print "Session: "+str(session.time)+" has "+str(len(session.peers))+" peers"

if __name__ == "__main__":
  analyze_main(sys.argv[1:])
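
# Typical invocation (path hypothetical):
#   ./py_logs_analizer.py -f /path/to/experiment_logs
# With the analyses enabled above this produces packets_loss.exp,
# network_impact.exp, hostnames2address.exp and one <session>_session_loss.exp
# per session inside the given folder.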