Statistics
| Branch: | Tag: | Revision:

dvbd / src / connection.cpp @ 904d298f

History | View | Annotate | Download (5.79 KB)

1 bfdb7446 jak
/*
2
  Copyright 2003 John Knottenbelt
3
  
4
  This program is free software; you can redistribute it and/or modify
5
  it under the terms of the GNU General Public License as published by
6
  the Free Software Foundation; either version 2 of the License, or
7
  (at your option) any later version.
8
 
9
  This program is distributed in the hope that it will be useful,
10
  but WITHOUT ANY WARRANTY; without even the implied warranty of
11
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
  GNU General Public License for more details.
13
 
14
  You should have received a copy of the GNU General Public License
15
  along with this program; if not, write to the Free Software
16
  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111, USA.
17
*/
18
19
#include "config.h"
20
#include "connection.h"
21
#include "demuxer.h"
22
#include "utils.h"
23
#include "stringutil.h"
24
#include "inputbuffer.h"
25
#include "outputbuffer.h"
26
#include "scheduler.h"
27
28
#include <iostream>
29
#include <cstring>
30
#include <vector>
31
#include <unistd.h>
32
#include <cerrno>
33
#include <ctime>
34
#include <algorithm>
35 2cecea08 jak
#include <cassert>
36 bfdb7446 jak
37
Connection::Connection(TunerManager *tm, int dataFD)
38
  : dataFD(dataFD), priority(0), 
39
    tuned(false), retuning(false), 
40
    tm(tm), removeMe(false),
41 f9b7ce06 jak
    interrupted(false), 
42 ba6f6023 jak
    lostPacket(false),
43
    convertToPS(false),
44
    lastTime(0)
45 bfdb7446 jak
{
46
  if (debugLevel > 1)
47
    std::cerr << "New client connection" << std::endl;
48
49 ba6f6023 jak
  dataOutput = new OutputBuffer(dataFD, 16384, 1048576);
50 bfdb7446 jak
}
51
52
Connection::~Connection()
53
{
54
  if (debugLevel > 1)
55
    std::cerr << "Client connection closed" << std::endl;
56
57
  retuning=true;
58
  doUnsubscribe();
59
  
60
  delete dataOutput;
61
62
  close(dataFD);
63
}
64
65
void Connection::doUnsubscribe()
66
{
67 59be6a47 jak
  SourceList copy = demuxers;
68
  for (SourceList::iterator i = copy.begin(); i != copy.end(); i++)
69 bfdb7446 jak
    (*i)->unsubscribe(this);
70
}
71
72
bool Connection::doSubscribe()
73
{
74
  bool result = true;
75
76 59be6a47 jak
  for (SourceList::iterator i = demuxers.begin(); i != demuxers.end(); i++) 
77 bfdb7446 jak
    result = (*i)->subscribe(this) && result;
78 2cecea08 jak
   
79 bfdb7446 jak
  if (!result) {
80
    doUnsubscribe();
81
    return false;
82
  }
83 2cecea08 jak
84 bfdb7446 jak
  return true;
85
}
86
87
void Connection::notifyUnsubscribe(Source *s)
88
{
89
  Demuxer *d = (Demuxer *) s;
90
  demuxers.remove(d);
91
  interrupted = true;
92
}
93
94
time_t Connection::getCurrentTime() const
95
{
96
  return time(NULL);
97
}
98
99
void Connection::addSelectFDs(Select &s) const
100
{
101
  dataOutput->addSelectFDs(s);
102
}
103
104
bool Connection::isReady(const Select &s) const
105
{
106
  if (removeMe)
107
    return true;
108
109
  return dataOutput->isReady(s);
110
}
111
112
int Connection::getPriority() const
113
{
114
  return priority;
115
}
116
117
void Connection::setPriority(int p)
118
{
119
  priority = p;
120
}
121
122
bool Connection::isValidType(const std::string &type ) const
123
{
124
  return tm->validType(type);
125
}
126
127
bool Connection::isValidChannel(const std::string &type, const std::string &channel) const
128
{
129
  return tm->validChannel(type, channel);
130
}
131
132
bool Connection::canTune(const std::string &newType, const std::string &newChannel, int p) const
133
{
134
  SavePriority save(const_cast<Connection *>(this), p - 1);
135
  
136
  return tm->canGetDemuxers(newType, newChannel, p);
137
}
138
139
void Connection::getTunableChannels(const std::string &type, int priority, StringList &channels) const
140
{
141
  tm->getTunableChannels(type, priority, channels);
142
}
143
144
bool Connection::isRetuning() const {
145
  return retuning;
146
}
147
148
bool Connection::numDemuxers() const
149
{
150
  return demuxers.empty();
151
}
152
153
bool Connection::getInterrupted() const {
154
  return interrupted;
155
}
156
157
void Connection::setRemoveMe(bool status)
158
{
159
  removeMe = status;
160
}
161
162
163 59be6a47 jak
bool Connection::tune(const std::string &newType, const std::string &newChannel, bool convertToPS, int p)
164 bfdb7446 jak
{
165
  if (tuned) {
166
    if (type == newType && channel == newChannel) {
167
      setPriority(p);
168
      return true;
169
    }
170
  }
171
172
  if (debugLevel > 1) {
173
    std::cerr << "Tuning to " << newType << " " << newChannel 
174
              << " priority " << p << std::endl;
175
  }
176
177
  // The retuning variable prevents the client from being notified
178
  // that he has lost a subscription. The loss of subscription is to
179
  // be expected since the request is to change the channel!
180
  retuning = true;
181
  
182 59be6a47 jak
  SourceList newDemuxers;
183 bfdb7446 jak
184
  // Set the priority of the current channel
185
  // to be less than the specified priority
186
  {
187
    SavePriority save(this, p - 1);
188
189
    // Try to allocate the new demuxers
190 59be6a47 jak
    tm->getDemuxers(newType, newChannel, convertToPS, newDemuxers, p);
191 bfdb7446 jak
    retuning = false;
192
193
    if (newDemuxers.empty()) 
194
      return false;      // We remain subscribed to the old channel.
195
  }
196
197
  // Unsubscribe (if necessary) from the old and subscribe to the new
198
199
  doUnsubscribe();
200
  demuxers = newDemuxers;
201
  type = newType;
202
  channel = newChannel;
203
  bool result = doSubscribe();
204
  setPriority(p);
205
  tuned = true;
206
  interrupted = false;
207 f9b7ce06 jak
  lostPacket = false;
208 ba6f6023 jak
  this->convertToPS = convertToPS;
209 bfdb7446 jak
  return result;
210
}
211
212
bool Connection::processReady(const Select &s)
213
{
214
  if (removeMe)
215
    return false;
216
217
  if (dataOutput->isReady(s)) {
218
    if (!dataOutput->process()) 
219
      return false;
220
  }
221
222
  return true;
223
}
224
225
void Connection::receiveData(unsigned char *data, unsigned size)
226
{
227 ba6f6023 jak
  if (debugLevel > 0) {
228
    time_t now = time(NULL);
229
230
    if (now - lastTime >= 2) {
231
      std::cerr << "Buffer usage:" << dataOutput->getUsed() << "/" << dataOutput->getMaxSize() 
232
                << std::endl;
233
      lastTime = now;
234
    }
235
  }
236
  
237 bfdb7446 jak
  // Receive data from the Demuxers and forward it to the clients
238
  if (!dataOutput->write((char *) data, size)) {
239 f9b7ce06 jak
    if (!lostPacket) {
240
      std::cerr << "Warning: client not reading data connection fast enough (dropping packets)" << std::endl;
241
      lostPacket = true;
242
    }
243 ba6f6023 jak
    if (!convertToPS) {
244
      // Drop a TS packet from the beginning
245
      dataOutput->discard(188 + dataOutput->getUsed() % 188);
246
      dataOutput->write((char *) data, size);
247
    }
248 bfdb7446 jak
  }
249
}
250
251
std::ostream& Connection::printOn(std::ostream &o) const
252
{
253 a5fdfbae jak
  o << getType() << " ";
254 bfdb7446 jak
  if (!tuned) 
255
    return o << "UNTUNED";
256
  else
257
    return o << priority << " " << type << " " << escapeWS(channel);
258
}