Statistics
| Branch: | Revision:

streamers / loop.c @ 46c24e94

History | View | Annotate | Download (3.12 KB)

1
/*
2
 *  Copyright (c) 2010 Luca Abeni
3
 *  Copyright (c) 2010 Csaba Kiraly
4
 *
5
 *  This is free software; see gpl-3.0.txt
6
 */
7
#include <sys/select.h>
8
#include <sys/time.h>
9
#include <time.h>
10
#include <stdint.h>
11
#include <stdlib.h>
12
#include <stdio.h>
13
#include <stdbool.h>
14

    
15
#include <net_helper.h>
16
#include <topmanager.h>
17
#include <msg_types.h>
18

    
19
#include "streaming.h"
20
#include "loop.h"
21
#include "dbg.h"
22

    
23
#define BUFFSIZE 64 * 1024
24
static struct timeval period = {0, 500000};
25
static struct timeval tnext;
26

    
27
void tout_init(struct timeval *tv)
28
{
29
  struct timeval tnow;
30

    
31
  if (tnext.tv_sec == 0) {
32
    gettimeofday(&tnext, NULL);
33
  }
34
  gettimeofday(&tnow, NULL);
35
  if(timercmp(&tnow, &tnext, <)) {
36
    timersub(&tnext, &tnow, tv);
37
  } else {
38
    *tv = (struct timeval){0, 0};
39
  }
40
}
41

    
42
void loop(struct nodeID *s, int csize, int buff_size)
43
{
44
  int done = 0;
45
  static uint8_t buff[BUFFSIZE];
46
  int cnt = 0;
47
  
48
  period.tv_sec = csize / 1000000;
49
  period.tv_usec = csize % 1000000;
50
  
51
  topParseData(NULL, 0);
52
  stream_init(buff_size, s);
53
  while (!done) {
54
    int len, res;
55
    struct timeval tv;
56

    
57
    tout_init(&tv);
58
    res = wait4data(s, tv);
59
    if (res > 0) {
60
      struct nodeID *remote;
61

    
62
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
63
      switch (buff[0] /* Message Type */) {
64
        case MSG_TYPE_TOPOLOGY:
65
          topParseData(buff, len);
66
          break;
67
        case MSG_TYPE_CHUNK:
68
          received_chunk(buff, len);
69
          break;
70
        default:
71
          fprintf(stderr, "Unknown Message Type %x\n", buff[0]);
72
      }
73
      nodeid_free(remote);
74
    } else {
75
      const struct nodeID **neighbours;
76
      int n;
77
      struct timeval tmp;
78

    
79
      neighbours = topGetNeighbourhood(&n);
80
      send_chunk(neighbours, n);
81
      if (cnt++ % 10 == 0) {
82
        topParseData(NULL, 0);
83
      }
84
      timeradd(&tnext, &period, &tmp);
85
      tnext = tmp;
86
    }
87
  }
88
}
89

    
90
void source_loop(const char *fname, struct nodeID *s, int csize, int chunks, bool loop)
91
{
92
  int done = 0;
93
  static uint8_t buff[BUFFSIZE];
94
  int cnt = 0;
95

    
96
  source_init(fname, s, loop);
97
  while (!done) {
98
    int len, res;
99
    struct timeval tv;
100

    
101
    tout_init(&tv);
102
    res = wait4data(s, tv);
103
    if (res > 0) {
104
      struct nodeID *remote;
105

    
106
      len = recv_from_peer(s, &remote, buff, BUFFSIZE);
107
      switch (buff[0] /* Message Type */) {
108
        case MSG_TYPE_TOPOLOGY:
109
          fprintf(stderr, "Top Parse\n");
110
          topParseData(buff, len);
111
          break;
112
        case MSG_TYPE_CHUNK:
113
          fprintf(stderr, "Some dumb peer pushed a chunk to me!\n");
114
          break;
115
        default:
116
          fprintf(stderr, "Bad Message Type %x\n", buff[0]);
117
      }
118
      nodeid_free(remote);
119
    } else {
120
      const struct nodeID **neighbours;
121
      int i, n, res;
122
      struct timeval tmp, d;
123

    
124
      d.tv_sec = 0;
125
      res = generated_chunk(&d.tv_usec);
126
      if (res) {
127
        neighbours = topGetNeighbourhood(&n);
128
        for (i = 0; i < chunks; i++) {
129
dprintf(" T: %lld\n", tnext.tv_sec * 1000 + tnext.tv_usec / 1000);
130
          send_chunk(neighbours, n);
131
        }
132
        if (cnt++ % 10 == 0) {
133
          topParseData(NULL, 0);
134
        }
135
      }
136
      timeradd(&tnext, &d, &tmp);
137
      tnext = tmp;
138
    }
139
  }
140
}