peerstreamer-logs-analyzer / py_logs_analizer.py @ 1a481d7b

#!/usr/bin/python
import getopt,os,sys
import numpy as nm
from pandas import *
from random import randrange
import scipy.interpolate
import shutil as sh

sys.path.insert(0,'lib')
import process_manager as pmgr
from experiment import *
from peerstreamer_logs import *

def rescale_log_time(logdata,offsetdata,server_addr):
        if (len(set(offsetdata['addr']))) > 1:
                offsetdata = offsetdata[offsetdata['addr'] == server_addr]
        if len(logdata)>0:
                offsetdata = offsetdata.set_index('unixtime')
                offsetdata = offsetdata[offsetdata['offset'] != -1]
                future = pow(10,20)

                if (len(offsetdata) > 0):
                        x = [0]+list(offsetdata.index)+[future]
                        y = [i*1000000 for i in ([0]+list(offsetdata['offset'])+[0])] # offset (usec) to apply at each time in x
                        y = [sum(i) for i in zip(x,y)] # corrected time for each given time in x
                        func = scipy.interpolate.interp1d(x,y)

                        logdata['logTime'] = logdata['logTime'].apply(func)
                else:
                        print "[WARNING] time not rescaled for sliver"

        return logdata
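
# Illustrative sketch of the rescaling (hypothetical values): two clock-offset
# samples of 0.5s and 1.5s taken at unixtimes 100000000 and 200000000
# (microseconds) define a piecewise-linear correction of logTime:
#
#   >>> import scipy.interpolate
#   >>> x = [0, 100000000, 200000000, pow(10,20)]
#   >>> y = [t+o for t,o in zip(x,[0, 500000, 1500000, 0])]
#   >>> func = scipy.interpolate.interp1d(x,y)
#   >>> float(func(150000000)) # halfway between the two samples
#   151000000.0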

def dataPopulate(folder,exp):
        if source_hostname(folder):
                source = source_addr(folder)
                for logfile in os.listdir(folder):
                        if logfile.endswith(".log.csv"):
                                print "Loading file: "+logfile
                                session = exp.getSession(time_from_filename(logfile))
                                peer_data = read_csv(folder+"/"+logfile)
                                offset_data = read_csv(timespread_filename(folder,hostname_from_filename(logfile)))

                                peer = Peer(hostname_from_filename(logfile),is_source_from_filename(logfile), \
                                                rescale_log_time(peer_data,offset_data,source))
                                session.addPeer(peer)

def purify(delim,line):
        if line.count(delim) > 1:
                print "[WARNING] "+str(line.count(delim)-1)+" incomplete records."
        return delim+line.split(delim)[-1]
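
# Doctest-style sketch (hypothetical input): a write interrupted mid-record
# can leave two delimiters on one physical line; purify() warns and keeps
# only the last, complete record.
#
#   >>> purify("[CHUNK_LOG]","[CHUNK_LOG]1,2,3[CHUNK_LOG]4,5,6\n")
#   [WARNING] 1 incomplete records.
#   '[CHUNK_LOG]4,5,6\n'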

def preproc_csv_file(csvfile):
        # Keep only the rows whose field count matches the header; rewrite the
        # file in place only if malformed rows were dropped.
        tmpfile = "/tmp/tmpcsv"+str(randrange(99999999999999999))
        csvfile = os.path.abspath(csvfile)
        infile = open(csvfile,"r")
        outfile = open(tmpfile,"w")
        field_n = 0
        modified = False
        for in_line in infile.xreadlines():
                if len(in_line.split(',')) != field_n:
                        if field_n == 0: # the first line sets the expected field count
                                field_n = len(in_line.split(','))
                                outfile.write(in_line)
                        else:
                                modified = True # non conformant line, drop it
                else:
                        outfile.write(in_line)
        infile.close()
        outfile.close()
        if modified:
                sh.move(tmpfile,csvfile)
        else:
                os.remove(tmpfile) # drop the unused temporary copy

def preproc_csv(folder,procman=None):
        for csvfile in os.listdir(folder):
                if csvfile.endswith(".csv"):
                        print "Preprocessing CSV file: "+csvfile
                        if procman:
                                procman.launchProcess(preproc_csv_file,[folder+"/"+csvfile])
                        else:
                                preproc_csv_file(folder+"/"+csvfile)
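
# Usage sketch: with a ProcessManager each file is cleaned in a separate
# process; without one the files are processed serially.
#
#   pm = pmgr.ProcessManager()
#   preproc_csv("logs/", procman=pm)
#   pm.joinAll()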

def preproc_chunklog(folder):
        for logfile in os.listdir(folder):
                if logfile.endswith(".log") and not os.path.isfile(folder+"/"+logfile+".csv"):
                        print "Preprocessing file: "+logfile
                        infile = open(folder+"/"+logfile,"r")
                        csvfile = open(folder+"/"+logfile+".csv","w")
                        csvfile.write("CHUNK_LOG,logTime,sender"+\
                                        ",receiver,chunkID,size[bytes],chunkTimestamp,hopcount,action\n")
                        for in_line in infile.xreadlines():
                                if "CHUNK_LOG" in in_line[:10]:
                                        csvfile.write(purify("[CHUNK_LOG]",in_line))
                        infile.close()
                        csvfile.close()
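
# Sketch of the transformation (field values are hypothetical; the layout is
# implied by the header written above): a raw line such as
#
#   [CHUNK_LOG],1442850123000000,10.0.0.1:6666,10.0.0.2:6666,42,1372,1442850122900000,2,RECEIVED
#
# is copied verbatim into the .csv file, with "[CHUNK_LOG]" acting as the
# value of the first column.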

def delaySessionAnalyse(folder,session,begin_sec,end_sec):
        if (session.getSource() != None):
                out_file = open(folder+"/"+str(session.time)+"_session_delay.exp","w")
                out_file.write("info_type,session_id,peer_hostname,delay,hops\n")

                for peer in session.peers:
                        logs = peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
                        received = logs[logs['action'] == "RECEIVED"]

                        for i in received.index:
                                row = received.ix[i]
                                out_file.write("DELAY,"+str(session.time)+","\
                                                +purify_hostname(peer.hostname)+","\
                                                +str(row['logTime'] - row['chunkTimestamp'])+","\
                                                +str(row['hopcount'])+"\n")

                out_file.close()

def delayAnalyse(folder,exp,begin_sec,end_sec):
        out_file = open(folder+"/packets_delay.exp","w")
        out_file.write("info_type,session_id,peer_hostname,avg_delay\n")
        for session in exp.sessions:
                delaySessionAnalyse(folder,session,begin_sec,end_sec)
                for peer in session.peers:
                        if not peer.is_source:
                                records = peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
                                received = records[records['action'] == 'RECEIVED']
                                minimum = (received['logTime']-received['chunkTimestamp']).min()
                                if (minimum < -5000): # more than 5 msec in the past
                                        print "[WARNING] Timestamps seem to be inconsistent!"
                                        print "          Minimum delay of sliver "+peer.hostname+" was: "+str(minimum/1000000)+" seconds"
                                mean = (received['logTime']-received['chunkTimestamp']).mean()
                                out_file.write("DELAY,"+str(session.time)+","+purify_hostname(peer.hostname)+","+str(mean)+"\n")

        out_file.close()
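
# Units sketch (hypothetical numbers): logTime and chunkTimestamp are both in
# microseconds, so the -5000 threshold above flags clocks more than 5 msec
# apart.
#
#   >>> logTime, chunkTimestamp = 1442850123000000, 1442850122900000
#   >>> (logTime - chunkTimestamp)/1000000.0 # delay in seconds
#   0.1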

def sessionLossAnalyse(folder,session,begin_sec,end_sec):
        delta = (end_sec-begin_sec)/100 # time resolution in seconds
        if (session.getSource() != None):
                out_file = open(folder+"/"+str(session.time)+"_session_loss.exp","w")
                out_file.write("info_type,session_id,time,chunks,peer_hostname,losts\n")
                source_chunks = session.getSource().published_interval(\
                                (session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)

                for peer in session.peers:
                        old_chunks = set()
                        peer_chunks_ids = set(peer.published['chunkID'])
                        for instant in range(session.time+begin_sec,session.time+end_sec,delta):
                                delta_chunks = set(source_chunks[source_chunks['logTime'] <= (instant*1000000)]['chunkID']).difference(old_chunks)
                                n_source_chunks = len(delta_chunks)
                                n_lost_chunks = len(delta_chunks.difference(peer_chunks_ids))
                                out_file.write("LOSS,"+str(session.time)+","+str(instant)+","+\
                                                str(n_source_chunks)+","+purify_hostname(peer.hostname)+","+str(n_lost_chunks)+"\n")
                                old_chunks = old_chunks.union(delta_chunks)

                out_file.close()
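
# Counting sketch (hypothetical chunk IDs): per-slice losses are plain set
# arithmetic over chunk IDs, as in the loop above.
#
#   >>> source_slice = set([1,2,3,4])        # chunks the source sent so far
#   >>> old = set([1,2])                     # already counted in earlier slices
#   >>> delta = source_slice.difference(old) # new chunks in this slice
#   >>> peer_ids = set([1,2,3])              # everything the peer ever logged
#   >>> len(delta.difference(peer_ids))      # chunks of this slice the peer missed
#   1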

def hostnames2key(names):
        return '-'.join(names)

def key2hostnames(key):
        return key.split('-')
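
# Round-trip sketch (hypothetical addresses); note the round trip assumes the
# joined names themselves contain no '-'.
#
#   >>> hostnames2key(['10.0.0.1:6666','10.0.0.2:6666'])
#   '10.0.0.1:6666-10.0.0.2:6666'
#   >>> key2hostnames('10.0.0.1:6666-10.0.0.2:6666')
#   ['10.0.0.1:6666', '10.0.0.2:6666']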

def receptionAnalyse(folder,exp,begin_sec,end_sec):
        out_file = open(folder+"/edges.exp","w")
        out_file.write("info_type,session_id,peer_sender,peer_receiver,weight\n")

        for session in exp.sessions:
                edges = {}
                for peer in session.peers:
                        records = peer.published_interval((session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)
                        records = records[records['action'] == "RECEIVED"]
                        sender = records['sender']
                        receiver = records['receiver']
                        for i in sender.index:
                                key = hostnames2key([sender[i],receiver[i]])
                                edges[key] = edges.get(key,0) + 1 # count chunks received over this edge
                for edge in edges.keys():
                        host1 = exp.addr2hostname(key2hostnames(edge)[0])
                        host2 = exp.addr2hostname(key2hostnames(edge)[1])
                        out_file.write("EDGE,"+str(session.time)+","+ \
                                        purify_hostname(host1)+","+ \
                                        purify_hostname(host2)+","+ \
                                        str(edges[edge])+"\n")

        out_file.close()

def rttAnalyse(folder,exp):
        out_file = open(folder+"/slivers_rtts.exp","w")
        out_file.write("RTT_TYPE,SRC,DST,MSEC,MDEV\n")

        for logfile in os.listdir(folder):
                if "_rtt_" in logfile and logfile.endswith(".csv") and os.stat(folder+"/"+logfile).st_size > 0:
                        print "RTT analysing: "+logfile
                        data = read_csv(folder+"/"+logfile)
                        data = data[data['rtt'] != -1]
                        if len(data)>0:
                                sliver = purify_hostname(data['hostname'][0])
                                avgs = data.groupby("addr").mean()
                                maxs = data.groupby("addr").max()

                                for i in range(0,len(avgs)):
                                        dst = purify_hostname(exp.addr2hostname(avgs.ix[i].name,without_port=True))
                                        rtt = avgs.ix[i]['rtt']
                                        mdev = avgs.ix[i]['mdev']
                                        out_file.write("RTT_AVG,"+sliver+","+dst+","+str(rtt)+","+str(mdev)+"\n")

                                for i in range(0,len(maxs)):
                                        dst = purify_hostname(exp.addr2hostname(maxs.ix[i].name,without_port=True))
                                        rtt = maxs.ix[i]['rtt']
                                        mdev = maxs.ix[i]['mdev']
                                        out_file.write("RTT_MAX,"+sliver+","+dst+","+str(rtt)+","+str(mdev)+"\n")

        out_file.close()

def hopsAnalyse(folder,exp,begin_sec,end_sec):
        out_file = open(folder+"/packets_hops.exp","w")
        out_file.write("info_type,session_id,time,chunks,peer_hostname,hops_avg\n")
        delta = (end_sec-begin_sec)/100 # time resolution in seconds
        for session in exp.sessions:
                if (session.getSource() != None):
                        for peer in session.peers:
                                for instant in range(session.time+begin_sec,session.time+end_sec,delta):
                                        delta_hops = peer.published_interval_sec(instant,instant+delta)['hopcount']
                                        out_file.write("HOP,"+str(session.time)+","+str(instant+delta)+","+str(delta_hops.count())+","+\
                                                        peer.hostname+","+str(delta_hops.mean())+"\n")
                else:
                        print "[WARNING] No source for session "+str(session.time)

        out_file.close()

def lossAnalyse(folder,exp,begin_sec,end_sec):
        out_file = open(folder+"/packets_loss.exp","w")
        out_file.write("info_type,session_id,chunks,peer_hostname,losts\n")

        for session in exp.sessions:
                sessionLossAnalyse(folder,session,begin_sec,end_sec)
                print "Session: "+str(session.time)
                if (session.getSource() != None):
                        source_ids = set(session.getSource().published_interval(\
                                        (session.time+begin_sec)*1000000,(session.time+end_sec)*1000000)\
                                        ['chunkID'])

                        for peer in session.peers:
                                if not peer.is_source:
                                        peer_ids = set(peer.published['chunkID'])
                                        print "Peer "+peer.hostname+" lost "+\
                                                        str(len(source_ids.difference(peer_ids)))+\
                                                        "/"+str(len(source_ids))+" packets"
                                        out_file.write("LOSS,"+str(session.time)+","+str(len(source_ids))+\
                                                        ","+purify_hostname(peer.hostname)+","+str(len(source_ids.difference(peer_ids)))+"\n")
                else:
                        print "[WARNING] source not found"

        out_file.close()

def dumpHostnameMap(folder,exp):
        hostnames = {}
        for session in exp.sessions:
                for peer in session.peers:
                        hostnames[peer.hostname] = peer.address

        out_file = open(folder+"/hostnames2address.exp","w")
        out_file.write("info_type,hostname,address\n")
        for key in hostnames.keys():
                out_file.write("MAPPING,"+key+","+hostnames[key]+"\n")

        out_file.close()

def main(argv):
        folder = None
        try:
                opts,args = getopt.getopt(argv,"hf:",["help","folder="])
        except getopt.GetoptError:
                print "Usage: py_logs_analizer.py -f <log_folder>"
                sys.exit(2)
        for opt,arg in opts:
                if opt in ("-h","--help"):
                        print "Usage: py_logs_analizer.py -f <log_folder>"
                        sys.exit()
                elif opt in ("-f","--folder"):
                        folder = arg

        if folder is None:
                print "Usage: py_logs_analizer.py -f <log_folder>"
                sys.exit(2)
        print "folder is " + folder
        exp = Experiment()
        pm = pmgr.ProcessManager()
        preproc_chunklog(folder)
        preproc_csv(folder,procman=pm)
        pm.joinAll()

        dataPopulate(folder,exp)
        pm.launchProcess(rttAnalyse,[folder,exp])
        pm.launchProcess(lossAnalyse,[folder,exp,150,450])
        pm.launchProcess(delayAnalyse,[folder,exp,150,450])
        pm.launchProcess(receptionAnalyse,[folder,exp,150,450])
        pm.launchProcess(hopsAnalyse,[folder,exp,150,450])
        pm.joinAll()
        dumpHostnameMap(folder,exp)

        for session in exp.sessions:
                print "Session: "+str(session.time)+" has "+str(len(session.peers))+" peers"

if __name__ == "__main__":
        main(sys.argv[1:])