Statistics
| Branch: | Revision:

streamers / loop.c @ 9a1f5816

History | View | Annotate | Download (6.11 KB)

1
/*
2
 * Copyright (c) 2010-2011 Luca Abeni
3
 * Copyright (c) 2010-2011 Csaba Kiraly
4
 *
5
 * This file is part of PeerStreamer.
6
 *
7
 * PeerStreamer is free software: you can redistribute it and/or
8
 * modify it under the terms of the GNU Affero General Public License as
9
 * published by the Free Software Foundation, either version 3 of the
10
 * License, or (at your option) any later version.
11
 *
12
 * PeerStreamer is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
15
 * General Public License for more details.
16
 *
17
 * You should have received a copy of the GNU Affero General Public License
18
 * along with PeerStreamer.  If not, see <http://www.gnu.org/licenses/>.
19
 *
20
 */
21
#ifndef _WIN32
22
#include <sys/select.h>
23
#else
24
#include <winsock2.h>
25
#endif
26
#include <sys/time.h>
27
#include <time.h>
28
#include <stdint.h>
29
#include <stdlib.h>
30
#include <stdio.h>
31
#include <stdbool.h>
32
#include <string.h>
33

    
34
#include <net_helper.h>
35
#include <grapes_msg_types.h>
36
#include <peerset.h>
37
#include <peer.h>
38

    
39
#include "compatibility/timer.h"
40

    
41
#include "chunk_signaling.h"
42
#include "streaming.h"
43
#include "topology.h"
44
#include "loop.h"
45
#include "dbg.h"
46
#include "node_addr.h"
47

    
48
#define BUFFSIZE (512 * 1024)
49
#define FDSSIZE 16
50
struct timeval period = {0, 500000};
51

    
52
//calculate timeout based on tnext
53
void tout_init(struct timeval *tout, const struct timeval *tnext)
54
{
55
  struct timeval tnow;
56

    
57
  gettimeofday(&tnow, NULL);
58
  if(timercmp(&tnow, tnext, <)) {
59
    timersub(tnext, &tnow, tout);
60
  } else {
61
    *tout = (struct timeval){0, 0};
62
  }
63
}
64

    
65
void handle_msg(const struct nodeID* nodeid,bool source_role)
66
{
67
        uint8_t buff[BUFFSIZE];
68
        struct nodeID *remote;
69
        int len;
70

    
71
        len = recv_from_peer(nodeid, &remote, buff, BUFFSIZE);
72
        if (len < 0) {
73
                fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
74
        }else
75
                switch (buff[0] /* Message Type */) {
76
                        case MSG_TYPE_TMAN:
77
                        case MSG_TYPE_NEIGHBOURHOOD:
78
                        case MSG_TYPE_TOPOLOGY:
79
                                dtprintf("Topo message received:\n");
80
                                topology_message_parse(remote, buff, len);
81
                                break;
82
                        case MSG_TYPE_CHUNK:
83
                                dtprintf("Chunk message received:\n");
84
                                if(!source_role)
85
                                        received_chunk(remote, buff, len);
86
                                break;
87
                        case MSG_TYPE_SIGNALLING:
88
                                dtprintf("Sign message received:\n");
89
                                sigParseData(remote, buff, len);
90
                                break;
91
                        default:
92
                                fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
93
                }
94

    
95
        nodeid_free(remote);
96
}
97

    
98
void timeradd_inplace(struct timeval *base,struct timeval *toadd)
99
{
100
        struct timeval tmp;
101
        timeradd(base,toadd,&tmp);
102
        *base = tmp;
103
}
104

    
105
void usec2timeval(struct timeval *t, long usec)
106
{        
107
  t->tv_sec = usec / 1000000;
108
  t->tv_usec = usec % 1000000;
109
}
110

    
111
void loop_update(int loop_counter)
112
{
113
                if (loop_counter % 10 == 0)
114
                        topology_update();
115
                if (loop_counter % 100 == 0)
116
                        peerset_print(topology_get_neighbours(),"PEERSET");
117
#ifndef MONL
118
                if (loop_counter % 100 == 0)
119
                {
120
//                        fprintf(stderr,"MEASURES\n");
121
                        log_nodes_measures();
122
                }
123
#endif
124
}
125

    
126
void spawn_chunk(int chunk_copies,struct timeval *chunk_time_interval)
127
{
128
  struct chunk *new_chunk;
129

    
130
        new_chunk = generated_chunk(&(chunk_time_interval->tv_usec));
131
        usec2timeval(chunk_time_interval,chunk_time_interval->tv_usec);
132
        if (new_chunk && add_chunk(new_chunk))
133
        { 
134
                inject_chunk(new_chunk,chunk_copies);
135
                free(new_chunk); //if add_chunk fails it destroies the chunk
136
        }
137
}
138

    
139
void source_loop(const char *videofile, struct nodeID *nodeid, int csize, int chunk_copies, int buff_size)
140
/* source peer loop
141
 * @videofile: video input filename
142
 * @nodeid: local network identifier
143
 * @csize: chunks offer interval in microseconds
144
 * @chunk_copies: number of copies injected in the overlay
145
 * @buff_size: size of the chunk buffer
146
 */
147
{
148
        bool running=true;
149
        int data_ready,loop_counter=0;
150
        struct timeval awake_epoch, sleep_timer;
151
        struct timeval chunk_time_interval, offer_epoch, chunk_epoch, current_epoch; 
152
  int wait4fds[FDSSIZE],*pfds,fds[FDSSIZE] = {-1};
153

    
154
        usec2timeval(&period,csize);
155
        gettimeofday(&awake_epoch,NULL);
156
        timeradd(&awake_epoch,&period,&offer_epoch);
157
        chunk_epoch = awake_epoch;
158

    
159
  if (source_init(videofile, nodeid, fds, FDSSIZE, buff_size) < 0) {
160
    fprintf(stderr,"Cannot initialize source, exiting");
161
    exit(-1);
162
  }
163
        while(running)
164
        {
165
                if(fds[0] !=-1) {
166
                        memmove(wait4fds,fds,sizeof(fds)); // bug in copy size?
167
                        pfds = wait4fds;
168
                } else
169
                        pfds = NULL;
170

    
171
          tout_init(&sleep_timer, &awake_epoch);
172
                data_ready = wait4data(nodeid,&sleep_timer,pfds);
173

    
174
                switch(data_ready) {
175
                        case 0: // timeout, no socket has data to pick
176
                                gettimeofday(&current_epoch,NULL);
177
                                if(timercmp(&offer_epoch, &current_epoch, <)) // offer time !
178
                                {
179
                                        send_offer();
180
                            timeradd_inplace(&offer_epoch, &period);
181
                                }
182
                                else // chunk time ! <- needed for chunkisers withouth filedescritors, e.g., avf
183
                                {
184
                                        spawn_chunk(chunk_copies,&chunk_time_interval);
185
                                        timeradd_inplace(&chunk_epoch, &chunk_time_interval); 
186
                                }
187

    
188
                                if(fds[0] == -1 && timercmp(&chunk_epoch, &offer_epoch, <)) // if we need a chunk timer
189
                                        awake_epoch = chunk_epoch;
190
                                else
191
                                        awake_epoch = offer_epoch;
192
                                break;
193

    
194
                        case 1: //incoming msg
195
                                handle_msg(nodeid,true);
196
                                break;
197

    
198
                        case 2: //file descriptor ready
199
                                spawn_chunk(chunk_copies,&chunk_time_interval);
200
                                break;
201
                
202
                        default:
203
                                fprintf(stderr,"[ERROR] select on file descriptors returned error: %d\n",data_ready);
204
                }
205
                loop_update(loop_counter++);
206
        }
207
}
208

    
209
void loop(struct nodeID *nodeid,int csize,int buff_size)
210
/* generic peer loop
211
 * @nodeid: local network identifier
212
 * @csize: chunks offer interval in microseconds
213
 * @buff_size: size of the chunk buffer
214
 */
215
{
216
        bool running=true;
217
        int data_ready,loop_counter=0;
218
        struct timeval epoch, wait_timer; 
219
        usec2timeval(&period,csize);
220
        gettimeofday(&epoch,NULL);
221

    
222
         stream_init(buff_size, nodeid);
223
  topology_update();
224

    
225
        while(running)
226
        {
227
    tout_init(&wait_timer, &epoch);
228
    data_ready = wait4data(nodeid, &wait_timer, NULL);
229
                if(data_ready == 1)
230
                        // we have been interrupted for an incoming msg
231
                        handle_msg(nodeid,false);
232
                else
233
                        // perform loop tasks
234
                {
235
                        send_offer();
236
            timeradd_inplace(&epoch, &period);
237
                }
238
                loop_update(loop_counter++);        
239
        }
240
}
241