Statistics
| Branch: | Revision:

peerstreamer-logs-analyzer / py_logs_analizer.py @ 1a481d7b

History | View | Annotate | Download (11.1 KB)

1 698b8685 Luca
#!/usr/bin/python
2
import getopt,os,sys
3
import numpy as nm
4
from pandas import *
5 05ddd673 Luca Baldesi
from random import randrange
6 ee19f27d Luca Baldesi
import scipy.interpolate
7 be87c355 Luca Baldesi
import shutil as sh
8 698b8685 Luca
9 878b6653 Luca
sys.path.insert(0,'lib')
10
import process_manager as pmgr
11 8b4d058a Luca Baldesi
from experiment import *
12
from peerstreamer_logs import *
13 3294e8f4 Luca
14
def rescale_log_time(logdata,offsetdata,server_addr):
15 633f4d39 Luca Baldesi
        if (len(set(offsetdata['addr']))) > 1:
16
                offsetdata = offsetdata[offsetdata['addr'] == server_addr]
17
        if len(logdata)>0:
18
                offsetdata = offsetdata.set_index('unixtime')
19
                offsetdata = offsetdata[offsetdata['offset'] != -1]
20
                future = pow(10,20)
21
22
                if (len(offsetdata) > 0):
23
                        x = [0]+list(offsetdata.index)+[future]
24
                        y = [i*1000000 for i in ([0]+list(offsetdata['offset'])+[0])] # list of offset to apply to each time in x
25
                        y = [sum(i) for i in zip(x,y)] # correct value of time for each given time in x
26
                        func = scipy.interpolate.interp1d(x,y)
27
28
                        logdata['logTime'] = logdata['logTime'].apply(func)
29
                else:
30
                        print ("[WARNING] time not rescaled for sliver ")
31 3294e8f4 Luca
32
        return logdata
33
34
35
def dataPopulate(folder,exp):
        """Load every per-peer chunk-log CSV in *folder* into *exp*.

        Each '*.log.csv' file becomes a Peer whose log times are rescaled
        against the source's clock-offset trace, then the peer is attached
        to the session matching the file's timestamp.
        """
        if source_hostname(folder):
                source = source_addr(folder)
                for logfile in os.listdir(folder):
                        if logfile.endswith(".log.csv"):
                                # py3-compatible print (was a py2 print statement)
                                print("Loading file: " + logfile)
                                session = exp.getSession(time_from_filename(logfile))
                                peer_data = read_csv(folder + "/" + logfile)
                                offset_data = read_csv(timespread_filename(folder, hostname_from_filename(logfile)))
                                peer = Peer(hostname_from_filename(logfile),
                                                is_source_from_filename(logfile),
                                                rescale_log_time(peer_data, offset_data, source))
                                session.addPeer(peer)
50 698b8685 Luca
51
def purify(delim,line):
        """Return the last complete record in *line*, re-prefixed with *delim*.

        A log line may contain several concatenated records when a write was
        interrupted; everything before the last delimiter is discarded.
        """
        if line.count(delim) > 1:
                # py3-compatible print (was a py2 print statement)
                print("[WARNING] " + str(line.count(delim) - 1) + " incomplete records.")
        return delim + line.split(delim)[-1]
55
56 05ddd673 Luca Baldesi
def preproc_csv_file(csvfile):
        """Drop malformed rows from *csvfile* in place.

        The first line fixes the expected comma-separated field count; any
        later line with a different count is discarded.  The file is only
        rewritten when at least one line was dropped.
        """
        tmpfile = "/tmp/tmpcsv" + str(randrange(99999999999999999))
        csvfile = os.path.abspath(csvfile)
        field_n = 0
        modified = False
        # 'with' guarantees both descriptors are closed even on error
        # (the original leaked them on exceptions).
        with open(csvfile, "r") as infile, open(tmpfile, "w") as outfile:
                # plain iteration replaces py2-only file.xreadlines()
                for in_line in infile:
                        if len(in_line.split(',')) != field_n:
                                if field_n == 0:
                                        # first line establishes the field count
                                        field_n = len(in_line.split(','))
                                        outfile.write(in_line)
                                else:
                                        modified = True  # non-conformant line dropped
                        else:
                                outfile.write(in_line)
        if modified:
                sh.move(tmpfile, csvfile)
        else:
                os.remove(tmpfile)  # don't leave stale temp files in /tmp
78 698b8685 Luca
79 05ddd673 Luca Baldesi
def preproc_csv(folder,procman=None):
        """Sanitize every .csv file in *folder* with preproc_csv_file.

        When *procman* is given, each file is handed to the process manager
        for parallel processing; otherwise files are cleaned serially.
        """
        for csvfile in os.listdir(folder):
                if csvfile.endswith(".csv"):
                        # py3-compatible print (was a py2 print statement)
                        print("Preprocessing CSV file: " + csvfile)
                        if procman:
                                procman.launchProcess(preproc_csv_file, [folder + "/" + csvfile])
                        else:
                                preproc_csv_file(folder + "/" + csvfile)
87 3294e8f4 Luca
88
def preproc_chunklog(folder):
        """Convert raw .log files in *folder* into .csv, keeping CHUNK_LOG records.

        Already-converted files (a matching .csv exists) are skipped.
        """
        for logfile in os.listdir(folder):
                if logfile.endswith(".log") and not os.path.isfile(folder + "/" + logfile + ".csv"):
                        # py3-compatible print (was a py2 print statement)
                        print("Preprocessing file: " + logfile)
                        # 'with' closes both files even on error; plain iteration
                        # replaces py2-only file.xreadlines()
                        with open(folder + "/" + logfile, "r") as infile, \
                                        open(folder + "/" + logfile + ".csv", "w") as csvfile:
                                csvfile.write("CHUNK_LOG,logTime,sender"+\
                                                ",receiver,chunkID,size[bytes],chunkTimestamp,hopcount,action\n")
                                for in_line in infile:
                                        # the tag may be preceded by a short prefix; check the head only
                                        if "CHUNK_LOG" in in_line[:10]:
                                                csvfile.write(purify("[CHUNK_LOG]", in_line))
101 698b8685 Luca
102
def delaySessionAnalyse(folder,session,begin_sec,end_sec):
        """Write per-chunk delay records for one session to <time>_session_delay.exp.

        Delay is a RECEIVED chunk's log time minus its timestamp at the
        source, restricted to [begin_sec, end_sec] within the session.
        Does nothing when the session has no source.
        """
        if session.getSource() != None:
                with open(folder + "/" + str(session.time) + "_session_delay.exp", "w") as out_file:
                        out_file.write("info_type,session_id,peer_hostname,delay,hops\n")
                        for peer in session.peers:
                                logs = peer.published_interval((session.time + begin_sec) * 1000000,
                                                (session.time + end_sec) * 1000000)
                                received = logs[logs['action'] == "RECEIVED"]
                                for i in received.index:
                                        # .loc replaces the removed pandas .ix label accessor
                                        row = received.loc[i]
                                        out_file.write("DELAY," + str(session.time) + ","
                                                        + purify_hostname(peer.hostname) + ","
                                                        + str(row['logTime'] - row['chunkTimestamp']) + ","
                                                        + str(row['hopcount']) + "\n")
122
123
124
def delayAnalyse(folder,exp,begin_sec,end_sec):
        """Write the average reception delay per peer/session to packets_delay.exp.

        Also emits the per-session detail files via delaySessionAnalyse.
        The source peer is skipped (its delay is zero by construction).
        """
        with open(folder + "/packets_delay.exp", "w") as out_file:
                out_file.write("info_type,session_id,peer_hostname,avg_delay\n")
                for session in exp.sessions:
                        delaySessionAnalyse(folder, session, begin_sec, end_sec)
                        for peer in session.peers:
                                if not peer.is_source:
                                        records = peer.published_interval((session.time + begin_sec) * 1000000,
                                                        (session.time + end_sec) * 1000000)
                                        received = records[records['action'] == 'RECEIVED']
                                        minimum = (received['logTime'] - received['chunkTimestamp']).min()
                                        # a delay below -5 msec means the clocks were never aligned
                                        if minimum < -5000:
                                                # py3-compatible prints (were py2 print statements)
                                                print("[WARNING] Timestamps seems to be not consistent!")
                                                print("          Last minimum delay of sliver " + peer.hostname + " was: " + str(minimum / 1000000) + " seconds")
                                        mean = (received['logTime'] - received['chunkTimestamp']).mean()
                                        out_file.write("DELAY," + str(session.time) + "," + purify_hostname(peer.hostname) + "," + str(mean) + "\n")
141
142
def sessionLossAnalyse(folder,session,begin_sec,end_sec):
        """Write chunk-loss counts over time for one session to <time>_session_loss.exp.

        The window [begin_sec, end_sec] is sampled in 100 steps; at each
        instant the chunks newly published by the source are compared with
        the set of chunks the peer ever received.
        """
        # Floor division keeps the step an int for range() (py2's '/' did this
        # implicitly); clamp to 1 so short windows don't yield a zero step.
        delta = max((end_sec - begin_sec) // 100, 1)  # time resolution in seconds
        if session.getSource() != None:
                with open(folder + "/" + str(session.time) + "_session_loss.exp", "w") as out_file:
                        out_file.write("info_type,session_id,time,chunks,peer_hostname,losts\n")
                        source_chunks = session.getSource().published_interval(
                                        (session.time + begin_sec) * 1000000, (session.time + end_sec) * 1000000)
                        for peer in session.peers:
                                old_chunks = set()
                                peer_chunks_ids = set(peer.published['chunkID'])
                                for instant in range(session.time + begin_sec, session.time + end_sec, delta):
                                        # chunks the source published since the previous instant
                                        delta_chunks = set(source_chunks[source_chunks['logTime'] <= (instant * 1000000)]['chunkID']).difference(old_chunks)
                                        n_source_chunks = len(delta_chunks)
                                        n_lost_chunks = len(delta_chunks.difference(peer_chunks_ids))
                                        out_file.write("LOSS," + str(session.time) + "," + str(instant) + "," +
                                                        str(n_source_chunks) + "," + purify_hostname(peer.hostname) + "," + str(n_lost_chunks) + "\n")
                                        old_chunks = old_chunks.union(delta_chunks)
162
163
def hostnames2key(names):
        """Collapse a sequence of hostnames into a single dash-separated key."""
        separator = '-'
        return separator.join(names)
165
166
def key2hostnames(key):
        """Split a dash-separated key back into its hostname components."""
        separator = '-'
        return key.split(separator)
168
169
def receptionAnalyse(folder,exp,begin_sec,end_sec):
        """Write the chunk-exchange graph to edges.exp.

        For every session, counts how many RECEIVED records flow between
        each (sender, receiver) address pair, then resolves addresses to
        hostnames and emits one weighted EDGE line per pair.
        """
        with open(folder + "/edges.exp", "w") as out_file:
                out_file.write("info_type,session_id,peer_sender,peer_receiver,weight\n")
                for session in exp.sessions:
                        edges = {}
                        for peer in session.peers:
                                records = peer.published_interval((session.time + begin_sec) * 1000000,
                                                (session.time + end_sec) * 1000000)
                                records = records[records['action'] == "RECEIVED"]
                                sender = records['sender']
                                receiver = records['receiver']
                                # Series.items() replaces iteritems(), removed in py3/pandas 2.x
                                for i, snd in sender.items():
                                        key = hostnames2key([snd, receiver[i]])
                                        edges[key] = edges.get(key, 0) + 1
                        for edge in edges:
                                host1 = exp.addr2hostname(key2hostnames(edge)[0])
                                host2 = exp.addr2hostname(key2hostnames(edge)[1])
                                out_file.write("EDGE," + str(session.time) + "," + \
                                                purify_hostname(host1) + "," + \
                                                purify_hostname(host2) + "," + \
                                                str(edges[edge]) + "\n")
197
198 633f4d39 Luca Baldesi
def rttAnalyse(folder,exp):
        """Write average and maximum ping RTT per (source, destination) pair
        to slivers_rtts.exp, from every non-empty '*_rtt_*.csv' in *folder*.
        """
        with open(folder + "/slivers_rtts.exp", "w") as out_file:
                out_file.write("RTT_TYPE,SRC,DST,MSEC,MDEV\n")
                for logfile in os.listdir(folder):
                        if "_rtt_" in logfile and logfile.endswith(".csv") and os.stat(folder + "/" + logfile).st_size > 0:
                                # py3-compatible print (was a py2 print statement)
                                print("RTT analysing: " + logfile)
                                data = read_csv(folder + "/" + logfile)
                                data = data[data['rtt'] != -1]  # -1 marks failed probes
                                if len(data) > 0:
                                        sliver = purify_hostname(data['hostname'][0])
                                        # numeric_only: the string 'hostname' column cannot be
                                        # aggregated and raises on modern pandas otherwise
                                        avgs = data.groupby("addr").mean(numeric_only=True)
                                        maxs = data.groupby("addr").max(numeric_only=True)
                                        for i in range(0, len(avgs)):
                                                # positional access; .iloc replaces the removed .ix
                                                dst = purify_hostname(exp.addr2hostname(avgs.iloc[i].name, without_port=True))
                                                rtt = avgs.iloc[i]['rtt']
                                                mdev = avgs.iloc[i]['mdev']
                                                out_file.write("RTT_AVG," + sliver + "," + dst + "," + str(rtt) + "," + str(mdev) + "\n")
                                        for i in range(0, len(maxs)):
                                                # was taken from avgs; both groupbys share the same
                                                # index so the value is identical — maxs is clearer
                                                dst = purify_hostname(exp.addr2hostname(maxs.iloc[i].name, without_port=True))
                                                rtt = maxs.iloc[i]['rtt']
                                                mdev = maxs.iloc[i]['mdev']
                                                out_file.write("RTT_MAX," + sliver + "," + dst + "," + str(rtt) + "," + str(mdev) + "\n")
225
226
227 698b8685 Luca
def hopsAnalyse(folder,exp,begin_sec,end_sec):
        """Write the average chunk hop-count over time per peer to packets_hops.exp.

        The window [begin_sec, end_sec] is sampled in 100 steps; each line
        carries the chunk count and mean hop-count seen in one step.
        """
        with open(folder + "/packets_hops.exp", "w") as out_file:
                out_file.write("info_type,session_id,time,chunks,peer_hostname,hops_avg\n")
                # floor division keeps the step an int for range()
                # (py2's '/' truncated implicitly)
                delta = (end_sec - begin_sec) // 100  # time resolution in seconds
                for session in exp.sessions:
                        if session.getSource() != None:
                                for peer in session.peers:
                                        for instant in range(session.time + begin_sec, session.time + end_sec, delta):
                                                delta_hops = peer.published_interval_sec(instant, instant + delta)['hopcount']
                                                out_file.write("HOP," + str(session.time) + "," + str(instant + delta) + "," + str(delta_hops.count()) + "," + \
                                                        peer.hostname + "," + str(delta_hops.mean()) + "\n")
                        else:
                                # original called an undefined name 'warning' here (NameError)
                                print("No source for session " + str(session.time))
243
244
def lossAnalyse(folder,exp,begin_sec,end_sec):
        """Write the overall chunk-loss per peer/session to packets_loss.exp.

        Loss is the number of chunk IDs the source published in the window
        that a peer never received.  Also emits per-session detail files via
        sessionLossAnalyse.
        """
        with open(folder + "/packets_loss.exp", "w") as out_file:
                out_file.write("info_type,session_id,chunks,peer_hostname,losts\n")
                for session in exp.sessions:
                        sessionLossAnalyse(folder, session, begin_sec, end_sec)
                        # py3-compatible prints throughout (were py2 print statements)
                        print("Session: " + str(session.time))
                        if session.getSource() != None:
                                source_ids = set(session.getSource().published_interval(
                                                (session.time + begin_sec) * 1000000, (session.time + end_sec) * 1000000)
                                                ['chunkID'])
                                for peer in session.peers:
                                        if not peer.is_source:
                                                # compared against the peer's whole trace, not just
                                                # the window, so late arrivals do not count as lost
                                                peer_ids = set(peer.published['chunkID'])
                                                lost = len(source_ids.difference(peer_ids))
                                                print("Peer " + peer.hostname + " lost " +
                                                                str(lost) + "/" + str(len(source_ids)) + " packets")
                                                out_file.write("LOSS," + str(session.time) + "," + str(len(source_ids)) +
                                                                "," + purify_hostname(peer.hostname) + "," + str(lost) + "\n")
                        else:
                                print("[WARNING] source not found")
268
269 633f4d39 Luca Baldesi
270
def dumpHostnameMap(folder,exp):
        """Write the hostname -> address mapping of every known peer to
        hostnames2address.exp in *folder*.
        """
        # Later sessions overwrite earlier entries for the same hostname,
        # exactly as the original repeated dict assignment did.
        mapping = {peer.hostname: peer.address
                        for session in exp.sessions for peer in session.peers}

        with open(folder + "/hostnames2address.exp", "w") as out_file:
                out_file.write("info_type,hostname,address\n")
                for hostname, address in mapping.items():
                        out_file.write("MAPPING," + hostname + "," + address + "\n")
282
        
283
284 698b8685 Luca
def main(argv):
        """Parse CLI options and run the full log-analysis pipeline.

        Options: -h/--help exits; -f/--folder <dir> (mandatory) selects the
        experiment folder.  Preprocesses the raw logs, populates the
        Experiment, then launches the analyses in parallel.
        """
        try:
                opts, args = getopt.getopt(argv, "hf:", ["help", "folder"])
        except getopt.GetoptError:
                sys.exit(2)
        for opt, arg in opts:
                if opt in ("-h", "--help"):
                        sys.exit()
                elif opt in ("-f", "--folder"):
                        folder = arg

        # -f/--folder is mandatory: 'folder' is only bound by the loop above
        try:
                folder
        except NameError:
                sys.exit()
        # py3-compatible prints throughout (were py2 print statements)
        print("folder is " + folder)
        exp = Experiment()
        pm = pmgr.ProcessManager()
        preproc_chunklog(folder)
        preproc_csv(folder, procman=pm)
        # second, serial pass after the parallel workers finish
        # (kept from the original flow — TODO confirm it is still needed)
        preproc_csv(folder)
        pm.joinAll()
        dataPopulate(folder, exp)
        pm.launchProcess(rttAnalyse, [folder, exp])
        pm.launchProcess(lossAnalyse, [folder, exp, 150, 450])
        pm.launchProcess(delayAnalyse, [folder, exp, 150, 450])
        pm.launchProcess(receptionAnalyse, [folder, exp, 150, 450])
        pm.launchProcess(hopsAnalyse, [folder, exp, 150, 450])
        pm.joinAll()
        dumpHostnameMap(folder, exp)

        for session in exp.sessions:
                print("Session: " + str(session.time) + " has " + str(len(session.peers)) + " peers")

if __name__ == "__main__":
        main(sys.argv[1:])