Statistics
| Branch: | Revision:

grapes / src / Cache / cloudcast_proto.c @ 567c8490

History | View | Annotate | Download (5.08 KB)

1
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <stdint.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10

    
11
#include "net_helper.h"
12
#include "topocache.h"
13
#include "proto.h"
14
#include "topo_proto.h"
15
#include "cloudcast_proto.h"
16
#include "grapes_msg_types.h"
17

    
18
// TODO: This should not be duplicated here. should inherit from topo_proto.c
19
#define MAX_MSG_SIZE 1500
20

    
21
#define CLOUD_VIEW_KEY "view"
22

    
23
struct topo_header cloud_header = {MSG_TYPE_TOPOLOGY, CLOUDCAST_CLOUD};
24
struct cloudcast_proto_context {
25
  struct peer_cache* myEntry;
26
  uint8_t def_cloudcache[128];
27
  int def_cloudcache_len;
28
  struct topo_context *topo_context;
29
  struct cloud_helper_context *cloud_context;
30
};
31

    
32
int cloudcast_payload_fill(struct cloudcast_proto_context *context, uint8_t *payload, int size, struct peer_cache *c, int max_peers, int include_me);
33

    
34
struct cloudcast_proto_context* cloudcast_proto_init(struct nodeID *s, const void *meta, int meta_size)
35
{
36
  struct cloudcast_proto_context *con;
37
  struct peer_cache *tmp;
38
  con = malloc(sizeof(struct cloudcast_proto_context));
39

    
40
  if (!con) {
41
    fprintf(stderr, "cloudcast_proto: Error initializing context. ENOMEM\n");
42
    return NULL;
43
  }
44

    
45
  con->topo_context = topo_proto_init(s, meta, meta_size);
46
  if (!con->topo_context){
47
    fprintf(stderr, "cloudcast_proto: Error initializing topo_proto.\n");
48
    free(con);
49
    return NULL;
50
  }
51

    
52
  con->cloud_context = get_cloud_helper_for(s);
53
  if (!con->cloud_context) {
54
    fprintf(stderr, "cloudcast_proto: Error retrieving cloud_helper for current node\n");
55
    free(con);
56
    return NULL;
57
  }
58

    
59
  con->myEntry = cache_init(1, meta_size, 0);
60
  tmp = cache_init(0, meta_size, 0);
61
  con->def_cloudcache_len = cloudcast_payload_fill(con, con->def_cloudcache, sizeof(con->def_cloudcache), tmp, 0, 0);
62
  cache_free(tmp);
63
  cache_add(con->myEntry, s, meta, meta_size);
64

    
65
  return con;
66
}
67

    
68
struct nodeID* cloudcast_get_cloud_node(struct cloudcast_proto_context *con)
69
{
70
  return get_cloud_node(con->cloud_context, 0);
71
}
72

    
73
struct nodeID** cloudcast_get_cloud_nodes(struct cloudcast_proto_context *con, uint8_t number)
74
{
75
  int i;
76
  struct nodeID* *cloud_nodes;
77

    
78
  if (number == 0) return NULL;
79
  cloud_nodes = calloc(number, sizeof(struct nodeID*));
80

    
81
  for (i=0; i< number; i++)
82
    cloud_nodes[i] = get_cloud_node(con->cloud_context, i);
83
  return cloud_nodes;
84
}
85

    
86
int cloudcast_is_cloud_node(struct cloudcast_proto_context *con, struct nodeID* node)
87
{
88
  return is_cloud_node(con->cloud_context, node);
89
}
90

    
91
int cloudcast_reply_peer(struct cloudcast_proto_context *context, const struct peer_cache *c, struct peer_cache *local_cache)
92
{
93
  return topo_reply(context->topo_context, c, local_cache, MSG_TYPE_TOPOLOGY, CLOUDCAST_REPLY, 0, 0);
94
}
95

    
96
int cloudcast_query_peer(struct cloudcast_proto_context *context, struct peer_cache *sent_cache, struct nodeID *dst)
97
{
98
  return topo_query_peer(context->topo_context, sent_cache, dst, MSG_TYPE_TOPOLOGY, CLOUDCAST_QUERY, 0);
99
}
100

    
101

    
102
int cloudcast_payload_fill(struct cloudcast_proto_context *context, uint8_t *payload, int size, struct peer_cache *c, int max_peers, int include_me)
103
{
104
  int i;
105
  uint8_t *p = payload;
106

    
107
  if (!max_peers) max_peers = MAX_MSG_SIZE; // just to be sure to dump the whole cache...
108
  p += cache_header_dump(p, c, include_me);
109
  if (include_me) {
110
    p += entry_dump(p, context->myEntry, 0, size - (p - payload));
111
    max_peers--;
112
  }
113
  for (i = 0; nodeid(c, i) && max_peers; i++) {
114
    if (!is_cloud_node(context->cloud_context, nodeid(c, i))) {
115
      int res;
116
      res = entry_dump(p, c, i, size - (p - payload));
117
      if (res < 0) {
118
        fprintf(stderr, "too many entries!\n");
119
        return -1;
120
      }
121
      p += res;
122
      --max_peers;
123
    }
124
  }
125

    
126
  return p - payload;
127
}
128

    
129

    
130
int cloudcast_reply_cloud(struct cloudcast_proto_context *context, struct peer_cache *cloud_cache)
131
{
132
  uint8_t headerless_pkt[MAX_MSG_SIZE - sizeof(cloud_header)];
133
  int len, res;
134

    
135
  len = cloudcast_payload_fill(context, headerless_pkt, MAX_MSG_SIZE - sizeof(cloud_header), cloud_cache, 0, 1);
136

    
137
  if (len > 0)
138
    res = put_on_cloud(context->cloud_context, CLOUD_VIEW_KEY, headerless_pkt, len, 0);
139
  else
140
    res = 0;
141
  return res;
142
}
143

    
144
int cloudcast_query_cloud(struct cloudcast_proto_context *context)
145
{
146
  return get_from_cloud_default(context->cloud_context, CLOUD_VIEW_KEY,
147
                                (uint8_t *)&cloud_header, sizeof(cloud_header),
148
                                0, context->def_cloudcache,
149
                                context->def_cloudcache_len, 0);
150
}
151

    
152
struct peer_cache * cloudcast_cloud_default_reply(struct peer_cache *template, struct nodeID *cloud_entry)
153
{
154
  int size;
155
  struct peer_cache *cloud_reply;
156
  get_metadata(template, &size);
157
  cloud_reply = cache_init(1, size, 0);
158
  cache_add(cloud_reply, cloud_entry, NULL, 0);
159
  return cloud_reply;
160
}
161

    
162
time_t cloudcast_timestamp_cloud(struct cloudcast_proto_context *context)
163
{
164
  return timestamp_cloud(context->cloud_context);
165
}
166

    
167
int cloudcast_proto_change_metadata(struct cloudcast_proto_context *context, const void *metadata, int metadata_size)
168
{
169
  if (topo_proto_metadata_update(context->topo_context, metadata, metadata_size) <= 0) {
170
    return -1;
171
  }
172

    
173
  return 1;
174
}