#!/usr/bin/python
from __future__ import division
import getopt, os, sys
import numpy as nm
from pandas import *
from random import randrange
import scipy.interpolate
import shutil as sh

sys.path.insert(0,'lib')
import process_manager as pmgr
from experiment import *
from peerstreamer_logs import *
from utilities import *

START_ANALYSIS_TIME = 60 #150
STOP_ANALYSIS_TIME = 120 #450

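# py_logs_analizer.py
# Post-processes the CSV logs of a PeerStreamer experiment: the raw logs in
# the given folder are preprocessed and then aggregated into ".exp" CSV
# files (packet delay, loss, hop count, RTT, overlay edges, hostname map)
# written to the same folder. START_ANALYSIS_TIME/STOP_ANALYSIS_TIME bound
# the analysed interval, in seconds from the start of each session, so that
# startup transients are ignored.
# Helpers such as Peer, Experiment, warning(), purify_hostname() and the
# *_from_filename() parsers come from the modules under lib/; read_csv()
# comes from the pandas star import.
#
# rescale_log_time() below maps a peer's log timestamps onto the reference
# clock: the measured clock offsets (seconds) are converted to microseconds
# and interpolated piecewise-linearly over time, anchoring the mapping at 0
# and at a far-future point with zero offset.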
def rescale_log_time(logdata,offsetdata,server_addr):
        if (offsetdata is None) or 'addr' not in offsetdata.columns:
                return logdata
        if len(set(offsetdata['addr'])) > 1:
                offsetdata = offsetdata[offsetdata['addr'] == server_addr]
        if len(logdata) > 0:
                offsetdata = offsetdata.set_index('unixtime')
                offsetdata = offsetdata[offsetdata['offset'] != -1]
                future = pow(10,20) # far-future anchor, so interp1d covers every log time

                if len(offsetdata) > 0:
                        x = [0]+list(offsetdata.index)+[future]
                        y = [i*1000000 for i in ([0]+list(offsetdata['offset'])+[0])] # offset (usec) to apply to each time in x
                        y = [sum(i) for i in zip(x,y)] # corrected value of time for each given time in x
                        func = scipy.interpolate.interp1d(x,y)

                        logdata['logTime'] = logdata['logTime'].apply(func)
                else:
                        warning("time not rescaled for sliver")

        return logdata

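# dataPopulate() builds the Experiment structure from `folder`:
#  - one "*.log.csv" chunk trace per peer; source peers are recognised by
#    is_source_from_filename(), the session start time is encoded in the
#    file name (time_from_filename());
#  - optional clock-offset files (timespread_filename()) with 'unixtime',
#    'addr' and 'offset' columns, fed to rescale_log_time();
#  - per-peer ping reports (see rtt_filename()), used to prune sessions in
#    which a peer lost more than 30% of its ICMP echoes.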
def dataPopulate(folder,exp):
        # create the sessions from the source peers
        for logfile in os.listdir(folder):
                if logfile.endswith(".log.csv") and is_source_from_filename(logfile):
                        print "Loading source file: "+logfile
                        session = exp.getSession(time_from_filename(logfile),creation=True)
                        peer_data = read_csv(folder+"/"+logfile)
                        try:
                                offset_data = read_csv(timespread_filename(folder,hostname_from_filename(logfile)))
                        except IOError:
                                offset_data = None

                        peer = Peer(hostname_from_filename(logfile),is_source_from_filename(logfile), \
                                        rescale_log_time(peer_data,offset_data,hostname_from_filename(logfile)))
                        session.addPeer(peer)

        # add the generic peers
        for logfile in os.listdir(folder):
                if logfile.endswith(".log.csv") and not is_source_from_filename(logfile):
                        print "Loading generic peer file: "+logfile
                        session = exp.getSession(time_from_filename(logfile))
                        if session is not None:
                                source = session.getSource()
                                peer_data = read_csv(folder+"/"+logfile)
                                try:
                                        offset_data = read_csv(timespread_filename(folder,hostname_from_filename(logfile)))
                                except IOError:
                                        offset_data = None

                                peer = Peer(hostname_from_filename(logfile),is_source_from_filename(logfile), \
                                                rescale_log_time(peer_data,offset_data,source))
                                session.addPeer(peer)

        # prune sessions with deviating peers; iterate over a copy, since
        # removeSessionByTime() modifies exp.sessions
        for session in list(exp.sessions):
                for peer in session.peers:
                        rtt_file = rtt_filename(folder,peer)
                        if rtt_file is not None:
                                ping_data = read_csv(rtt_file)
                                ping_data = ping_data[ (ping_data['unixtime'] >= (session.time + START_ANALYSIS_TIME)) & \
                                                (ping_data['unixtime'] <= (session.time + STOP_ANALYSIS_TIME))]
                                ping_sent = ping_data['sent'].sum()
                                echo_received = ping_data['answers'].sum()
                                if ping_sent == 0:
                                        warning("seems that peer "+peer.hostname+" has no ICMP data")
                                if ping_sent != 0 and ((echo_received/ping_sent) < 0.7):
                                        exp.removeSessionByTime(session.time)
                                        warning("REMOVED SESSION "+str(session.time)+" because of "+rtt_file)
                                        warning("echo replies/requests: "+str(echo_received)+"/"+str(ping_sent))
                                        break # the session is gone, skip its remaining peers
                        else:
                                warning("peer "+peer.hostname+" has no ICMP file")

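# ICMPLossAnalyse() dumps the raw ICMP counters (echo requests sent vs.
# answers received) per session and peer, restricted to the analysis
# window, into <folder>/ICMPloss.exp.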
def ICMPLossAnalyse(folder,exp,begin_sec,end_sec):
        out_file = open(folder+"/ICMPloss.exp","w")
        out_file.write("session,peer,echo_request,echo_response\n")
        for session in exp.sessions:
                for peer in session.peers:
                        rtt_file = rtt_filename(folder,peer)
                        if rtt_file is not None:
                                ping_data = read_csv(rtt_file)
                                ping_data = ping_data[ (ping_data['unixtime'] >= (session.time + begin_sec)) & \
                                                (ping_data['unixtime'] <= (session.time + end_sec))]
                                ping_sent = ping_data['sent'].sum()
                                echo_received = ping_data['answers'].sum()
                                out_file.write(str(session.time)+","+str(purify_hostname(peer.hostname))+","+str(ping_sent)+","+str(echo_received)+"\n")
        out_file.close()

def rtt_filename(folder,peer):
        for logfile in os.listdir(folder):
                if logfile.startswith(peer.hostname+"_rtt_") and logfile.endswith(".csv"):
                        return folder+'/'+logfile
        return None # no ping report for this peer

def purify(delim,line):
        # keep only the last record of the line; anything before the last
        # delimiter is a leftover of an incomplete write
        if line.count(delim) > 1:
                warning(str(line.count(delim)-1)+" incomplete records.")
        return delim+line.split(delim)[-1]

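# preproc_csv_file() rewrites a CSV file keeping only the rows that have
# the same field count as the header; truncated or malformed rows are
# dropped. The original file is replaced only if something was removed.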
def preproc_csv_file(csvfile):
        tmpfile = "/tmp/tmpcsv"+str(randrange(99999999999999999))
        csvfile = os.path.abspath(csvfile)
        infile = open(csvfile,"r")
        outfile = open(tmpfile,"w")
        field_n = 0
        modified = False
        for in_line in infile.xreadlines():
                if len(in_line.split(',')) != field_n:
                        if field_n == 0:
                                # first line: take the header as the reference field count
                                field_n = len(in_line.split(','))
                                outfile.write(in_line)
                        else:
#                                print "[WARNING] Non conformant line:\n\t"+in_line
                                modified = True
                else:
                        outfile.write(in_line)
        infile.close()
        outfile.close()
        if modified:
                sh.move(tmpfile,csvfile)
        else:
                os.remove(tmpfile) # nothing dropped, discard the copy

def preproc_csv(folder,procman=None):
        for csvfile in os.listdir(folder):
                if csvfile.endswith(".csv"):
                        print "Preprocessing CSV file: "+csvfile
                        if procman:
                                procman.launchProcess(preproc_csv_file,[folder+"/"+csvfile])
                        else:
                                preproc_csv_file(folder+"/"+csvfile)

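# preproc_chunklog() extracts the chunk trace from each raw ".log" file:
# every "[CHUNK_LOG]" line becomes a row of a new "<logfile>.csv" with the
# header below; files that already have a ".csv" companion are skipped.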
def preproc_chunklog(folder):
        for logfile in os.listdir(folder):
                if logfile.endswith(".log") and not os.path.isfile(folder+"/"+logfile+".csv"):
                        print "Preprocessing file: "+logfile
                        infile = open(folder+"/"+logfile,"r")
                        csvfile = open(folder+"/"+logfile+".csv","w")
                        csvfile.write("CHUNK_LOG,logTime,sender"+\
                                        ",receiver,chunkID,size[bytes],chunkTimestamp,hopcount,action\n")
                        for in_line in infile.xreadlines():
                                if "CHUNK_LOG" in in_line[:10]:
                                        csvfile.write(purify("[CHUNK_LOG]",in_line))
                        infile.close()
                        csvfile.close()

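# delaySessionAnalyse() writes one "<session>_session_delay.exp" file per
# session, with a row for every chunk a peer RECEIVED: the delay is
# logTime - chunkTimestamp (microseconds, on the rescaled clock) together
# with the chunk's hop count.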
def delaySessionAnalyse(folder,session,begin_sec,end_sec):
        if (session.getSource() != None):
                out_file = open(folder+"/"+str(session.time)+"_session_delay.exp","w")
                out_file.write("info_type,session_id,peer_hostname,delay,hops\n")

                for peer in session.peers:
                        logs = peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
                        received = logs[logs['action'] == "RECEIVED"]

                        for i in received.index:
                                row = received.ix[i]
                                out_file.write("DELAY,"+str(session.time)+","\
                                                +purify_hostname(peer.hostname)+","\
                                                +str(row['logTime'] - row['chunkTimestamp'])+","\
                                                +str(row['hopcount'])+"\n")

                out_file.close()

def delayAnalyse(folder,exp,begin_sec,end_sec):
        out_file = open(folder+"/packets_delay.exp","w")
        out_file.write("info_type,session_id,peer_hostname,avg_delay\n")
        for session in exp.sessions:
                delaySessionAnalyse(folder,session,begin_sec,end_sec)
                for peer in session.peers:
                        if not peer.is_source:
                                records = peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
                                received = records[records['action'] == 'RECEIVED']
                                minimum = (received['logTime']-received['chunkTimestamp']).min()
                                if (minimum < -5000): # i.e. more than 5 msec in the past
                                        warning(" Timestamps seem to be inconsistent!")
                                        print "          Minimum delay of sliver "+peer.hostname+" was: "+str(minimum/1000000)+" seconds"
                                mean = (received['logTime']-received['chunkTimestamp']).mean()
                                out_file.write("DELAY,"+str(session.time)+","+purify_hostname(peer.hostname)+","+str(mean)+"\n")

        out_file.close()

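# sessionLossAnalyse() samples each session at roughly 100 instants (delta
# seconds apart) and, per peer and instant, reports how many of the chunks
# the source had published by then are missing from the peer's chunk log.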
def sessionLossAnalyse(folder,session,begin_sec,end_sec):
        delta = int((end_sec-begin_sec)/100) # time resolution in seconds
        if delta == 0:
                delta = 1
        if (session.getSource() != None):
                out_file = open(folder+"/"+str(session.time)+"_session_loss.exp","w")
                out_file.write("info_type,session_id,time,chunks,peer_hostname,losts\n")
                source_chunks = session.getSource().published_interval(\
                                (session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)

                for peer in session.peers:
                        old_chunks = set()
                        peer_chunks_ids = set(peer.published['chunkID'])
                        for instant in range(session.time+begin_sec,session.time+end_sec,delta):
                                delta_chunks = set(source_chunks[source_chunks['logTime'] <= (instant*1000000)]['chunkID']).difference(old_chunks)
                                n_source_chunks = len(delta_chunks)
                                n_lost_chunks = len(delta_chunks.difference(peer_chunks_ids))
                                out_file.write("LOSS,"+str(session.time)+","+str(instant)+","+\
                                                str(n_source_chunks)+","+purify_hostname(peer.hostname)+","+str(n_lost_chunks)+"\n")
                                old_chunks = old_chunks.union(delta_chunks)

                out_file.close()

def hostnames2key(names):
        return '-'.join(names)

def key2hostnames(key):
        return key.split('-')

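# receptionAnalyse() reconstructs the weighted overlay graph: per session,
# each (sender,receiver) pair is mapped to the number of chunks received
# over that edge. Pairs are encoded with the two helpers above, e.g.
# hostnames2key(["peerA","peerB"]) -> "peerA-peerB"; note that this
# encoding assumes the addresses themselves contain no '-'.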
def receptionAnalyse(folder,exp,begin_sec,end_sec):
        out_file = open(folder+"/edges.exp","w")
        out_file.write("info_type,session_id,peer_sender,peer_receiver,weight\n")

        for session in exp.sessions:
                edges = {}
                for peer in session.peers:
                        records = peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
                        records = records[records['action'] == "RECEIVED"]
                        sender = records['sender']
                        receiver = records['receiver']
                        for i in sender.index:
                                key = hostnames2key([sender[i],receiver[i]])
                                if key in edges:
                                        edges[key] += 1
                                else:
                                        edges[key] = 1
                for edge in edges.keys():
                        host1 = exp.addr2hostname(key2hostnames(edge)[0])
                        host2 = exp.addr2hostname(key2hostnames(edge)[1])
                        out_file.write("EDGE,"+str(session.time)+","+ \
                                        purify_hostname(host1)+","+ \
                                        purify_hostname(host2)+","+ \
                                        str(edges[edge])+"\n")

        out_file.close()

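# rttAnalyse() aggregates the "<host>_rtt_*.csv" ping reports: for every
# sliver it writes the average and the maximum RTT (with mdev) towards
# each destination address into slivers_rtts.exp.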
def rttAnalyse(folder,exp):
        out_file = open(folder + "/slivers_rtts.exp","w")
        out_file.write("RTT_TYPE,SRC,DST,MSEC,MDEV\n")

        for logfile in os.listdir(folder):
                if "_rtt_" in logfile and logfile.endswith(".csv") and os.stat(folder+"/"+logfile).st_size > 0:
                        print "RTT analysing: "+logfile
                        data = read_csv(folder + "/" + logfile)
                        data = data[data['rtt'] != -1].reset_index(drop=True) # drop failed pings and renumber rows
                        if len(data) > 0:
                                sliver = purify_hostname(hostname_from_filename(logfile))
                                avgs = data.groupby("addr").mean()
                                maxs = data.groupby("addr").max()

                                for i in range(0,len(avgs)):
                                        dst = purify_hostname(exp.addr2hostname(avgs.index[i],without_port=True))
                                        rtt = avgs.ix[i]['rtt']
                                        mdev = avgs.ix[i]['mdev']
                                        out_file.write("RTT_AVG,"+sliver+","+dst+","+str(rtt)+","+str(mdev)+"\n")

                                for i in range(0,len(maxs)):
                                        dst = purify_hostname(exp.addr2hostname(maxs.index[i],without_port=True))
                                        rtt = maxs.ix[i]['rtt']
                                        mdev = maxs.ix[i]['mdev']
                                        out_file.write("RTT_MAX,"+sliver+","+dst+","+str(rtt)+","+str(mdev)+"\n")

        out_file.close()

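# hopsAnalyse() samples each session with the same ~100-bucket resolution
# used for losses and writes, per peer and bucket, the number of chunks
# received and their mean hop count into packets_hops.exp.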
def hopsAnalyse(folder,exp,begin_sec,end_sec):
        out_file = open(folder+"/packets_hops.exp","w")
        out_file.write("info_type,session_id,time,chunks,peer_hostname,hops_avg\n")
        delta = int((end_sec-begin_sec)/100) # time resolution in seconds
        if delta == 0:
                delta = 1
        for session in exp.sessions:
                if (session.getSource() != None):
                        for peer in session.peers:
                                for instant in range(session.time+begin_sec,session.time+end_sec,delta):
                                        delta_hops = peer.published_interval_sec(instant,instant+delta)['hopcount']
                                        out_file.write("HOP,"+str(session.time)+","+str(instant+delta)+","+str(delta_hops.count())+","+\
                                                        purify_hostname(peer.hostname)+","+str(delta_hops.mean())+"\n")
                else:
                        warning("No source for session "+str(session.time))

        out_file.close()

def lossAnalyse(folder,exp,begin_sec,end_sec):
        out_file = open(folder+"/packets_loss.exp","w")
        out_file.write("info_type,session_id,chunks,peer_hostname,losts\n")

        for session in exp.sessions:
                sessionLossAnalyse(folder,session,begin_sec,end_sec)
                print "Session: "+str(session.time)
                if (session.getSource() != None):
                        source_ids = set(session.getSource().published_interval(\
                                        (session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)\
                                        ['chunkID'])

                        for peer in session.peers:
                                if not peer.is_source:
                                        peer_ids = set(peer.published['chunkID'])
                                        print "Peer "+peer.hostname+" lost "+\
                                                        str(len(source_ids.difference(peer_ids)))+\
                                                        "/"+str(len(source_ids))+" packets"
                                        out_file.write("LOSS,"+str(session.time)+","+str(len(source_ids))+\
                                                        ","+purify_hostname(peer.hostname)+","+str(len(source_ids.difference(peer_ids)))+"\n")
                else:
                        warning("source not found for session " + str(session.time))

        out_file.close()

def dumpHostnameMap(folder,exp):
        hostnames = {}
        for session in exp.sessions:
                for peer in session.peers:
                        hostnames[peer.hostname] = peer.address

        out_file = open(folder+"/hostnames2address.exp","w")
        out_file.write("info_type,hostname,address\n")
        for key in hostnames.keys():
                out_file.write("MAPPING,"+key+","+hostnames[key]+"\n")

        out_file.close()

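# Entry point: invoked as "py_logs_analizer.py -f <logs_folder>". The raw
# logs are preprocessed (in parallel where possible), the Experiment is
# populated, and the analyses above are run as separate processes before
# the hostname map is dumped.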
def analyze_main(argv):
        try:
                opts,args = getopt.getopt(argv,"hf:",["help","folder="])
        except getopt.GetoptError:
                print "usage: "+sys.argv[0]+" -f <logs_folder>"
                sys.exit(2)
        folder = None
        for opt,arg in opts:
                if opt in ("-h","--help"):
                        print "usage: "+sys.argv[0]+" -f <logs_folder>"
                        sys.exit()
                elif opt in ("-f","--folder"):
                        folder = arg

        if folder is None:
                print "usage: "+sys.argv[0]+" -f <logs_folder>"
                sys.exit(2)
        print "folder is " + folder
        exp = Experiment()
        pm = pmgr.ProcessManager()
        preproc_chunklog(folder)
        preproc_csv(folder,procman=pm)
        pm.joinAll()

        dataPopulate(folder,exp)
        pm.launchProcess(rttAnalyse,[folder,exp])
        pm.launchProcess(lossAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
        pm.launchProcess(ICMPLossAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
        pm.launchProcess(delayAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
        pm.launchProcess(receptionAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
        pm.launchProcess(hopsAnalyse,[folder,exp,START_ANALYSIS_TIME,STOP_ANALYSIS_TIME])
        pm.joinAll()
        dumpHostnameMap(folder,exp)

        for session in exp.sessions:
                print "Session: "+str(session.time)+" has "+str(len(session.peers))+" peers"

if __name__ == "__main__":
        analyze_main(sys.argv[1:])