Revision 89e893e2

View differences:

Makefile
1
CFLAGS = -Wall
2
CFLAGS += -Wdeclaration-after-statement
3
CFLAGS += -Wno-switch -Wpointer-arith -Wredundant-decls
4
CFLAGS += -Wno-pointer-sign 
5
CFLAGS += -g
6

  
7
CPPFLAGS = -I$(GRAPES)/include
8
CPPFLAGS += -I$(GRAPES)/som
9

  
10
LDFLAGS = -L$(GRAPES)/som/TopologyManager -L$(GRAPES)/som/ChunkTrading -L$(GRAPES)/som/ChunkBuffer
11
LDLIBS = -ltrading -lcb -ltopman
12

  
13
OBJS = dumbstreamer.o loop.o streaming.o output.o input.o net_helpers.o
14

  
15
all: dumbstreamer
16

  
17
dumbstreamer: $(OBJS) $(GRAPES)/som/net_helper.o
18

  
19
clean:
20
	rm -f dumbstreamer
21
	rm -f *.o
dumbstreamer.c
1
/*
2
 *  Copyright (c) 2009 Luca Abeni
3
 *
4
 *  This is free software; see GPL.txt
5
 */
6
#include <stdlib.h>
7
#include <stdint.h>
8
#include <stdio.h>
9
#include <string.h>
10
#include <getopt.h>
11

  
12
#include <net_helper.h>
13
#include <topmanager.h>
14

  
15
#include "net_helpers.h"
16
#include "loop.h"
17

  
18
static const char *my_addr = "127.0.0.1";
19
static int port = 6666;
20
static int srv_port;
21
static const char *srv_ip;
22
static int period = 500;
23
static int chunks_per_second = 4;
24

  
25
static void cmdline_parse(int argc, char *argv[])
26
{
27
  int o;
28

  
29
  while ((o = getopt(argc, argv, "p:i:P:I:")) != -1) {
30
    switch(o) {
31
      case 'p':
32
        srv_port = atoi(optarg);
33
        break;
34
      case 'i':
35
        srv_ip = strdup(optarg);
36
        break;
37
      case 'P':
38
        port =  atoi(optarg);
39
        break;
40
      case 'I':
41
        my_addr = iface_addr(optarg);
42
        break;
43
      default:
44
        fprintf(stderr, "Error: unknown option %c\n", o);
45

  
46
        exit(-1);
47
    }
48
  }
49
}
50

  
51
static struct nodeID *init(void)
52
{
53
  struct nodeID *myID;
54

  
55
  myID = create_socket(my_addr, port);
56
  if (myID == NULL) {
57
    fprintf(stderr, "Error creating my socket (%s:%d)!\n", my_addr, port);
58
  }
59
  topInit(myID);
60

  
61
  return myID;
62
}
63

  
64

  
65
int main(int argc, char *argv[])
66
{
67
  struct nodeID *my_sock;
68

  
69
  cmdline_parse(argc, argv);
70

  
71
  my_sock = init();
72
  if (srv_port != 0) {
73
    struct nodeID *srv;
74

  
75
    srv = create_socket(srv_ip, srv_port);
76
    topAddNeighbour(srv);
77

  
78
    loop(my_sock, 1000000 / chunks_per_second);
79
  }
80

  
81
  source_loop(my_sock, period * 1000, chunks_per_second * period / 1000);
82

  
83
  return 0;
84
}
input.c
1
#include <stdint.h>
2
#include <stdio.h>
3
#include <string.h>
4

  
5
#include <chunk.h>
6

  
7
#include "input.h"
8

  
9
void input_get(struct chunk *c)
10
{
11
  char buff[64];
12
  static int id;
13

  
14
  sprintf(buff, "Chunk %d", id);
15
  c->id = id;
16
  c->timestamp = 40 * id++;
17
  c->data = strdup(buff);
18
  c->size = strlen(c->data) + 1;
19
  c->attributes_size = 0;
20
  c->attributes = NULL;
21
}
input.h
1
void input_get(struct chunk *c);
loop.c
1
#include <sys/select.h>
2
#include <sys/time.h>
3
#include <time.h>
4
#include <stdint.h>
5
#include <stdlib.h>
6
#include <stdio.h>
7

  
8
#include <net_helper.h>
9
#include <topmanager.h>
10

  
11
#include "streaming.h"
12
#include "loop.h"
13

  
14
static struct timeval period = {0, 500000};
15
static struct timeval tnext;
16

  
17
void tout_init(struct timeval *tv)
18
{
19
  struct timeval tnow;
20

  
21
  if (tnext.tv_sec == 0) {
22
    gettimeofday(&tnext, NULL);
23
  }
24
  gettimeofday(&tnow, NULL);
25
  if(timercmp(&tnow, &tnext, <)) {
26
    timersub(&tnext, &tnow, tv);
27
  } else {
28
    *tv = (struct timeval){0, 0};
29
  }
30
}
31

  
32
static int wait4data(struct nodeID *s)
33
{
34
  fd_set fds;
35
  int res;
36
  struct timeval tv;
37
  int fd = getFD(s);
38

  
39
  FD_ZERO(&fds);
40
  FD_SET(fd, &fds);
41
  tout_init(&tv);
42
  res = select(fd + 1, &fds, NULL, NULL, &tv);
43
  if (FD_ISSET(fd, &fds)) {
44
    return fd;
45
  }
46

  
47
  return -1;
48
}
49

  
50
void loop(struct nodeID *s, int csize)
51
{
52
  int done = 0;
53
#define BUFFSIZE 1024
54
  static uint8_t buff[BUFFSIZE];
55
  int cnt = 0;
56
  
57
  period.tv_sec = csize / 1000000;
58
  period.tv_usec = csize % 1000000;
59
  
60
  topParseData(NULL, 0);
61
  stream_init(8);	// FIXME!
62
  while (!done) {
63
    int len;
64
    int fd;
65

  
66
    fd = wait4data(s);
67
    if (fd > 0) {
68
      struct nodeID *remote;
69

  
70
      len = recv_data(s, &remote, buff, BUFFSIZE);
71
      switch (buff[0] /* Message Type */) {
72
        case 0x10 /* NCAST_PROTO */:
73
          topParseData(buff, len);
74
          break;
75
        case 12:
76
          received_chunk(buff, len);
77
        default:
78
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
79
      }
80
      free(remote);
81
    } else {
82
      const struct nodeID **neighbours;
83
      int n;
84
      struct timeval tmp;
85

  
86
      neighbours = topGetNeighbourhood(&n);
87
      send_chunk(neighbours, n);
88
      if (cnt++ % 10 == 0) {
89
        topParseData(NULL, 0);
90
      }
91
      timeradd(&tnext, &period, &tmp);
92
      tnext = tmp;
93
    }
94
  }
95
}
96

  
97
void source_loop(struct nodeID *s, int csize, int chunks)
98
{
99
  int done = 0;
100
#define BUFFSIZE 1024
101
  static uint8_t buff[BUFFSIZE];
102
  int cnt = 0;
103

  
104
  period.tv_sec = csize  / 1000000;
105
  period.tv_usec = csize % 1000000;
106
  
107
  stream_init(1);
108
  while (!done) {
109
    int len;
110
    int fd;
111

  
112
    fd = wait4data(s);
113
    if (fd > 0) {
114
      struct nodeID *remote;
115

  
116
      len = recv_data(s, &remote, buff, BUFFSIZE);
117
      switch (buff[0] /* Message Type */) {
118
        case 0x10 /* NCAST_PROTO */:
119
          fprintf(stderr, "Top Parse\n");
120
          topParseData(buff, len);
121
          break;
122
        default:
123
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
124
      }
125
      free(remote);
126
    } else {
127
      const struct nodeID **neighbours;
128
      int i, n;
129
      struct timeval tmp;
130

  
131
      fprintf(stderr, "Generate Chunk\n");
132
      generated_chunk();
133
      neighbours = topGetNeighbourhood(&n);
134
      for (i = 0; i < chunks; i++) {
135
        fprintf(stderr, "Send Chunk\n");
136
        send_chunk(neighbours, n);
137
      }
138
      if (cnt++ % 10 == 0) {
139
        topParseData(NULL, 0);
140
      }
141
      timeradd(&tnext, &period, &tmp);
142
      tnext = tmp;
143
    }
144
  }
145
}
loop.h
1
void loop(struct nodeID *s, int period);
2
void source_loop(struct nodeID *s, int csize, int chunks);
3

  
net_helpers.c
1
#include <sys/ioctl.h>
2
#include <sys/socket.h>
3
#include <netinet/in.h>
4
#include <arpa/inet.h>
5
#include <net/if.h>     /* For struct ifreq */
6
#include <unistd.h>
7
#include <stdlib.h>
8
#include <stdio.h>
9
#include <string.h>
10

  
11
#include "net_helpers.h"
12

  
13
const char *iface_addr(const char *iface)
14
{
15
    int s, res;
16
    struct ifreq iface_request;
17
    struct sockaddr_in *sin;
18
    char buff[512];
19

  
20
    s = socket(AF_INET, SOCK_DGRAM, 0);
21
    if (s < 0) {
22
        return NULL;
23
    }
24

  
25
    memset(&iface_request, 0, sizeof(struct ifreq));
26
    sin = (struct sockaddr_in *)&iface_request.ifr_addr;
27
    strcpy(iface_request.ifr_name, iface);
28
    /* sin->sin_family = AF_INET); */
29
    res = ioctl(s, SIOCGIFADDR, &iface_request);
30
    if (res < 0) {
31
        perror("ioctl(SIOCGIFADDR)");
32
        close(s);
33

  
34
        return NULL;
35
    }
36
    close(s);
37

  
38
    inet_ntop(AF_INET, &sin->sin_addr, buff, sizeof(buff));
39

  
40
    return strdup(buff);
41
}
42

  
43

  
44

  
net_helpers.h
1
const char *iface_addr(const char *iface);
2

  
output.c
1
#include <stdint.h>
2

  
3
#include <chunk.h>
4

  
5
void output_deliver(const struct chunk *c)
6
{
7
}
output.h
1
void output_deliver(const struct chunk *c);
streaming.c
1
#include <stdlib.h>
2
#include <stdio.h>
3
#include <stdint.h>
4

  
5
#include <net_helper.h>
6
#include <chunk.h> 
7
#include <chunkbuffer.h> 
8
#include <trade_msg_la.h>
9
#include <trade_msg_ha.h>
10

  
11
#include "streaming.h"
12
#include "output.h"
13
#include "input.h"
14

  
15
static struct chunk_buffer *cb;
16

  
17
void stream_init(int size)
18
{
19
  char conf[32];
20

  
21
  sprintf(conf, "size=%d", size);
22
  cb = cb_init(conf);
23
}
24

  
25
void received_chunk(const uint8_t *buff, int len)
26
{
27
  int res;
28
  struct chunk c;
29

  
30
  res = decodeChunk(&c, buff + 1, len - 1);
31
  if (res > 0) {
32
    output_deliver(&c);
33
    res = cb_add_chunk(cb, &c);
34
    if (res < 0) {
35
      free(c.data);
36
      free(c.attributes);
37
    }
38
  }
39
}
40

  
41
void generated_chunk(void)
42
{
43
  int res;
44
  struct chunk c;
45

  
46
  input_get(&c);
47
  if (res > 0) {
48
    res = cb_add_chunk(cb, &c);
49
    if (res < 0) {
50
      free(c.data);
51
      free(c.attributes);
52
    }
53
  }
54
}
55

  
56
void send_chunk(const struct nodeID **neighbours, int n)
57
{
58
  struct chunk *buff;
59
  int target, c, size;
60

  
61
  if (n == 0) return;
62
  buff = cb_get_chunks(cb, &size);
63
  if (size == 0) return;
64

  
65
  /************ STUPID DUMB SCHEDULING ****************/
66
  target = n * (rand() / (RAND_MAX + 1.0)); /*0..n-1*/
67
  c = size * (rand() / (RAND_MAX + 1.0)); /*0..size-1*/
68
  /************ /STUPID DUMB SCHEDULING ****************/
69

  
70
  sendChunk(neighbours[target], buff + c);
71
}
streaming.h
1
void stream_init(int size);
2
void received_chunk(const uint8_t *buff, int len);
3
void send_chunk(const struct nodeID **neighbours, int n);
4
void generated_chunk(void);

Also available in: Unified diff