Statistics
| Branch: | Revision:

napa-baselibs / tests / peer / nvtest1.c @ 507372bb

History | View | Annotate | Download (9.95 KB)

1
/*
2
 */
3
#include "napa.h"
4
#include "napa_log.h"
5
#include "chunk.h"
6
#include "ml.h"
7

    
8
#define URLPARTLEN 50
9
#define HTTP_PREFIX "http://"
10
#define DEFAULT_CHUNK_PATH "/NapaTest/chunk"
11

    
12

    
13
void  http_close_cb(struct evhttp_connection *conn, void *arg) {
14
      ev_uint16_t port;
15
      char* namebuf;
16
      evhttp_connection_get_peer(conn, &namebuf, &port);
17
fprintf(stderr, "HTTP Request complete: %s : %d\n", namebuf, port);
18

    
19
}
20

    
21
void http_request_cb(struct evhttp_request *req, void *arg) {
22
      struct evbuffer *inb = evhttp_request_get_input_buffer(req);
23
      fprintf(stderr, "HTTP Request (%s) callback %d, %s\n",
24
                      req->kind == EVHTTP_REQUEST ? "REQ" : "RESP",
25
                      req->response_code, req->response_code_line);
26
      char *cc;
27
      while((cc = evbuffer_readln(inb, NULL, EVBUFFER_EOL_ANY)) != NULL) {
28
        printf("Line X: %s!\n",cc);
29
      }
30

    
31
//          evbuffer_free(req->output_buffer);
32
//      evhttp_request_free(req);
33
}
34

    
35
void chunk_delay_callback(struct chunk *chunk, void *arg) {
36
        struct evhttp_connection *htc = (struct evhttp_connection *)arg;
37
    struct evhttp_request *reqobj = evhttp_request_new (http_request_cb, "VAVAVAV");
38
        char uri[50], nbuf[10];
39

    
40
        evbuffer_add(reqobj->output_buffer,chunk->buf, chunk->len);
41

    
42
    reqobj->kind = EVHTTP_REQUEST;
43
          {
44
                        char nbuf[50];
45
                        char *p;
46
                        uint16_t port;
47
                        evhttp_connection_get_peer(htc,&p,&port);
48
                        sprintf(nbuf,"%s:%d",p, port);
49

    
50
                        evhttp_add_header(reqobj->output_headers,"Host",nbuf);
51
                        printf("Chunk transmission: %d %d %s\n", chunk->len, chunk->chunk_num, nbuf);
52
          }
53
        sprintf(nbuf,"%d", chunk->len);
54
        evhttp_add_header(reqobj->output_headers,"Content-Length",nbuf);
55
        sprintf(uri,"%s/%d", DEFAULT_CHUNK_PATH, chunk->chunk_num);
56

    
57
    evhttp_make_request(htc, reqobj, EVHTTP_REQ_POST, DEFAULT_CHUNK_PATH);
58
}
59

    
60
/**
61
        * break up uri string to hostname, port and path-prefix 
62
        * valid formats is [[<address>]:]<port>[/[<path-prefix>]]
63
        * optional address and path-prefix fields, if not detected, are not assigned a value
64
        * output parameters may be NULL, meaning no assignment requested
65
        *
66
        * @param url the original string
67
        * @param host a buffer to receive the host part (URIPARTLEN bytes minimum). NULL means ignore.
68
        * @param port this will receive the port part. NULL means ignore.
69
        * @param path a buffer to receive the path part (URIPARTLEN bytes minimum). NULL means ignore.
70
        * @return the number of fields detected: 1: port only 2: hostname + port, 3 all fields.
71
*/
72

    
73

    
74
int     breakup_url(const char *url, char *host, int *port, char *path) {
75
                        char hostpart[URLPARTLEN + 1], pathpart[URLPARTLEN + 1];
76
                        int portpart = -1;
77

    
78
//                        int fields = sscanf(url,"%"  #URLPARTLEN "[^:]:%d/%" #URLPARTLEN "s", hostpart, &portpart, pathpart);
79
                        int fields = sscanf(url,"%50[^:]:%d/%50s", hostpart, &portpart, pathpart);
80
                        if(fields < 2) {
81
                                fields = sscanf(url + (*url == ':'),"%d",&portpart);
82
                        }
83
                        if( portpart <= 0 || portpart > 65535) return 0; 
84
                        switch(fields) {
85
                                        break;
86
                                case 3:
87
                                        if(path != NULL) { *path = '/'; strcpy(path+1, pathpart); }
88
                                case 2:
89
                                        if(host != NULL) strcpy(host, hostpart);
90
                                case 1:
91
                                        if(port != NULL) *port = portpart;
92
                                        break;
93
                                default: return 0;
94
                        }
95
                        return fields;
96
}
97

    
98

    
99
/**
100
* Setup a HTTP ejector, a component that will transmit a delayed replay of chunks via HTTP post 
101
*
102
 * @param url   the <hostname>:<port>[/<pathprefix>] format URL. (<pathprefix> is currently unused)
103
 * @param delay requested delay in usec relative to the original timestamps.
104
 * @return -1 if error (i.e. params failed), 0 otherwise
105
*/
106

    
107
int        setup_http_ejector(const char const *url, int delay) {
108
                char host[URLPARTLEN+1]; 
109
                int port; 
110
                if(breakup_url(url, host, &port, NULL) < 2) return -1;
111

    
112
                info("Registering HTTP ejector toward %s:%d  (delay: %d)", host, port, delay);
113
        struct evhttp_connection *htc = evhttp_connection_base_new(eventbase, host, port);
114
        evhttp_connection_set_closecb(htc, http_close_cb, "DUMMY");
115
                chbuf_add_listener(delay, chunk_delay_callback, htc);
116
                return 0;
117
}
118

    
119

    
120
/**
121
 * Callback for http injector
122
 *
123
 * @param req contains info about the request
124
 * @param contains the path prefix
125
 */
126

    
127
static void http_receive(struct evhttp_request *req, void *dummy_arg) {
128
                char *path_pat = (char *)dummy_arg;
129
        char *path = req->uri;
130
fprintf(stdout, "Http request received: %s %d\n", req->uri, req->type);
131
        if(!strncmp(path, HTTP_PREFIX , strlen(HTTP_PREFIX))) {  // skip "http://host:port" part if present
132
                          path = strchr(path + strlen(HTTP_PREFIX),'/');
133
        }
134
        if(req->type == EVHTTP_REQ_POST || req->type == EVHTTP_REQ_PUT) {
135

    
136
            if(!strncmp(path, path_pat , strlen(path_pat))) {
137
                     int chunk_num;
138
                     int slashed;
139
                     path += strlen(path_pat);
140
                     if(slashed = (*path == '/'))  path++;
141
                     if(*path == '\0') {
142
                         chunk_num = chbuf_get_current() + 1;
143
                     }
144
                     else {
145
                                                 int p;
146
                                                 if(sscanf(path, "%u%n",&chunk_num, &p) != 1) { 
147
bad_url:
148
                                 evhttp_send_reply(req,400,"BAD URI",NULL);
149
                                                                 fprintf(stdout,"Bad URL extension in [%s]. Required format: %s/<nnn>.\n", req->uri, path_pat);
150

    
151
                                 return;
152
                         }
153
                                                 if(p < strlen(path)) {
154
                                                         if(path[p] == '?') {
155
        // TODO: process parameters
156
                                                         }
157
                                                         else goto bad_url;
158
                                                 }
159
                     }
160
                                         fprintf(stdout, "Contains data: %d bytes\n", evbuffer_get_length(req->input_buffer));
161

    
162
                     chbuf_enqueue_from_evbuf(req->input_buffer, chunk_num, NULL);
163
                     evhttp_send_reply(req,200,"OK",NULL);
164
                                         return;
165
                }
166
        }
167
                fprintf(stdout,"Unknown HTTP request");
168
        evhttp_send_reply(req,404,"NOT FOUND",NULL);
169
}
170

    
171

    
172
/**
173
 *  Setup a HTTP injector, a component that will transmit a delayed replay of chunks via HTTP post 
174
 *  @todo TODO: return error if fisten failed
175
 * @param url   in <hostname>:<port>/<pathprefix> or <hostname>:<port> or <port> format (default for hostname is 0.0.0.0 and default for pathprefix is "/NapaTest/chunk"
176
 * @return -1 if error (url format failed) 0 otherwise
177

178
*/
179

    
180
int        setup_http_injector(const char const *url) {
181
                char host[URLPARTLEN+1] = "0.0.0.0";
182
                char *path = malloc(URLPARTLEN+1);
183
                int port; 
184

    
185
                sprintf(path, DEFAULT_CHUNK_PATH); 
186
                if(breakup_url(url, host, &port, path) < 1) return -1;
187

    
188
                printf("Starting HTTP injector listening on %s:%d for uri prefix is %s\n", host, port, path);
189

    
190
                struct evhttp* evh = evhttp_new(eventbase);
191
                evhttp_bind_socket(evh,host,port);
192
                evhttp_set_gencb(evh, http_receive, path);
193
                return 0;
194
}
195

    
196

    
197
/** ********************** MAIN ************************ */
198

    
199

    
200
void receive_local_socketID_cb(socketID_handle local_socketID,int errorstatus){
201

    
202
  char buf[100];
203
  mlSocketIDToString(local_socketID,buf, 100);
204

    
205
    printf("--Monitoring module: received a local ID <%s>\n", buf);
206

    
207
}
208

    
209
static connID;
210
#define CHUNK_MSG 15
211

    
212
void chunk_transmit_callback(struct chunk *chunk, void *arg) {
213
        send_params sParams;
214
        char headerbuf[100];
215
        send_all_data_container buflist;
216
        debug("Sending chunk with data of length: %d", chunk->len);
217
        sprintf(headerbuf, "CHUNK: CHUNK_NUM=%d LEN=%d LEN=%ld.%ld\n\n", chunk->chunk_num, chunk->len, chunk->rectime.tv_sec, chunk->rectime.tv_usec);
218
        buflist.buffer_1 = headerbuf;
219
        buflist.length_1 = strlen(headerbuf);
220
        buflist.buffer_2 = (char *)chunk->buf;
221
        buflist.length_2 = chunk->len;
222
        mlSendAllData((int)arg, &buflist, 2,CHUNK_MSG,&sParams);
223
}
224

    
225
/**
226
  * A callback function that tells a connection has been established. 
227
  * @param connectionID The connection ID
228
  * @param *arg An argument for data about the connection 
229
  */
230
void receive_outconn_cb (int connectionID, void *arg){
231

    
232
      info("ML: Successfully set up outgoing connection connectionID %d  \n",connectionID );
233

    
234
#if 0 
235
      /// transmit data to peer 2  
236
      int size = 10000;
237
      char buffer[size];
238
      memset(buffer,'a',size);
239
      unsigned char msgtype = 12;
240
      send_params sParams;
241
 
242
      mlSendData(connID,buffer,size,msgtype,&sParams);
243
#endif
244

    
245
        chbuf_add_listener(0, chunk_transmit_callback, (void *)connID);
246
}
247

    
248

    
249
/**
250
  * A callback function that tells a connection has been established. 
251
  * @param connectionID The connection ID
252
  * @param *arg An argument for data about the connection 
253
  */
254
void receive_conn_cb (int connectionID, void *arg){
255

    
256
  printf("ML: Received incoming connection connectionID %d  \n",connectionID );
257
  connID=connectionID;
258
} 
259

    
260
/**
261
  * A funtion that prints a connection establishment has failed
262
  * @param connectionID The connection ID
263
  * @param *arg An argument for data about the connection
264
  */
265
void conn_fail_cb(int connectionID,void *arg){
266

    
267
  printf("--echoServer: ConnectionID %d  could not be established \n ",connectionID);
268

    
269
}
270

    
271
/**
272
  * The peer receives data per callback from the messaging layer. 
273
  * @param *buffer A pointer to the buffer
274
  * @param buflen The length of the buffer
275
  * @param msgtype The message type 
276
  * @param *arg An argument that receives metadata about the received data
277
  */
278
void recv_data_from_peer_cb(char *buffer,int buflen,unsigned char msgtype,void *arg){
279
  info("CLIENT_MODE: Received data from someone:  msgtype %d buflen %d",msgtype,buflen ); 
280
}
281

    
282
void recv_chunk_from_peer_cb(char *buffer,int buflen,unsigned char msgtype,void *arg){
283
  debug("Received chunk from callback: msgtype %d buflen %d",msgtype,buflen ); 
284
  int chunk_num, chunk_length;
285
  struct timeval timestamp;
286
  char c1;
287
  int headlen;
288
  if(sscanf(buffer, "CHUNK: CHUNK_NUM=%d LEN=%d LEN=%ld.%ld%*1[\n]%c%n", 
289
          &chunk_num, &chunk_length, &(timestamp.tv_sec), &(timestamp.tv_usec), &c1, &headlen) != 5 || c1 != '\n') {
290
          warn("WARN!!! received inconsistent chunk with bad header format: %30.30s\n",buffer); 
291
          return;
292
  }
293
  else if(buflen != headlen + chunk_length ) {
294
           warn("WARN!!! received inconsistent chunk with bad length: %d + %d <--> %d \n", headlen, chunk_length, buflen ); 
295
          return;
296
  }
297
  else {
298
      chbuf_enqueue_chunk(chunk_num, chunk_length, &timestamp, buffer+headlen);
299
  }
300
}
301