Statistics
| Branch: | Revision:

streamers / loop.c @ 9d6a53c3

History | View | Annotate | Download (3.54 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14

    
15
#include <net_helper.h>
16
#include <topmanager.h>
17
#include <msg_types.h>
18

    
19
#include "streaming.h"
20
#include "loop.h"
21
#include "dbg.h"
22

    
23
#define BUFFSIZE 512 * 1024
24
static struct timeval period = {0, 500000};
25
static struct timeval tnext;
26

    
27
void tout_init(struct timeval *tv)
28
{
29
  struct timeval tnow;
30

    
31
  if (tnext.tv_sec == 0) {
32
    gettimeofday(&tnext, NULL);
33
  }
34
  gettimeofday(&tnow, NULL);
35
  if(timercmp(&tnow, &tnext, <)) {
36
    timersub(&tnext, &tnow, tv);
37
  } else {
38
    *tv = (struct timeval){0, 0};
39
  }
40
}
41

    
42
void loop(struct nodeID *s, int csize, int buff_size)
43
{
44
  int done = 0;
45
  static uint8_t buff[BUFFSIZE];
46
  int cnt = 0;
47
  
48
  period.tv_sec = csize / 1000000;
49
  period.tv_usec = csize % 1000000;
50
  
51
  topParseData(NULL, 0);
52
  stream_init(buff_size, s);
53
  while (!done) {
54
    int len, res;
55
    struct timeval tv;
56

    
57
    tout_init(&tv);
58
    res = wait4data(s, &tv);
59
    if (res > 0) {
60
      struct nodeID *remote;
61

    
62
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
63
      if (len < 0) {
64
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
65
        nodeid_free(remote);
66
        continue;
67
      }
68
      switch (buff[0] /* Message Type */) {
69
        case MSG_TYPE_TOPOLOGY:
70
          dprintf("Topo message received\n");
71
          topParseData(buff, len);
72
          break;
73
        case MSG_TYPE_CHUNK:
74
          dprintf("Chunk message received:\n");
75
          received_chunk(buff, len);
76
          break;
77
        default:
78
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
79
      }
80
      nodeid_free(remote);
81
    } else {
82
      const struct nodeID **neighbours;
83
      int n;
84
      struct timeval tmp;
85

    
86
      neighbours = topGetNeighbourhood(&n);
87
      send_chunk(neighbours, n);
88
      if (cnt++ % 10 == 0) {
89
        topParseData(NULL, 0);
90
      }
91
      timeradd(&tnext, &period, &tmp);
92
      tnext = tmp;
93
    }
94
  }
95
}
96

    
97
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, bool loop)
98
{
99
  int done = 0;
100
  static uint8_t buff[BUFFSIZE];
101
  int cnt = 0;
102

    
103
  source_init(fname, s, loop);
104
  while (!done) {
105
    int len, res;
106
    struct timeval tv;
107

    
108
    tout_init(&tv);
109
    res = wait4data(s, &tv);
110
    if (res > 0) {
111
      struct nodeID *remote;
112

    
113
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
114
      if (len < 0) {
115
        fprintf(stderr,"Error receiving message. Maybe larger than %d bytes\n", BUFFSIZE);
116
        nodeid_free(remote);
117
        continue;
118
      }
119
      switch (buff[0] /* Message Type */) {
120
        case MSG_TYPE_TOPOLOGY:
121
          fprintf(stderr, "Top Parse\n");
122
          topParseData(buff, len);
123
          break;
124
        case MSG_TYPE_CHUNK:
125
          fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
126
          break;
127
        default:
128
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
129
      }
130
      nodeid_free(remote);
131
    } else {
132
      const struct nodeID **neighbours;
133
      int i, n, res;
134
      struct timeval tmp, d;
135

    
136
      d.tv_sec = 0;
137
      res = generated_chunk(&d.tv_usec);
138
      if (res) {
139
        neighbours = topGetNeighbourhood(&n);
140
        for (i = 0; i < chunks; i++) {
141
dprintf(" T: %lld\n", tnext.tv_sec * 1000 + tnext.tv_usec / 1000);
142
          send_chunk(neighbours, n);
143
        }
144
        if (cnt++ % 10 == 0) {
145
          topParseData(NULL, 0);
146
        }
147
      }
148
      timeradd(&tnext, &d, &tmp);
149
      tnext = tmp;
150
    }
151
  }
152
}