Statistics
| Branch: | Tag: | Revision:

dvbd / src / connection.cpp @ ba6f6023

History | View | Annotate | Download (6.23 KB)

1
/*
2
  Copyright 2003 John Knottenbelt
3
  
4
  This program is free software; you can redistribute it and/or modify
5
  it under the terms of the GNU General Public License as published by
6
  the Free Software Foundation; either version 2 of the License, or
7
  (at your option) any later version.
8
 
9
  This program is distributed in the hope that it will be useful,
10
  but WITHOUT ANY WARRANTY; without even the implied warranty of
11
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
  GNU General Public License for more details.
13
 
14
  You should have received a copy of the GNU General Public License
15
  along with this program; if not, write to the Free Software
16
  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111, USA.
17
*/
18

    
19
#include "config.h"
20
#include "connection.h"
21
#include "demuxer.h"
22
#include "utils.h"
23
#include "stringutil.h"
24
#include "inputbuffer.h"
25
#include "outputbuffer.h"
26
#include "scheduler.h"
27

    
28
#include <iostream>
29
#include <cstring>
30
#include <vector>
31
#include <unistd.h>
32
#include <cerrno>
33
#include <ctime>
34
#include <algorithm>
35
#include <cassert>
36

    
37
Connection::Connection(TunerManager *tm, int dataFD)
38
  : dataFD(dataFD), priority(0), 
39
    tuned(false), retuning(false), 
40
    tm(tm), removeMe(false),
41
    stopTime(0),
42
    interrupted(false), 
43
    lostPacket(false),
44
    convertToPS(false),
45
    lastTime(0)
46
{
47
  if (debugLevel > 1)
48
    std::cerr << "New client connection" << std::endl;
49

    
50
  dataOutput = new OutputBuffer(dataFD, 16384, 1048576);
51
}
52

    
53
Connection::~Connection()
54
{
55
  if (debugLevel > 1)
56
    std::cerr << "Client connection closed" << std::endl;
57

    
58
  retuning=true;
59
  doUnsubscribe();
60
  
61
  delete dataOutput;
62

    
63
  close(dataFD);
64
}
65

    
66
void Connection::doUnsubscribe()
67
{
68
  SourceList copy = demuxers;
69
  for (SourceList::iterator i = copy.begin(); i != copy.end(); i++)
70
    (*i)->unsubscribe(this);
71
}
72

    
73
bool Connection::doSubscribe()
74
{
75
  bool result = true;
76
  int n = 0;
77

    
78
  for (SourceList::iterator i = demuxers.begin(); i != demuxers.end(); i++) 
79
    result = (*i)->subscribe(this) && result;
80
   
81
  if (!result) {
82
    doUnsubscribe();
83
    return false;
84
  }
85

    
86
  return true;
87
}
88

    
89
void Connection::notifyUnsubscribe(Source *s)
90
{
91
  Demuxer *d = (Demuxer *) s;
92
  demuxers.remove(d);
93
  interrupted = true;
94
}
95

    
96
time_t Connection::getCurrentTime() const
97
{
98
  return time(NULL);
99
}
100

    
101
void Connection::addSelectFDs(Select &s) const
102
{
103
  dataOutput->addSelectFDs(s);
104

    
105
  if (stopTime != 0)
106
    s.addAlarm(std::max(0L, stopTime - getCurrentTime()), 0);
107
}
108

    
109
bool Connection::isReady(const Select &s) const
110
{
111
  if (removeMe)
112
    return true;
113

    
114
  return dataOutput->isReady(s);
115
}
116

    
117
int Connection::getPriority() const
118
{
119
  return priority;
120
}
121

    
122
void Connection::setPriority(int p)
123
{
124
  priority = p;
125
}
126

    
127
void Connection::setStopTime(time_t newStopTime)
128
{
129
  stopTime = newStopTime;
130
}
131

    
132
time_t Connection::getStopTime() const
133
{
134
  return stopTime;
135
}
136

    
137
bool Connection::isValidType(const std::string &type ) const
138
{
139
  return tm->validType(type);
140
}
141

    
142
bool Connection::isValidChannel(const std::string &type, const std::string &channel) const
143
{
144
  return tm->validChannel(type, channel);
145
}
146

    
147
bool Connection::canTune(const std::string &newType, const std::string &newChannel, int p) const
148
{
149
  SavePriority save(const_cast<Connection *>(this), p - 1);
150
  
151
  return tm->canGetDemuxers(newType, newChannel, p);
152
}
153

    
154
void Connection::getTunableChannels(const std::string &type, int priority, StringList &channels) const
155
{
156
  tm->getTunableChannels(type, priority, channels);
157
}
158

    
159
bool Connection::isRetuning() const {
160
  return retuning;
161
}
162

    
163
bool Connection::numDemuxers() const
164
{
165
  return demuxers.empty();
166
}
167

    
168
bool Connection::getInterrupted() const {
169
  return interrupted;
170
}
171

    
172
void Connection::setRemoveMe(bool status)
173
{
174
  removeMe = status;
175
}
176

    
177

    
178
bool Connection::tune(const std::string &newType, const std::string &newChannel, bool convertToPS, int p)
179
{
180
  if (tuned) {
181
    if (type == newType && channel == newChannel) {
182
      setPriority(p);
183
      return true;
184
    }
185
  }
186

    
187
  if (debugLevel > 1) {
188
    std::cerr << "Tuning to " << newType << " " << newChannel 
189
              << " priority " << p << std::endl;
190
  }
191

    
192
  // The retuning variable prevents the client from being notified
193
  // that he has lost a subscription. The loss of subscription is to
194
  // be expected since the request is to change the channel!
195
  retuning = true;
196
  
197
  SourceList newDemuxers;
198

    
199
  // Set the priority of the current channel
200
  // to be less than the specified priority
201
  {
202
    SavePriority save(this, p - 1);
203

    
204
    // Try to allocate the new demuxers
205
    tm->getDemuxers(newType, newChannel, convertToPS, newDemuxers, p);
206
    retuning = false;
207

    
208
    if (newDemuxers.empty()) 
209
      return false;      // We remain subscribed to the old channel.
210
  }
211

    
212
  // Unsubscribe (if necessary) from the old and subscribe to the new
213

    
214
  doUnsubscribe();
215
  demuxers = newDemuxers;
216
  type = newType;
217
  channel = newChannel;
218
  bool result = doSubscribe();
219
  setPriority(p);
220
  tuned = true;
221
  interrupted = false;
222
  lostPacket = false;
223
  this->convertToPS = convertToPS;
224
  return result;
225
}
226

    
227
bool Connection::processReady(const Select &s)
228
{
229
  if (stopTime != 0 && getCurrentTime() > stopTime) {
230
    if (debugLevel > 0) {
231
      std::cerr << "Stop time reached (stopTime = " << stopTime 
232
                << ", now = " << getCurrentTime() << ")\n";
233
    }
234
    return false;
235
  }
236

    
237
  if (removeMe)
238
    return false;
239

    
240
  if (dataOutput->isReady(s)) {
241
    if (!dataOutput->process()) 
242
      return false;
243
  }
244

    
245
  return true;
246
}
247

    
248
void Connection::receiveData(unsigned char *data, unsigned size)
249
{
250
  if (debugLevel > 0) {
251
    time_t now = time(NULL);
252

    
253
    if (now - lastTime >= 2) {
254
      std::cerr << "Buffer usage:" << dataOutput->getUsed() << "/" << dataOutput->getMaxSize() 
255
                << std::endl;
256
      lastTime = now;
257
    }
258
  }
259
  
260
  // Receive data from the Demuxers and forward it to the clients
261
  if (!dataOutput->write((char *) data, size)) {
262
    if (!lostPacket) {
263
      std::cerr << "Warning: client not reading data connection fast enough (dropping packets)" << std::endl;
264
      lostPacket = true;
265
    }
266
    if (!convertToPS) {
267
      // Drop a TS packet from the beginning
268
      dataOutput->discard(188 + dataOutput->getUsed() % 188);
269
      dataOutput->write((char *) data, size);
270
    }
271
  }
272
}
273

    
274
std::ostream& Connection::printOn(std::ostream &o) const
275
{
276
  if (!tuned) 
277
    return o << "UNTUNED";
278
  else
279
    return o << priority << " " << type << " " << escapeWS(channel);
280
}