Revision 5f3adef4 rep/publish.c

View differences:

rep/publish.c
8 8

  
9 9
struct streambuffer publish_streambuffer = { NULL, NULL, 0};
10 10

  
11
void bprintf(struct streambuffer *sb, const char *txt, ...) {
12
	va_list str_args;
13
	va_start(str_args, txt);
14
#if !WIN32 && !MAC_OS
15
	vfprintf(sb->stream, txt, str_args);
16
#else
17
	int space = sb->buffsize - sb->len;
18
	while(sb->buffer) { 
19
		int written = vsnprintf(sb->buffer+sb->len, space, txt, str_args);
20
		char *newbuf;
21
        if(written < space -1) {
22
			sb->len += written;
23
			break;
24
		}
25
		newbuf = (char *)realloc(sb->buffer, sb->buffsize +
26
				SB_INCREMENT);
27
		if(newbuf == 0) {
28
			fprintf(stderr, "Out of memory on memstream buffer extend (to %d bytes)",
29
					sb->buffsize + SB_INCREMENT); 
30
			return;
31
		}
32
		sb->buffer = newbuf;
33
		sb->buffsize += SB_INCREMENT;
34
	}	
35
#endif
36
	va_end(str_args);
37
}
38

  
39
void brewind(struct streambuffer *sb) {
40
#if !WIN32 && !MAC_OS
41
	rewind(sb->stream);
42
#else
43
	if(sb->buffer != NULL) *sb->buffer = 0;
44
	sb->len = 0;
45
#endif
46
}
47

  
48
void bflush(struct streambuffer *sb) {
49
#if !WIN32 && !MAC_OS
50
	    fflush(sb->stream);
51
#endif
52
}
53

  
54

  
55

  
56 11
const char *encode_measurementrecord(const MeasurementRecord *r) {
57
	brewind(&publish_streambuffer);
12
	rewind(publish_streambuffer.stream);
58 13

  
59
	bprintf(&publish_streambuffer, "originator=%s&", r->originator);
14
	fprintf(publish_streambuffer.stream, "originator=%s&", r->originator);
60 15
	if (r->targetA)
61
		bprintf(&publish_streambuffer, "targetA=%s&", r->targetA);
16
		fprintf(publish_streambuffer.stream, "targetA=%s&", r->targetA);
62 17
	if (r->targetB)
63
		bprintf(&publish_streambuffer, "targetB=%s&", r->targetB);
64
	bprintf(&publish_streambuffer, "published_name=%s", r->published_name);
18
		fprintf(publish_streambuffer.stream, "targetB=%s&", r->targetB);
19
	fprintf(publish_streambuffer.stream, "published_name=%s", r->published_name);
65 20

  
66 21
	if (r->string_value)
67
		bprintf(&publish_streambuffer, "&string_value=%s", r->string_value);
22
		fprintf(publish_streambuffer.stream, "&string_value=%s", r->string_value);
68 23
	else if (r->value != (0.0/0.0)) 
69
		bprintf(&publish_streambuffer, "&value=%f", r->value);
24
		fprintf(publish_streambuffer.stream, "&value=%lf", r->value);
70 25

  
71 26
	if (r->channel) 
72
		bprintf(&publish_streambuffer, "&channel=%s", r->channel);
27
		fprintf(publish_streambuffer.stream, "&channel=%s", r->channel);
73 28

  
74 29
	if (r->timestamp.tv_sec + r->timestamp.tv_usec != 0) {
75
		bprintf(&publish_streambuffer, "&timestamp=%s",
30
		fprintf(publish_streambuffer.stream, "&timestamp=%s",
76 31
				timeval2str(&(r->timestamp)));
77 32
	}
78 33

  
79
	bflush(&publish_streambuffer);
34
	fflush(publish_streambuffer.stream);
80 35
	/*debug("result: %s", publish_streambuffer.buffer);*/
81 36
	return publish_streambuffer.buffer;
82 37
}
......
99 54
			response[response_len] = 0;
100 55
			debug("Response string (len %d): %s", response_len, response);
101 56
			free(response);
102
		}	
57
		}
103 58
		if (user_cb) user_cb((HANDLE)server, id, cbdata->cbarg, req->response_code ? req->response_code : -1);
104 59
		return;
105 60
	}
......
119 74
		return 0;
120 75
	}
121 76

  
122
	info("abouttopublish,%s,%s,%s,%s,%f,%s,%s,%s\n", 
77
	fprintf(stderr,"abouttopublish,%s,%s,%s,%s,%lf,%s,%s,%s\n", 
123 78
		r->originator, r->targetA, r->targetB, r->published_name, r->value, 
124 79
		r->string_value, r->channel, timeval2str(&(r->timestamp)));
125 80
	char uri[1024];
126 81
	char buf[1024];
82
	const char *encoded = encode_measurementrecord(r);
127 83

  
128 84
	request_data *rd = (request_data *)malloc(sizeof(request_data));
129 85
	if (!rd) return NULL;
......
182 138
	rep->in_transit_entries = 0;
183 139
}
184 140

  
185
#define REPO_RECORD_LEN 300
186 141
/** Batch publish callback */
187 142
void deferred_publish_cb(evutil_socket_t fd, short what, void *arg) {
188 143
	struct reposerver *rep = (struct reposerver *)arg;
......
190 145
	if (rep->publish_buffer_entries) {
191 146
		debug("Deferred publish callback: %d entries to publish",
192 147
				rep->publish_buffer_entries);
193
		char *post_data = malloc(rep->publish_buffer_entries *
194
				REPO_RECORD_LEN + 10);
148
		char *post_data = malloc(rep->publish_buffer_entries * 256);
195 149
		if (!post_data) fatal("Out of memory!");
196 150
		post_data[0] = 0;
197 151

  
198 152
		int i;
199 153
		for (i = 0; i != rep->publish_buffer_entries; i++) {
200
			const char *enc;
201
		   	enc = encode_measurementrecord(&(rep->publish_buffer[i].r));
202
			if(strlen(enc) >= REPO_RECORD_LEN) {
203
			 warn("Skipping publish of HUGE record of %d bytes", strlen(enc));
204
			}
205
			else {
206
				strcat(post_data,enc);
207
				strcat(post_data, "\n");
208
			}
154
			strcat(post_data,
155
					encode_measurementrecord(&(rep->publish_buffer[i].r)));
156
			strcat(post_data, "\n");
209 157
		}
210 158

  
211 159
		make_post_request("/BatchPublish", post_data,
......
227 175
		event_base_once(eventbase, -1, EV_TIMEOUT, deferred_publish_cb, rep, &t); 
228 176
	}
229 177
}
178

  

Also available in: Unified diff