00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifdef __GNUG__
00024 # pragma implementation "SimpleDispatcher.h"
00025 #endif
00026
00027 #include "UniEvent/SimpleDispatcher.h"
00028 #include "UniEvent/Event.h"
00029 #include "UniEvent/EventPtr.h"
00030 #include <iostream>
00031 #include <deque>
00032 #include <signal.h>
00033
00034
00035 namespace strmod {
00036 namespace unievent {
00037
00038
00039 class SimpleDispatcher::Imp {
00040 public:
00041 typedef std::deque<Event *> EVListBase;
00042 class EVList : private EVListBase {
00043 public:
00044 inline void addElement(Event *ev);
00045 inline bool qEmpty();
00046 inline size_t size() const { return EVListBase::size(); }
00047 inline void moveFrontTo(EVList &b);
00048 inline Event *noref_pop_front();
00049
00050 inline ~EVList();
00051 };
00052
00053 EVList mainq_;
00054 EVList busypoll_;
00055 size_t internalevnum_;
00056 EventPtr onempty_;
00057 EventPtr oninterrupt_;
00058 volatile sig_atomic_t interrupted_;
00059 Event *curevt_;
00060 };
00061
00062 namespace {
00063
00064 void *interruptBlock();
00065 void interruptUnblock(void *data);
00066
00067 }
00068
00069 inline void SimpleDispatcher::Imp::EVList::addElement(Event *ev)
00070 {
00071 ev->AddReference();
00072 push_back(ev);
00073 }
00074
00075 inline bool SimpleDispatcher::Imp::EVList::qEmpty()
00076 {
00077 return(size() <= 0);
00078 }
00079
00080 inline SimpleDispatcher::Imp::EVList::~EVList()
00081 {
00082 while (!qEmpty())
00083 {
00084 Event *ev = front();
00085
00086 pop_front();
00087 if (ev->NumReferences() > 0)
00088 {
00089 ev->DelReference();
00090 }
00091 if (ev->NumReferences() <= 0)
00092 {
00093 delete ev;
00094 }
00095 }
00096 }
00097
00098 inline void SimpleDispatcher::Imp::EVList::moveFrontTo(EVList &b)
00099 {
00100 assert(size() > 0);
00101 Event *ev = front();
00102 pop_front();
00103 b.push_back(ev);
00104 }
00105
00106
00107 inline Event *SimpleDispatcher::Imp::EVList::noref_pop_front()
00108 {
00109 assert(size() > 0);
00110 Event *ev = front();
00111 pop_front();
00112 return ev;
00113 }
00114
00115 SimpleDispatcher::SimpleDispatcher()
00116 : imp_(*(new Imp)), stop_flag_(false)
00117 {
00118 imp_.internalevnum_ = 0;
00119 }
00120
00121 SimpleDispatcher::~SimpleDispatcher()
00122 {
00123 #ifndef NDEBUG
00124 if (!imp_.mainq_.qEmpty()) {
00125 std::cerr << "Warning!\a Deleting a SimpleDispatcher that isn't empty!\n";
00126 }
00127 #endif
00128 delete &imp_;
00129 }
00130
00131 void SimpleDispatcher::addEvent(const EventPtr &ev)
00132 {
00133 imp_.mainq_.addElement(ev.GetPtr());
00134 }
00135
00136 void SimpleDispatcher::addBusyPollEvent(const EventPtr &ev)
00137 {
00138 imp_.busypoll_.addElement(ev.GetPtr());
00139 }
00140
00141 inline void SimpleDispatcher::i_DispatchEvent(Imp &imp,
00142 Dispatcher *enclosing)
00143 {
00144 assert(!imp.mainq_.qEmpty());
00145 assert(enclosing != 0);
00146
00147 Event *ev = 0;
00148
00149 {
00150 void *tmp = interruptBlock();
00151 if (imp_.interrupted_)
00152 {
00153 imp_.interrupted_ = 0;
00154 ev = imp_.oninterrupt_.GetPtr();
00155 if (ev)
00156 {
00157 ev->AddReference();
00158 imp_.oninterrupt_.ReleasePtr();
00159 }
00160 }
00161 if (!ev)
00162 {
00163 ev = imp.mainq_.noref_pop_front();
00164 }
00165 imp_.curevt_ = ev;
00166 interruptUnblock(tmp);
00167 }
00168 ev->triggerEvent(enclosing);
00169 {
00170 void *tmp = interruptBlock();
00171 imp_.curevt_ = 0;
00172 interruptUnblock(tmp);
00173 }
00174 if (ev->NumReferences() > 0)
00175 {
00176 ev->DelReference();
00177 }
00178 if (ev->NumReferences() <= 0)
00179 {
00180 delete ev;
00181 }
00182 }
00183
00184 unsigned int SimpleDispatcher::i_dispatchNEvents(unsigned int n,
00185 bool checkbusypoll,
00186 Dispatcher *enclosing)
00187 {
00188 assert(imp_.mainq_.size() >= n);
00189 assert(enclosing != 0);
00190
00191 Imp &imp = imp_;
00192 unsigned int i = n;
00193 for (; (i > 0) && !stop_flag_; --i)
00194 {
00195 assert(!imp.mainq_.qEmpty() && imp.internalevnum_ > 0);
00196 i_DispatchEvent(imp, enclosing);
00197 if (checkbusypoll && ! imp.busypoll_.qEmpty())
00198 {
00199 break;
00200 }
00201 }
00202 return n - i;
00203 }
00204
00205 inline unsigned int
00206 SimpleDispatcher::checkEmptyBusy(Imp &imp, bool &checkbusy)
00207 {
00208 assert(imp.internalevnum_ <= imp.mainq_.size());
00209 unsigned int retval = imp.internalevnum_;
00210 checkbusy = false;
00211 if (imp.internalevnum_ == 0)
00212 {
00213 if (imp.mainq_.qEmpty())
00214 {
00215 if (imp.onempty_)
00216 {
00217
00218 imp.mainq_.addElement(imp.onempty_.GetPtr());
00219 imp.internalevnum_ = 1;
00220 imp.onempty_.ReleasePtr();
00221 retval = 1;
00222 }
00223 }
00224 else
00225 {
00226 if (! imp.busypoll_.qEmpty())
00227 {
00228
00229
00230 imp.busypoll_.moveFrontTo(imp.mainq_);
00231 imp.internalevnum_ = imp.mainq_.size();
00232 }
00233 else
00234 {
00235
00236
00237 imp.internalevnum_ = 1;
00238 checkbusy = true;
00239 }
00240 }
00241 retval = imp.mainq_.size();
00242 }
00243 return retval;
00244 }
00245
00246 void SimpleDispatcher::dispatchNEvents(unsigned int numevents,
00247 Dispatcher *enclosing)
00248 {
00249 assert(numevents > 0);
00250 assert(enclosing != 0);
00251
00252 Imp &imp = imp_;
00253
00254 while (numevents > 0 && !stop_flag_)
00255 {
00256 bool checkbusy = false;
00257 unsigned int evchunk = checkEmptyBusy(imp, checkbusy);
00258 if (evchunk <= 0)
00259 {
00260 break;
00261 }
00262 else
00263 {
00264 if (evchunk > numevents)
00265 {
00266 evchunk = numevents;
00267 }
00268 unsigned int dispatched = i_dispatchNEvents(evchunk, checkbusy,
00269 enclosing);
00270 if (dispatched > imp.internalevnum_)
00271 {
00272 imp.internalevnum_ = 0;
00273 }
00274 else
00275 {
00276 imp.internalevnum_ -= dispatched;
00277 }
00278 numevents -= dispatched;
00279 }
00280 }
00281 }
00282
00283 void SimpleDispatcher::dispatchEvents(unsigned int numevents,
00284 Dispatcher *enclosing)
00285 {
00286 if (numevents <= 0)
00287 {
00288 return;
00289 }
00290 else
00291 {
00292 if (enclosing == 0)
00293 {
00294 enclosing = this;
00295 }
00296 dispatchNEvents(numevents, enclosing);
00297 }
00298 if (stop_flag_)
00299 {
00300 stop_flag_ = false;
00301 }
00302 }
00303
00304 void SimpleDispatcher::dispatchUntilEmpty(Dispatcher *enclosing)
00305 {
00306 if (enclosing == 0)
00307 {
00308 enclosing = this;
00309 }
00310 while (!stop_flag_ && ((imp_.mainq_.size() > 0) || imp_.onempty_))
00311 {
00312 dispatchNEvents(imp_.mainq_.size() * 2 + 1, enclosing);
00313 }
00314 if (stop_flag_)
00315 {
00316 stop_flag_ = false;
00317 }
00318 }
00319
00320 bool SimpleDispatcher::isQueueEmpty() const
00321 {
00322 return(imp_.mainq_.qEmpty());
00323 }
00324
00325 bool SimpleDispatcher::onQueueEmpty(const EventPtr &ev)
00326 {
00327 if (imp_.onempty_)
00328 {
00329 return false;
00330 }
00331 else
00332 {
00333 imp_.onempty_ = ev;
00334 return true;
00335 }
00336 }
00337
00338 bool SimpleDispatcher::onInterrupt(const EventPtr &ev)
00339 {
00340 if (imp_.oninterrupt_)
00341 {
00342 return false;
00343 }
00344 else
00345 {
00346 imp_.oninterrupt_ = ev;
00347 return true;
00348 }
00349 }
00350
00351
00352
00353
00354 void SimpleDispatcher::interrupt()
00355 {
00356
00357 if (!imp_.interrupted_)
00358 {
00359
00360 void *tmp = interruptBlock();
00361 if (!imp_.interrupted_)
00362 {
00363 imp_.interrupted_ = 1;
00364 if (imp_.curevt_)
00365 {
00366 imp_.curevt_->interrupt();
00367 }
00368 }
00369 interruptUnblock(tmp);
00370 }
00371 }
00372
00373 namespace {
00374 void *interruptBlock()
00375 {
00376 sigset_t *oldset = new sigset_t;
00377 sigset_t newset;
00378 ::sigfillset(&newset);
00379 ::sigprocmask(SIG_BLOCK, &newset, oldset);
00380 return oldset;
00381 }
00382
00383 void interruptUnblock(void *data)
00384 {
00385 sigset_t *origset = static_cast<sigset_t *>(data);
00386 ::sigprocmask(SIG_SETMASK, origset, NULL);
00387 delete origset;
00388 }
00389 }
00390
00391 }
00392 }