Statistics
| Branch: | Tag: | Revision:

dvbd / src / connection.cpp @ 9c9e8151

History | View | Annotate | Download (6.22 KB)

1
/*
2
  Copyright 2003 John Knottenbelt
3
  
4
  This program is free software; you can redistribute it and/or modify
5
  it under the terms of the GNU General Public License as published by
6
  the Free Software Foundation; either version 2 of the License, or
7
  (at your option) any later version.
8
 
9
  This program is distributed in the hope that it will be useful,
10
  but WITHOUT ANY WARRANTY; without even the implied warranty of
11
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
  GNU General Public License for more details.
13
 
14
  You should have received a copy of the GNU General Public License
15
  along with this program; if not, write to the Free Software
16
  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111, USA.
17
*/
18

    
19
#include "config.h"
20
#include "connection.h"
21
#include "demuxer.h"
22
#include "utils.h"
23
#include "stringutil.h"
24
#include "inputbuffer.h"
25
#include "outputbuffer.h"
26
#include "scheduler.h"
27

    
28
#include <iostream>
29
#include <cstring>
30
#include <vector>
31
#include <unistd.h>
32
#include <cerrno>
33
#include <ctime>
34
#include <algorithm>
35
#include <cassert>
36

    
37
Connection::Connection(TunerManager *tm, int dataFD)
38
  : dataFD(dataFD), priority(0), 
39
    tuned(false), retuning(false), 
40
    tm(tm), removeMe(false),
41
    stopTime(0),
42
    interrupted(false), 
43
    lostPacket(false),
44
    convertToPS(false),
45
    lastTime(0)
46
{
47
  if (debugLevel > 1)
48
    std::cerr << "New client connection" << std::endl;
49

    
50
  dataOutput = new OutputBuffer(dataFD, 16384, 1048576);
51
}
52

    
53
Connection::~Connection()
54
{
55
  if (debugLevel > 1)
56
    std::cerr << "Client connection closed" << std::endl;
57

    
58
  retuning=true;
59
  doUnsubscribe();
60
  
61
  delete dataOutput;
62

    
63
  close(dataFD);
64
}
65

    
66
void Connection::doUnsubscribe()
67
{
68
  SourceList copy = demuxers;
69
  for (SourceList::iterator i = copy.begin(); i != copy.end(); i++)
70
    (*i)->unsubscribe(this);
71
}
72

    
73
bool Connection::doSubscribe()
74
{
75
  bool result = true;
76

    
77
  for (SourceList::iterator i = demuxers.begin(); i != demuxers.end(); i++) 
78
    result = (*i)->subscribe(this) && result;
79
   
80
  if (!result) {
81
    doUnsubscribe();
82
    return false;
83
  }
84

    
85
  return true;
86
}
87

    
88
void Connection::notifyUnsubscribe(Source *s)
89
{
90
  Demuxer *d = (Demuxer *) s;
91
  demuxers.remove(d);
92
  interrupted = true;
93
}
94

    
95
time_t Connection::getCurrentTime() const
96
{
97
  return time(NULL);
98
}
99

    
100
void Connection::addSelectFDs(Select &s) const
101
{
102
  dataOutput->addSelectFDs(s);
103

    
104
  if (stopTime != 0)
105
    s.addAlarm(std::max(0L, stopTime - getCurrentTime()), 0);
106
}
107

    
108
bool Connection::isReady(const Select &s) const
109
{
110
  if (removeMe)
111
    return true;
112

    
113
  return dataOutput->isReady(s);
114
}
115

    
116
int Connection::getPriority() const
117
{
118
  return priority;
119
}
120

    
121
void Connection::setPriority(int p)
122
{
123
  priority = p;
124
}
125

    
126
void Connection::setStopTime(time_t newStopTime)
127
{
128
  stopTime = newStopTime;
129
}
130

    
131
time_t Connection::getStopTime() const
132
{
133
  return stopTime;
134
}
135

    
136
bool Connection::isValidType(const std::string &type ) const
137
{
138
  return tm->validType(type);
139
}
140

    
141
bool Connection::isValidChannel(const std::string &type, const std::string &channel) const
142
{
143
  return tm->validChannel(type, channel);
144
}
145

    
146
bool Connection::canTune(const std::string &newType, const std::string &newChannel, int p) const
147
{
148
  SavePriority save(const_cast<Connection *>(this), p - 1);
149
  
150
  return tm->canGetDemuxers(newType, newChannel, p);
151
}
152

    
153
void Connection::getTunableChannels(const std::string &type, int priority, StringList &channels) const
154
{
155
  tm->getTunableChannels(type, priority, channels);
156
}
157

    
158
bool Connection::isRetuning() const {
159
  return retuning;
160
}
161

    
162
bool Connection::numDemuxers() const
163
{
164
  return demuxers.empty();
165
}
166

    
167
bool Connection::getInterrupted() const {
168
  return interrupted;
169
}
170

    
171
void Connection::setRemoveMe(bool status)
172
{
173
  removeMe = status;
174
}
175

    
176

    
177
bool Connection::tune(const std::string &newType, const std::string &newChannel, bool convertToPS, int p)
178
{
179
  if (tuned) {
180
    if (type == newType && channel == newChannel) {
181
      setPriority(p);
182
      return true;
183
    }
184
  }
185

    
186
  if (debugLevel > 1) {
187
    std::cerr << "Tuning to " << newType << " " << newChannel 
188
              << " priority " << p << std::endl;
189
  }
190

    
191
  // The retuning variable prevents the client from being notified
192
  // that he has lost a subscription. The loss of subscription is to
193
  // be expected since the request is to change the channel!
194
  retuning = true;
195
  
196
  SourceList newDemuxers;
197

    
198
  // Set the priority of the current channel
199
  // to be less than the specified priority
200
  {
201
    SavePriority save(this, p - 1);
202

    
203
    // Try to allocate the new demuxers
204
    tm->getDemuxers(newType, newChannel, convertToPS, newDemuxers, p);
205
    retuning = false;
206

    
207
    if (newDemuxers.empty()) 
208
      return false;      // We remain subscribed to the old channel.
209
  }
210

    
211
  // Unsubscribe (if necessary) from the old and subscribe to the new
212

    
213
  doUnsubscribe();
214
  demuxers = newDemuxers;
215
  type = newType;
216
  channel = newChannel;
217
  bool result = doSubscribe();
218
  setPriority(p);
219
  tuned = true;
220
  interrupted = false;
221
  lostPacket = false;
222
  this->convertToPS = convertToPS;
223
  return result;
224
}
225

    
226
bool Connection::processReady(const Select &s)
227
{
228
  if (stopTime != 0 && getCurrentTime() > stopTime) {
229
    if (debugLevel > 0) {
230
      std::cerr << "Stop time reached (stopTime = " << stopTime 
231
                << ", now = " << getCurrentTime() << ")\n";
232
    }
233
    return false;
234
  }
235

    
236
  if (removeMe)
237
    return false;
238

    
239
  if (dataOutput->isReady(s)) {
240
    if (!dataOutput->process()) 
241
      return false;
242
  }
243

    
244
  return true;
245
}
246

    
247
void Connection::receiveData(unsigned char *data, unsigned size)
248
{
249
  if (debugLevel > 0) {
250
    time_t now = time(NULL);
251

    
252
    if (now - lastTime >= 2) {
253
      std::cerr << "Buffer usage:" << dataOutput->getUsed() << "/" << dataOutput->getMaxSize() 
254
                << std::endl;
255
      lastTime = now;
256
    }
257
  }
258
  
259
  // Receive data from the Demuxers and forward it to the clients
260
  if (!dataOutput->write((char *) data, size)) {
261
    if (!lostPacket) {
262
      std::cerr << "Warning: client not reading data connection fast enough (dropping packets)" << std::endl;
263
      lostPacket = true;
264
    }
265
    if (!convertToPS) {
266
      // Drop a TS packet from the beginning
267
      dataOutput->discard(188 + dataOutput->getUsed() % 188);
268
      dataOutput->write((char *) data, size);
269
    }
270
  }
271
}
272

    
273
std::ostream& Connection::printOn(std::ostream &o) const
274
{
275
  if (!tuned) 
276
    return o << "UNTUNED";
277
  else
278
    return o << priority << " " << type << " " << escapeWS(channel);
279
}