Statistics
| Branch: | Revision:

grapes / src / Cache / cloudcast_proto.c @ e910cf6c

History | View | Annotate | Download (5.47 KB)

1
/*
2
 *  Copyright (c) 2010 Andrea Zito
3
 *
4
 *  This is free software; see lgpl-2.1.txt
5
 */
6

    
7
#include <stdint.h>
8
#include <stdlib.h>
9
#include <stdio.h>
10
#include <string.h>
11

    
12
#include "net_helper.h"
13
#include "topocache.h"
14
#include "proto.h"
15
#include "topo_proto.h"
16
#include "cloudcast_proto.h"
17
#include "grapes_msg_types.h"
18

    
19
// TODO: This should not be duplicated here. should inherit from topo_proto.c
20
#define MAX_MSG_SIZE 1500
21

    
22
#define CLOUD_VIEW_KEY "view"
23
#define CLOUDCAST_MESSAGE_HEADER 10 /* PROTOCOL MSG_TYPE LAST_CLOUD_CONTACT */
24

    
25
uint8_t cloud_header[10] = { MSG_TYPE_TOPOLOGY, CLOUDCAST_CLOUD, 0llu};
26

    
27
struct cloudcast_proto_context {
28
  struct peer_cache* myEntry;
29
  uint8_t def_cloudcache[128];
30
  int def_cloudcache_len;
31
  struct topo_context *topo_context;
32
  struct cloud_helper_context *cloud_context;
33
};
34

    
35
int cloudcast_payload_fill(struct cloudcast_proto_context *context, uint8_t *payload, int size, struct peer_cache *c, int max_peers, int include_me);
36

    
37
struct cloudcast_proto_context* cloudcast_proto_init(struct nodeID *s, const void *meta, int meta_size)
38
{
39
  struct cloudcast_proto_context *con;
40
  struct peer_cache *tmp;
41
  con = malloc(sizeof(struct cloudcast_proto_context));
42

    
43
  if (!con) {
44
    fprintf(stderr, "cloudcast_proto: Error initializing context. ENOMEM\n");
45
    return NULL;
46
  }
47

    
48
  con->topo_context = topo_proto_init(s, meta, meta_size);
49
  if (!con->topo_context){
50
    fprintf(stderr, "cloudcast_proto: Error initializing topo_proto.\n");
51
    free(con);
52
    return NULL;
53
  }
54

    
55
  con->cloud_context = get_cloud_helper_for(s);
56
  if (!con->cloud_context) {
57
    fprintf(stderr, "cloudcast_proto: Error retrieving cloud_helper for current node\n");
58
    free(con);
59
    return NULL;
60
  }
61

    
62
  con->myEntry = cache_init(1, meta_size, 0);
63
  tmp = cache_init(0, meta_size, 0);
64
  con->def_cloudcache_len = cloudcast_payload_fill(con, con->def_cloudcache, sizeof(con->def_cloudcache), tmp, 0, 0);
65
  cache_free(tmp);
66
  cache_add(con->myEntry, s, meta, meta_size);
67

    
68
  return con;
69
}
70

    
71
struct nodeID* cloudcast_get_cloud_node(struct cloudcast_proto_context *con)
72
{
73
  return get_cloud_node(con->cloud_context, 0);
74
}
75

    
76
struct nodeID** cloudcast_get_cloud_nodes(struct cloudcast_proto_context *con, uint8_t number)
77
{
78
  int i;
79
  struct nodeID* *cloud_nodes;
80

    
81
  if (number == 0) return NULL;
82
  cloud_nodes = calloc(number, sizeof(struct nodeID*));
83

    
84
  for (i=0; i< number; i++)
85
    cloud_nodes[i] = get_cloud_node(con->cloud_context, i);
86
  return cloud_nodes;
87
}
88

    
89
int cloudcast_is_cloud_node(struct cloudcast_proto_context *con, struct nodeID* node)
90
{
91
  return is_cloud_node(con->cloud_context, node);
92
}
93

    
94
int cloudcast_reply_peer(struct cloudcast_proto_context *context, const struct peer_cache *c, struct peer_cache *local_cache, uint64_t last_cloud_contact_sec)
95
{
96
  uint8_t *header = (uint8_t *) &last_cloud_contact_sec;
97
  int header_len = sizeof(uint64_t);
98
  return topo_reply_header(context->topo_context, c, local_cache, MSG_TYPE_TOPOLOGY, CLOUDCAST_REPLY, header, header_len, 0, 0);
99
}
100

    
101
int cloudcast_query_peer(struct cloudcast_proto_context *context, struct peer_cache *sent_cache, struct nodeID *dst, uint64_t last_cloud_contact_sec)
102
{
103
  uint8_t *header = (uint8_t *) &last_cloud_contact_sec;
104
  int header_len = sizeof(uint64_t);
105
  return topo_query_peer_header(context->topo_context, sent_cache, dst, MSG_TYPE_TOPOLOGY, CLOUDCAST_QUERY, header, header_len, 0);
106
}
107

    
108

    
109
int cloudcast_payload_fill(struct cloudcast_proto_context *context, uint8_t *payload, int size, struct peer_cache *c, int max_peers, int include_me)
110
{
111
  int i;
112
  uint8_t *p = payload;
113

    
114
  if (!max_peers) max_peers = MAX_MSG_SIZE; // FIXME: we should use topo_proto for this
115
  p += cache_header_dump(p, c, include_me);
116
  if (include_me) {
117
    p += entry_dump(p, context->myEntry, 0, size - (p - payload));
118
    max_peers--;
119
  }
120
  for (i = 0; nodeid(c, i) && max_peers; i++) {
121
    if (!is_cloud_node(context->cloud_context, nodeid(c, i))) {
122
      int res;
123
      res = entry_dump(p, c, i, size - (p - payload));
124
      if (res < 0) {
125
        fprintf(stderr, "too many entries!\n");
126
        return -1;
127
      }
128
      p += res;
129
      --max_peers;
130
    }
131
  }
132

    
133
  return p - payload;
134
}
135

    
136

    
137
int cloudcast_reply_cloud(struct cloudcast_proto_context *context, struct peer_cache *cloud_cache)
138
{
139
  uint8_t headerless_pkt[MAX_MSG_SIZE - CLOUDCAST_MESSAGE_HEADER];
140
  int len, res;
141

    
142
  len = cloudcast_payload_fill(context, headerless_pkt, MAX_MSG_SIZE - CLOUDCAST_MESSAGE_HEADER, cloud_cache, 0, 1);
143

    
144
  if (len > 0)
145
    res = put_on_cloud(context->cloud_context, CLOUD_VIEW_KEY, headerless_pkt, len, 0);
146
  else
147
    res = 0;
148
  return res;
149
}
150

    
151
int cloudcast_query_cloud(struct cloudcast_proto_context *context)
152
{
153
  return get_from_cloud_default(context->cloud_context, CLOUD_VIEW_KEY,
154
                                cloud_header, CLOUDCAST_MESSAGE_HEADER,
155
                                0, context->def_cloudcache,
156
                                context->def_cloudcache_len, 0);
157
}
158

    
159
struct peer_cache * cloudcast_cloud_default_reply(struct peer_cache *template, struct nodeID *cloud_entry)
160
{
161
  int size;
162
  struct peer_cache *cloud_reply;
163
  get_metadata(template, &size);
164
  cloud_reply = cache_init(1, size, 0);
165
  cache_add(cloud_reply, cloud_entry, NULL, 0);
166
  return cloud_reply;
167
}
168

    
169
time_t cloudcast_timestamp_cloud(struct cloudcast_proto_context *context)
170
{
171
  return timestamp_cloud(context->cloud_context);
172
}
173

    
174
int cloudcast_proto_change_metadata(struct cloudcast_proto_context *context, const void *metadata, int metadata_size)
175
{
176
  if (topo_proto_metadata_update(context->topo_context, metadata, metadata_size) <= 0) {
177
    return -1;
178
  }
179

    
180
  return 1;
181
}