00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifdef __GNUG__
00024 # pragma implementation "RouterModule.h"
00025 #endif
00026
00027 #include "StrMod/RouterModule.h"
00028 #include <UniEvent/Dispatcher.h>
00029 #include <UniEvent/Event.h>
00030 #include <UniEvent/EventPtr.h>
00031 #include <algorithm>
00032
00033 namespace strmod {
00034 namespace strmod {
00035
00036 using lcore::ClassIdent;
00037
00038 const STR_ClassIdent RouterModule::identifier(52UL);
00039 const STR_ClassIdent RouterModule::RPlug::identifier(53UL);
00040
00041 using unievent::Dispatcher;
00042 using unievent::Event;
00043
00044 class RouterModule::ScanEvent : public Event {
00045 public:
00046 static const STR_ClassIdent identifier;
00047
00048
00049 ScanEvent(RouterModule &parent) : parent_(&parent) { }
00050
00051 inline virtual void triggerEvent(Dispatcher *dispatcher = 0);
00052
00053 void parentGone() { parent_ = 0; }
00054
00055 protected:
00056 virtual const ClassIdent *i_GetIdent() { return(&identifier); }
00057
00058 private:
00059 RouterModule *parent_;
00060 };
00061
00062 inline void RouterModule::ScanEvent::triggerEvent(Dispatcher *dispatcher)
00063 {
00064 if (parent_)
00065 {
00066 parent_->doScan();
00067 }
00068 }
00069
00070 const STR_ClassIdent RouterModule::ScanEvent::identifier(54UL);
00071
00072 RouterModule::RouterModule(Dispatcher &disp)
00073 : disp_(disp), scan_posted_(false), scan_(new ScanEvent(*this)),
00074 inroutingdone_(false), outgoingcopies_(0)
00075 {
00076 scan_->AddReference();
00077 }
00078
00079 RouterModule::~RouterModule()
00080 {
00081 scan_->parentGone();
00082 while (allplugs_.size() > 0) {
00083 RPlug *rplug = allplugs_.front();
00084
00085 allplugs_.pop_front();
00086 delete rplug;
00087 }
00088 assert(scan_->NumReferences() > 0);
00089 scan_->DelReference();
00090 if (scan_->NumReferences() <= 0)
00091 {
00092 delete scan_;
00093 }
00094 }
00095
00096 bool RouterModule::ownsPlug(const Plug *plug) const
00097 {
00098 const RPlugList::const_iterator end(allplugs_.end());
00099 const RPlugList::const_iterator search(find(allplugs_.begin(), end, plug));
00100 if (search != end)
00101 {
00102 return(true);
00103 }
00104 else
00105 {
00106 return(false);
00107 }
00108 }
00109
00110 bool RouterModule::deletePlug(Plug *plug)
00111 {
00112 const RPlugList &allplugs_l = allplugs_;
00113 const RPlugList::const_iterator end(allplugs_l.end());
00114 const RPlugList::const_iterator search(find(allplugs_l.begin(), end, plug));
00115 if ((search != end) && ! (*search)->getDeleted())
00116 {
00117 RPlug * const rp = *search;
00118 assert(rp == plug);
00119 rp->setDeleted();
00120 rp->unPlug();
00121 postScan();
00122 return(true);
00123 }
00124 else
00125 {
00126 return(false);
00127 }
00128 }
00129
00130 void RouterModule::addNewPlug(RPlug *rp)
00131 {
00132 assert(!ownsPlug(rp));
00133 assert(rp->pluggedInto() == NULL);
00134 assert(this == &(rp->getParent()));
00135 allplugs_.push_back(rp);
00136 if (routedchunk_)
00137 {
00138 setWriteableFlagFor(rp, false);
00139 nonwriteable_.push_back(rp);
00140 }
00141 else
00142 {
00143 setWriteableFlagFor(rp, true);
00144 writeable_.push_back(rp);
00145 }
00146 }
00147
00148 void RouterModule::doPost()
00149 {
00150 disp_.addEvent(scan_);
00151 }
00152
00153 void RouterModule::doScan()
00154 {
00155 assert(!inroutingdone_);
00156 scan_posted_ = false;
00157 writeable_.clear();
00158 nonwriteable_.clear();
00159 {
00160 const size_t size = allplugs_.size();
00161 for (size_t i = 0; i < size; ++i)
00162 {
00163 RPlug * const curplug = allplugs_.front();
00164 allplugs_.pop_front();
00165 if (curplug->getDeleted())
00166 {
00167 assert(curplug->pluggedInto() == NULL);
00168 delete curplug;
00169 }
00170 else
00171 {
00172 allplugs_.push_back(curplug);
00173 if (RPlug::getFlagsFrom(*curplug).canwrite_)
00174 {
00175 writeable_.push_back(curplug);
00176 }
00177 else
00178 {
00179 nonwriteable_.push_back(curplug);
00180 }
00181 }
00182 }
00183 }
00184 assert((outgoingcopies_ > 0 && routedchunk_ && writeable_.size() == 0) ||
00185 (outgoingcopies_ <= 0 && !routedchunk_));
00186 if (nonwriteable_.size() > 0)
00187 {
00188 routingDone();
00189 }
00190 }
00191
00192 void RouterModule::routingDone()
00193 {
00194 assert(outgoingcopies_ == 0 && !routedchunk_);
00195 if (inroutingdone_)
00196 {
00197 return;
00198 }
00199 inroutingdone_ = true;
00200 while (!routedchunk_ && nonwriteable_.size() > 0)
00201 {
00202 RPlug *towrite = nonwriteable_.front();
00203 nonwriteable_.pop_front();
00204 assert(! RPlug::getFlagsFrom(*towrite).canwrite_);
00205 writeable_.push_back(towrite);
00206 setWriteableFlagFor(towrite, true);
00207 }
00208 inroutingdone_ = false;
00209 }
00210
00211 void RouterModule::processIncoming(RPlug &source, const StrChunkPtr &chunk)
00212 {
00213 assert(outgoingcopies_ == 0 && !routedchunk_);
00214 assert(! source.getFlagsFrom(source).canwrite_);
00215 routedchunk_ = chunk;
00216 outgoingcopies_ = 1;
00217 {
00218 RPlugList dests;
00219 {
00220 const RPlugList &constall = allplugs_;
00221 RPlugAdder adder(dests);
00222 getDestinations(chunk, source, constall.begin(), constall.end(),
00223 adder);
00224 }
00225 outgoingcopies_ = dests.size();
00226 if (outgoingcopies_ >= 0)
00227 {
00228 while (writeable_.size() > 0)
00229 {
00230 RPlug *tononwrite = writeable_.front();
00231 writeable_.pop_front();
00232
00233
00234
00235 if ((tononwrite != &source) && ! tononwrite->getDeleted())
00236 {
00237 assert(RPlug::getFlagsFrom(*tononwrite).canwrite_);
00238 nonwriteable_.push_back(tononwrite);
00239 setWriteableFlagFor(tononwrite, false);
00240 }
00241 }
00242 {
00243 size_t skippedplugs = 0;
00244
00245 while (dests.size() > 0)
00246 {
00247 assert(routedchunk_);
00248 assert(outgoingcopies_ > 0);
00249 RPlug * const routedto = dests.front();
00250 dests.pop_front();
00251 assert(routedto->getDeleted() || ownsPlug(routedto));
00252 if (!routedto->getDeleted() && routedto->pluggedInto() != NULL)
00253 {
00254 setReadableFlagFor(routedto, true);
00255 }
00256 else
00257 {
00258 ++skippedplugs;
00259 }
00260 }
00261 if (skippedplugs > 0)
00262 {
00263 assert(outgoingcopies_ >= skippedplugs);
00264 assert(routedchunk_);
00265 if (skippedplugs < outgoingcopies_)
00266 {
00267 outgoingcopies_ -= skippedplugs;
00268 }
00269 else
00270 {
00271 outgoingcopies_ = 0;
00272 }
00273 if (outgoingcopies_ <= 0)
00274 {
00275 routedchunk_.ReleasePtr();
00276 }
00277 }
00278 }
00279 }
00280 else
00281 {
00282 routedchunk_.ReleasePtr();
00283 }
00284 }
00285 assert((outgoingcopies_ > 0 && routedchunk_) ||
00286 (outgoingcopies_ <= 0 && !routedchunk_));
00287 if (outgoingcopies_ == 0 && !inroutingdone_)
00288 {
00289 routingDone();
00290 }
00291 }
00292
00293 const StrChunkPtr RouterModule::RPlug::i_Read()
00294 {
00295 assert(getFlagsFrom(*this).canread_);
00296 assert(getFlagsFrom(*this).isreading_);
00297 RouterModule &parent = getParent();
00298 assert(parent.outgoingcopies_ > 0);
00299 assert(parent.routedchunk_);
00300 StrChunkPtr tmp = parent.routedchunk_;
00301 setReadable(false);
00302 if (--parent.outgoingcopies_ == 0)
00303 {
00304 parent.routedchunk_.ReleasePtr();
00305 if (!parent.inroutingdone_)
00306 {
00307 parent.routingDone();
00308 }
00309 }
00310 return(tmp);
00311 }
00312
00313 void RouterModule::RPlug::i_Write(const StrChunkPtr &ptr)
00314 {
00315 assert(getFlagsFrom(*this).canwrite_);
00316 assert(getFlagsFrom(*this).iswriting_);
00317 RouterModule &parent = getParent();
00318 assert(!parent.routedchunk_);
00319 setWriteable(false);
00320 parent.postScan();
00321 parent.processIncoming(*this, ptr);
00322 }
00323
00324 };
00325 };