Revision 15edce59

View differences:

chunker_player/tcp_chunk_puller.c
18 18
#include <netinet/in.h>
19 19
#include <unistd.h>
20 20

  
21
#define TCP_BUF_SIZE 65536
21
#define TCP_BUF_SIZE 65536*16
22 22

  
23 23
static int listen_port = 0;
24 24
static int accept_fd = -1;
......
49 49
	
50 50
	printf("listening on port %$d\n", port);
51 51
	
52
	listen(accept_fd, 10);
53
	
54 52
	if(pthread_create( &AcceptThread, NULL, &AcceptThreadProc, NULL) != 0)
55 53
	{
56 54
		fprintf(stderr,"TCP-INPUT-MODULE: could not start accepting thread!!\n");
......
67 65
    int fd = -1;
68 66
    
69 67
    isRunning = 1;
68

  
69
    listen(accept_fd, 10);
70 70
    
71 71
    while(isRunning)
72 72
    {
......
98 98
static void* RecvThreadProc(void* params)
99 99
{
100 100
	int ret = -1;
101
	int fragment_size = 0;
101
	uint32_t fragment_size = 0;
102 102
	uint8_t* buffer = (uint8_t*) malloc(TCP_BUF_SIZE);
103
	while(isReceving)
104
	{
103

  
104
	fprintf(stderr,"TCP-INPUT-MODULE: receive thread created\n");
105

  
106
	while(isReceving) {
105 107
		ret=recv(socket_fd, &fragment_size, sizeof(uint32_t), 0);
106
		if(ret <= sizeof(uint32_t))
107
			continue;
108
		
108
		fragment_size = ntohl(fragment_size);
109

  
110
fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes. Fragment size: %d\n", ret, fragment_size);
111
		if(ret <= 0) {
112
			break;
113
		}
114

  
109 115
		ret=recv(socket_fd, buffer, fragment_size, 0);
110
		while(ret <= fragment_size)
116
fprintf(stderr, "TCP-INPUT-MODULE: received %d bytes.\n", ret);
117
		while(ret < fragment_size)
111 118
			ret+=recv(socket_fd, buffer+ret, fragment_size-ret, 0);
112 119
		
113 120
		if(enqueueBlock(buffer, fragment_size))
114 121
			fprintf(stderr, "TCP-INPUT-MODULE: could not enqueue a received chunk!! \n");
115 122
	}
116 123
	free(buffer);
117
	
124
	close(socket_fd);
125
	socket_fd = -1;
126

  
118 127
	return NULL;
119 128
}
120 129

  
chunker_streamer/chunk_pusher.c
171 171
	return ret;
172 172
}
173 173

  
174
int sendViaTcp(Chunk gchunk, int buffer_size)
174
int sendViaTcp(Chunk gchunk, uint32_t buffer_size)
175 175
{
176 176
	uint8_t *buffer=NULL;
177 177

  
......
183 183
		return ret;
184 184
	}
185 185

  
186
	if( (buffer = malloc(buffer_size)) != NULL) {
186
	if( (buffer = malloc(4 + buffer_size)) != NULL) {
187 187
		/* encode the GRAPES chunk into network bytes */
188
		encodeChunk(&gchunk, buffer, buffer_size);
188
		encodeChunk(&gchunk, buffer + 4, buffer_size);
189
		*(uint32_t*)buffer = htonl(buffer_size);
189 190
		
190
		int ret = send(tcp_fd, buffer, buffer_size, 0);
191
		int ret = send(tcp_fd, buffer, 4 + buffer_size, 0);
192
fprintf(stderr, "TCP IO-MODULE: sending %d bytes, %d sent\n", buffer_size, ret);
191 193
		int tmp;
192 194
		while(ret != buffer_size)
193 195
		{
194
			tmp = send(tcp_fd, buffer+ret, buffer_size-ret, 0);
196
			tmp = send(tcp_fd, buffer+ret, 4 + buffer_size - ret, 0);
195 197
			if(tmp > 0)
196 198
				ret += tmp;
197 199
			else

Also available in: Unified diff