Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Namespace Members   Compound Members   File Members  

StreamFDModule.cxx

00001 /*
00002  * Copyright (C) 1991-9 Eric M. Hopper <hopper@omnifarious.mn.org>
00003  * 
00004  *     This program is free software; you can redistribute it and/or modify it
00005  *     under the terms of the GNU Lesser General Public License as published
00006  *     by the Free Software Foundation; either version 2 of the License, or
00007  *     (at your option) any later version.
00008  * 
00009  *     This program is distributed in the hope that it will be useful, but
00010  *     WITHOUT ANY WARRANTY; without even the implied warranty of
00011  *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00012  *     Lesser General Public License for more details.
00013  * 
00014  *     You should have received a copy of the GNU Lesser General Public
00015  *     License along with this program; if not, write to the Free Software
00016  *     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
00017  */
00018 
00019 /* $Header: /home/hopper/src/cvs/C++/StrMod/StreamFDModule.cxx,v 1.35 2002/11/25 05:40:05 hopper Exp $ */
00020 
00021 // For a log, see ChangeLog
00022 
00023 #ifdef __GNUG__
00024 #  pragma implementation "StreamFDModule.h"
00025 #endif
00026 
00027 #include "StrMod/StreamFDModule.h"
00028 #ifndef _STR_DynamicBuffer_H_
00029 #   include "StrMod/DynamicBuffer.h"
00030 #endif
00031 #ifndef _STR_EOFStrChunk_H_
00032 #   include "StrMod/EOFStrChunk.h"
00033 #endif
00034 #ifndef _STR_UseTrackingVisitor_H_
00035 #   include "StrMod/UseTrackingVisitor.h"
00036 #endif
00037 
00038 #include <UniEvent/UNIXError.h>
00039 #include <UniEvent/EventPtr.h>
00040 #include <UniEvent/Event.h>
00041 #include <UniEvent/Dispatcher.h>
00042 
00043 #include <LCore/enum_set.h>
00044 
00045 #include <vector>
00046 #include <new>
00047 #include <unistd.h>  // read  (and maybe sysconf)
00048 #include <sys/types.h>  // writev and struct iovec
00049 #include <sys/uio.h>    // writev and struct iovec
00050 #include <sys/stat.h>
00051 #include <cassert>
00052 #include <cstddef>  // NULL
00053 #include <cerrno> // ESUCCESS
00054 #ifndef ESUCCESS
00055 #  define ESUCCESS 0
00056 #endif
00057 
00058 namespace strmod {
00059 namespace strmod {
00060 
00061 typedef struct stat statbuf_t;
00062 using unievent::Dispatcher;
00063 using unievent::UnixEventRegistry;
00064 using unievent::UNIXError;
00065 using lcore::LCoreError;
00066 
00067 const STR_ClassIdent StreamFDModule::identifier(8UL);
00068 const STR_ClassIdent StreamFDModule::FPlug::identifier(9UL);
00069 
00070 /** \class StreamFDModule::EvMixin
00071  * \brief A parent class for StreamFDModule events that provides some behavior
00072  * for the module being deleted.
00073  */
00074 class StreamFDModule::EvMixin {
00075  public:
00076    /** It's a constructor.
00077     * @param parent The StreamFDModule that this is an event for.
00078     */
00079    inline EvMixin(StreamFDModule &parent)
00080         : hasparent_(true), parent_(parent)
00081    {
00082    }
00083    //! Virtual because this is a mixin designed to have derived classes.
00084    virtual ~EvMixin()                           { assert(hasparent_ == false); }
00085 
00086    //! This method is called by StreamFDModule when it goes away.
00087    inline void parentGone()                     { hasparent_ = false; }
00088 
00089  protected:
00090    //! Call the right StreamFDModule function for the event.
00091    inline void triggerRead();
00092    //! Call the right StreamFDModule function for the event.
00093    inline void triggerWrite();
00094    //! Call the right StreamFDModule function for the event.
00095    inline void triggerError();
00096    //! Call the right StreamFDModule function for the event.
00097    inline void triggerResumeRead();
00098    //! Call the right StreamFDModule function for the event.
00099    inline void triggerResumeWrite();
00100 
00101  private:
00102    bool hasparent_;
00103    StreamFDModule &parent_;
00104 };
00105 
00106 inline void StreamFDModule::EvMixin::triggerRead()
00107 {
00108    // cerr << "In triggerRead\n";
00109    if (hasparent_)
00110    {
00111       parent_.eventRead();
00112    }
00113 }
00114 
00115 inline void StreamFDModule::EvMixin::triggerWrite()
00116 {
00117    // cerr << "In triggerWrite\n";
00118    if (hasparent_)
00119    {
00120       parent_.eventWrite();
00121    }
00122 }
00123 
00124 inline void StreamFDModule::EvMixin::triggerError()
00125 {
00126    // cerr << "In triggerError\n";
00127    if (hasparent_)
00128    {
00129       parent_.eventError();
00130    }
00131 }
00132 
00133 inline void StreamFDModule::EvMixin::triggerResumeRead()
00134 {
00135    if (hasparent_)
00136    {
00137       parent_.eventResumeRead();
00138    }
00139 }
00140 
00141 inline void StreamFDModule::EvMixin::triggerResumeWrite()
00142 {
00143    if (hasparent_)
00144    {
00145       parent_.eventResumeWrite();
00146    }
00147 }
00148 
00149 //! This is one of the five helper classes for StreamFDModule::EvMixin
00150 class StreamFDModule::FDPollRdEv
00151    : public StreamFDModule::EvMixin, public unievent::Event
00152 {
00153  public:
00154    inline FDPollRdEv(StreamFDModule &parent) : EvMixin(parent)  { }
00155    virtual ~FDPollRdEv()                                        { }
00156 
00157    virtual void triggerEvent(Dispatcher *dispatcher = 0) {
00158       triggerRead();
00159    }
00160 };
00161 
00162 //! This is one of the five helper classes for StreamFDModule::EvMixin
00163 class StreamFDModule::FDPollWrEv
00164    : public StreamFDModule::EvMixin, public unievent::Event
00165 {
00166  public:
00167    inline FDPollWrEv(StreamFDModule &parent) : EvMixin(parent)  { }
00168    virtual ~FDPollWrEv()                                        { }
00169 
00170    virtual void triggerEvent(Dispatcher *dispatcher = 0) {
00171       triggerWrite();
00172    }
00173 };
00174 
00175 //! This is one of the five helper classes for StreamFDModule::EvMixin
00176 class StreamFDModule::FDPollErEv
00177    : public StreamFDModule::EvMixin, public unievent::Event
00178 {
00179  public:
00180    inline FDPollErEv(StreamFDModule &parent) : EvMixin(parent)  { }
00181    virtual ~FDPollErEv()                                        { }
00182 
00183    virtual void triggerEvent(Dispatcher *dispatcher = 0) {
00184       triggerError();
00185    }
00186 };
00187 
00188 //! This is one of the five helper classes for StreamFDModule::EvMixin
00189 class StreamFDModule::ResumeReadEv
00190    : public StreamFDModule::EvMixin, public unievent::Event
00191 {
00192  public:
00193    ResumeReadEv(StreamFDModule &parent)
00194         : EvMixin(parent)
00195    {
00196    }
00197    virtual ~ResumeReadEv()                            { }
00198 
00199    virtual void triggerEvent(Dispatcher *dispatcher = 0)
00200    {
00201       triggerResumeRead();
00202    }
00203 };
00204 
00205 //! This is one of the five helper classes for StreamFDModule::EvMixin
00206 class StreamFDModule::ResumeWriteEv
00207    : public StreamFDModule::EvMixin, public unievent::Event
00208 {
00209  public:
00210    ResumeWriteEv(StreamFDModule &parent)
00211         : EvMixin(parent)
00212    {
00213    }
00214    virtual ~ResumeWriteEv()                            { }
00215 
00216    virtual void triggerEvent(Dispatcher *dispatcher = 0)
00217    {
00218       triggerResumeWrite();
00219    }
00220 };
00221 
00222 /**
00223  * The ChunkVisitor that gathers data for the writev calls.
00224  */
00225 class StreamFDModule::BufferList : public UseTrackingVisitor {
00226  public:
00227    //! ChunkVisitors never have very interesting constructors
00228    // Do ignore zeros though.  When iterating over data, zero length chunks
00229    // and data extents are pointless.
00230    inline BufferList();
00231    //! And rarely interesting destructors either.
00232    virtual ~BufferList()                    { }
00233 
00234    /**
00235     * Visit the StrChunk DAG rooted at chunk, filling up iovecs_ with what I
00236     * find.
00237     *
00238     * @param chunk The StrChunk to visit.
00239     */
00240    void startChunk(const StrChunkPtr &chunk)
00241    {
00242       iovecs_.clear();
00243       curvecidx_ = 0;
00244       totalbytes_ = curbyte_ = 0;
00245       curchunk_ = chunk;
00246       try
00247       {
00248          startVisit(chunk);
00249       }
00250       catch (...)
00251       {
00252          curchunk_.ReleasePtr();
00253          throw;
00254       }
00255       if (iovecs_.size() <= 0)
00256       {
00257          curchunk_.ReleasePtr();
00258       }
00259       else
00260       {
00261          curvecidx_ = 0;
00262       }
00263    }
00264 
00265    //! How many bytes are left in the currently visited StrChunk?
00266    size_t bytesLeft() const              { return totalbytes_ - curbyte_; }
00267    //! What iovec should be passed to the writev call?
00268    inline const iovec *getIOVec() const  { return &iovecs_[curvecidx_]; }
00269    //! How many iovec structures are left?
00270    inline size_t numVecs() const         { return iovecs_.size() - curvecidx_; }
00271    //! Move forward by numbytes, setting it up so the other functions return the right values.
00272    void advanceBy(size_t numbytes);
00273 
00274  protected:
00275    /*!
00276     * I don't care about chunks, just data (because iovecs are all about data)
00277     * so do nothing when told about a chunk.
00278     */
00279    virtual void use_visitStrChunk(const StrChunkPtr &chunk,
00280                                   const LinearExtent &used)
00281       throw(halt_visitation)             { }
00282 
00283    /*!
00284     * Add this new chunk of data to our list.
00285     */
00286    virtual void use_visitDataBlock(const void *start, size_t len,
00287                                    const void *realstart, size_t reallen)
00288       throw(halt_visitation)
00289    {
00290       // Many routines depend on this if statement to ensure that there are no
00291       // zero length extents.
00292       if (len <= 0)
00293       {
00294          return;
00295       }
00296 
00297       iovec data = {const_cast<void *>(start), len};
00298 
00299       iovecs_.push_back(data);
00300       totalbytes_ += len;
00301    }
00302 
00303  private:
00304    typedef std::vector<iovec> iovecvec;
00305    StrChunkPtr curchunk_;
00306    size_t totalbytes_;
00307    size_t curbyte_;
00308    size_t curvecidx_;
00309    iovecvec iovecs_;
00310 
00311    // Private and left undefined on purpose.
00312    BufferList(const BufferList &b);
00313    void operator =(const BufferList &b);
00314 };
00315 
00316 inline StreamFDModule::BufferList::BufferList()
00317      : UseTrackingVisitor(true), totalbytes_(0), curbyte_(0), curvecidx_(0)
00318 {
00319 }
00320 
00321 inline void StreamFDModule::BufferList::advanceBy(size_t numbytes)
00322 {
00323    if (totalbytes_ <= 0)
00324    {
00325       assert(curbyte_ == 0);
00326       return;
00327    }
00328    if ((curbyte_ + numbytes) >= totalbytes_)
00329    {
00330       totalbytes_ = curbyte_ = 0;
00331       curchunk_.ReleasePtr();
00332       curvecidx_ = 0;
00333       iovecs_.clear();
00334       return;
00335    }
00336 
00337    for (size_t bytestomove = numbytes; bytestomove > 0; )
00338    {
00339       iovec &curvec = iovecs_[curvecidx_];
00340       if (curvec.iov_len <= bytestomove)
00341       {
00342          bytestomove -= curvec.iov_len;
00343          ++curvecidx_;
00344       }
00345       else
00346       {
00347          curvec.iov_base =
00348             static_cast<unsigned char *>(curvec.iov_base) + bytestomove;
00349          curvec.iov_len -= bytestomove;
00350          bytestomove = 0;
00351       }
00352    }
00353    curbyte_ += numbytes;
00354    assert(curvecidx_ < iovecs_.size());
00355 }
00356 
00357 //---
00358 
00359 struct StreamFDModule::ErrorInfo {
00360    unievent::EventPtr events_[(ErrFatal - ErrRead) + 1];
00361    unsigned char errdata_[(ErrFatal - ErrRead) + 1][sizeof(UNIXError)];
00362    ErrorSet used_;
00363 };
00364 
00365 //------
00366 
00367 bool StreamFDModule::hasErrorIn(ErrorType err) const throw ()
00368 {
00369    return errorinfo_.used_.test(err);
00370 }
00371 
00372 bool StreamFDModule::hasErrorIn(const ErrorSet &set) const throw ()
00373 {
00374    ErrorSet tmp = errorinfo_.used_;
00375    tmp &= set;
00376    return tmp.any();
00377 }
00378 
00379 void StreamFDModule::onErrorIn(ErrorType err,
00380                                const unievent::EventPtr &ev) throw()
00381 {
00382    errorinfo_.events_[err] = ev;
00383 }
00384 
00385 void StreamFDModule::resetErrorIn(ErrorType err) throw ()
00386 {
00387    if ((err != ErrFatal) && errorinfo_.used_.test(err))
00388    {
00389       reinterpret_cast<UNIXError *>(errorinfo_.errdata_[err])->~UNIXError();
00390       errorinfo_.used_.reset(err);
00391       if ((err == ErrGeneral) || (err == ErrRead))
00392       {
00393          if (! (errorinfo_.used_.test(ErrGeneral) ||
00394                 errorinfo_.used_.test(ErrRead) ||
00395                 errorinfo_.used_.test(ErrFatal)))
00396          {
00397             if ((fd_ >= 0) && !flags_.checkingrd)
00398             {
00399                if (!buffed_read_)
00400                {
00401                   static const UnixEventRegistry::FDCondSet
00402                      readset(UnixEventRegistry::FD_Readable);
00403                   ureg_.registerFDCond(fd_, readset, readev_);
00404                   flags_.checkingrd = true;
00405                }
00406                else
00407                {
00408                   disp_.addEvent(resumeread_);
00409                }
00410             }
00411          }
00412       }
00413       if ((err == ErrGeneral) || (err == ErrWrite))
00414       {
00415          if (! (errorinfo_.used_.test(ErrGeneral) ||
00416                 errorinfo_.used_.test(ErrWrite) ||
00417                 errorinfo_.used_.test(ErrFatal)))
00418          {
00419             if ((fd_ >= 0) && !flags_.checkingwr)
00420             {
00421                if (cur_write_)
00422                {
00423                   static const UnixEventRegistry::FDCondSet
00424                      writeset(UnixEventRegistry::FD_Writeable);
00425                   ureg_.registerFDCond(fd_, writeset, writeev_);
00426                   flags_.checkingwr = true;
00427                }
00428                else
00429                {
00430                   disp_.addEvent(resumewrite_);
00431                }
00432             }
00433          }
00434       }
00435    }
00436 }
00437 
00438 const UNIXError &StreamFDModule::getErrorIn(ErrorType err) const throw ()
00439 {
00440    if (! errorinfo_.used_.test(err))
00441    {
00442       return UNIXError::S_noerror;
00443    }
00444    else
00445    {
00446       return *(reinterpret_cast<UNIXError *>(errorinfo_.errdata_[err]));
00447    }
00448 }
00449 
00450 void StreamFDModule::setErrorIn(ErrorType err,
00451                                 const unievent::UNIXError &errval)
00452 {
00453    void *rawmem = errorinfo_.errdata_[err];
00454    if (errorinfo_.used_.test(err))
00455    {
00456       (reinterpret_cast<UNIXError *>(rawmem))->~UNIXError();
00457    }
00458    else
00459    {
00460       errorinfo_.used_.set(err);
00461    }
00462    new(rawmem) UNIXError(errval);
00463    if (errorinfo_.events_[err])
00464    {
00465       disp_.addEvent(errorinfo_.events_[err]);
00466       errorinfo_.events_[err].ReleasePtr();
00467    }
00468 }
00469 
00470 size_t StreamFDModule::getBestChunkSize() const
00471 {
00472    static const size_t default_size = 4096U;
00473    static const size_t smallest = 128U;
00474    static const size_t largest = (128U * 1024U);
00475 
00476    size_t retval = default_size;
00477    statbuf_t sbuf;
00478 
00479    if ((fstat(fd_, &sbuf) == 0) && (sbuf.st_blksize > 0))
00480    {
00481       size_t blksize = sbuf.st_blksize;  // Known positive
00482 
00483       if (blksize < smallest)
00484       {
00485          retval = smallest;
00486       }
00487       else if (blksize > largest)
00488       {
00489          retval = largest;
00490       }
00491       else
00492       {
00493          retval = blksize;
00494       }
00495    }
00496    return(retval);
00497 }
00498 
00499 const StrChunkPtr StreamFDModule::plugRead()
00500 {
00501    assert(buffed_read_);
00502 
00503    StrChunkPtr retval = buffed_read_;
00504    buffed_read_.ReleasePtr();
00505    doReadFD();
00506    read_since_read_posted_ += retval->Length();
00507    if (!buffed_read_ ||
00508        (read_since_read_posted_ >= S_max_bytes_without_dispatch))
00509    {
00510       setReadableFlagFor(&plug_, false);
00511       if (read_since_read_posted_ >= S_max_bytes_without_dispatch)
00512       {
00513          disp_.addEvent(resumeread_);
00514       }
00515    }
00516    return(retval);
00517 }
00518 
00519 void StreamFDModule::eventRead()
00520 {
00521    assert(!buffed_read_);
00522    flags_.checkingrd = false;
00523    read_since_read_posted_ = 0;
00524    if (!buffed_read_)
00525    {
00526       doReadFD();
00527    }
00528    if (buffed_read_)
00529    {
00530       setReadableFlagFor(&plug_, true);
00531    }
00532 }
00533 
00534 void StreamFDModule::eventResumeRead()
00535 {
00536    read_since_read_posted_ = 0;
00537    if (buffed_read_)
00538    {
00539       setReadableFlagFor(&plug_, true);
00540    }
00541 }
00542 
00543 void StreamFDModule::plugWrite(const StrChunkPtr &ptr)
00544 {
00545    assert(!cur_write_);
00546    assert(ptr);
00547    cur_write_ = ptr;
00548    doWriteFD();
00549    written_since_write_posted_ += ptr->Length();
00550    if (cur_write_ ||
00551        (written_since_write_posted_ > S_max_bytes_without_dispatch))
00552    {
00553       setWriteableFlagFor(&plug_, false);
00554       if (written_since_write_posted_ > S_max_bytes_without_dispatch)
00555       {
00556          disp_.addEvent(resumewrite_);
00557       }
00558    }
00559 }
00560 
00561 void StreamFDModule::eventWrite()
00562 {
00563    assert(cur_write_);
00564    flags_.checkingwr = false;
00565    written_since_write_posted_ = 0;
00566    if (cur_write_)
00567    {
00568       doWriteFD();
00569    }
00570    if (! (hasErrorIn(ErrWrite) ||
00571           hasErrorIn(ErrGeneral) ||
00572           hasErrorIn(ErrFatal)))
00573    {
00574       if ((fd_ >= 0) && !cur_write_)
00575       {
00576          setWriteableFlagFor(&plug_, true);
00577       }
00578    }
00579 }
00580 
00581 void StreamFDModule::eventResumeWrite()
00582 {
00583    written_since_write_posted_ = 0;
00584    if (!cur_write_)
00585    {
00586       setWriteableFlagFor(&plug_, true);
00587    }
00588 }
00589 
00590 void StreamFDModule::eventError()
00591 {
00592    setErrorIn(ErrFatal, UNIXError("<none>", EBADF,
00593                                   LCoreError("Bad FD",
00594                                              LCORE_GET_COMPILERINFO())));
00595 }
00596 
00597 void StreamFDModule::doReadFD()
00598 {
00599    // cerr << fd_ << ": In doReadFD\n";
00600    assert(!buffed_read_);
00601 
00602    if ((fd_ < 0) ||
00603        hasErrorIn(ErrRead) || hasErrorIn(ErrFatal) || hasErrorIn(ErrGeneral))
00604    {
00605       return;
00606    }
00607 
00608    ssize_t size = -1;
00609    int myerrno = ESUCCESS;
00610 
00611    {
00612       // A normal pointer offers a speed advantage, and we don't know whether
00613       // we want to set buffed_read until the read succeeds.
00614       const size_t maxsize = getMaxChunkSize();
00615       DynamicBuffer *dbchunk = new DynamicBuffer(maxsize);
00616 
00617       errno = 0;
00618       // cerr << "Reading...\n";
00619       size = ::read(fd_, dbchunk->getVoidP(), maxsize);
00620       // I may have an error, capture errno to make sure nothing happens to it.
00621       myerrno = errno;
00622       // cerr << fd_ << ": just read " << size << " bytes.\n";
00623 
00624       if (size > 0) {
00625          dbchunk->resize(size);
00626          buffed_read_ = dbchunk;
00627 //       cerr << fd_ << ": just read: <";
00628 //       cerr.write(dbchunk->GetCharP(), dbchunk->Length());
00629 //       cerr << ">\n";
00630          dbchunk = 0;
00631       } else {
00632          delete dbchunk;
00633          dbchunk = 0;
00634       }
00635 
00636       assert(dbchunk == 0);
00637    }
00638 
00639    if (size <= 0)
00640    {
00641       // cerr << "Handling read error.\n";
00642       assert(!buffed_read_);
00643 
00644       if (size == 0)
00645       {
00646          setErrorIn(ErrRead,
00647                     UNIXError("read", LCoreError(LCORE_GET_COMPILERINFO())));
00648          // cerr << fd_ << ": read EOF\n";
00649          if (flags_.chunkeof)
00650          {
00651             buffed_read_ = new EOFStrChunk;
00652             // cerr << fd_ << ": sending EOF chunk\n";
00653          }
00654       }
00655       else  // size MUST be < 0, and so errno must also be set.
00656       {
00657          // myerrno was set up there right after the read call.
00658          if (myerrno == EAGAIN)
00659          {
00660             if (!flags_.checkingrd && (fd_ >= 0))
00661             {
00662                static const UnixEventRegistry::FDCondSet
00663                   readset(UnixEventRegistry::FD_Readable);
00664                ureg_.registerFDCond(fd_, readset, readev_);
00665                flags_.checkingrd = true;
00666             }
00667          }
00668          else // (myerrno != EAGAIN)
00669          {
00670             // EAGAIN just means I need to read later, so it's OK, but anything
00671             // else isn't.
00672             // cerr << "Handling non-EAGAIN error.\n";
00673             // cerr << fd_ << ": setting ErrRead to " << myerrno << "\n";
00674             setErrorIn(ErrRead,
00675                        UNIXError("read", myerrno,
00676                                  LCoreError(LCORE_GET_COMPILERINFO())));
00677          }
00678       }
00679    }
00680    else  // size > 0
00681    {
00682       assert(buffed_read_);
00683    }
00684 }
00685 
00686 void StreamFDModule::doWriteFD()
00687 {
00688    // cerr << fd_ << ": in doWriteFD()\n";
00689 #ifndef MAXIOVCNT  // UnixWare 7 has this undefined.  *sigh*
00690 #ifndef _SC_IOV_MAX  // Linux has this undefined.  *bigger sigh*
00691    static const unsigned int MAXIOVCNT = 16;
00692 #else
00693    static const unsigned int MAXIOVCNT = sysconf(_SC_IOV_MAX);
00694 #endif
00695 #endif
00696 
00697    assert(cur_write_);
00698 
00699    if (fd_ < 0)
00700    {
00701       return;
00702    }
00703 
00704    if (curbuflist_.bytesLeft() <= 0)
00705    {
00706       if (cur_write_->AreYouA(EOFStrChunk::identifier))
00707       {
00708          if (!flags_.eofwritten)
00709          {
00710             writeEOF();
00711             // an EOF besides this flag.
00712             flags_.eofwritten = 1;
00713          }
00714          return;
00715       }
00716       else
00717       {
00718          curbuflist_.startChunk(cur_write_);
00719       }
00720    }
00721 
00722    assert(curbuflist_.bytesLeft() > 0);
00723 
00724    // Save the value so compiler can do better optimization
00725    size_t length = curbuflist_.bytesLeft();
00726 
00727    // This loop is also broken out of when there's an error writing.
00728    while (length > 0)
00729    {
00730       int written;
00731       size_t numvecs = curbuflist_.numVecs();
00732       if (numvecs > MAXIOVCNT)
00733       {
00734          numvecs = MAXIOVCNT;
00735       }
00736       written = ::writev(fd_, curbuflist_.getIOVec(), numvecs);
00737       // cerr << fd_ << ": just wrote: <";
00738       // cerr.write(ioveclst.iov[0].iov_base, ioveclst.iov[0].iov_len);
00739       // cerr << ">\n";
00740       // Save errno for later to make sure it isn't clobbered.
00741       int myerrno = errno;
00742 
00743       if (written < 0)
00744       {
00745          if (myerrno == EAGAIN)
00746          {
00747             if (!flags_.checkingwr && (fd_ >= 0))
00748             {
00749                static const UnixEventRegistry::FDCondSet
00750                   writeset(UnixEventRegistry::FD_Writeable);
00751                ureg_.registerFDCond(fd_, writeset, writeev_);
00752                flags_.checkingwr = true;
00753             }
00754          }
00755          else
00756          {
00757             setErrorIn(ErrWrite,
00758                        UNIXError("write", myerrno,
00759                                  LCoreError(LCORE_GET_COMPILERINFO())));
00760          }
00761          break;
00762       }
00763       else
00764       {
00765          curbuflist_.advanceBy(written);
00766          length -= written;
00767       }
00768    }
00769 
00770    assert(length == curbuflist_.bytesLeft());
00771 
00772    if (length <= 0)
00773    {
00774       cur_write_.ReleasePtr();
00775    }
00776 }
00777 
00778 void StreamFDModule::writeEOF()
00779 {
00780    if (fd_ >= 0)
00781    {
00782       ::close(fd_);
00783       ureg_.freeFD(fd_);
00784       fd_ = -1;
00785       setErrorIn(ErrWrite,
00786                   UNIXError("write", LCoreError(LCORE_GET_COMPILERINFO())));
00787       {
00788          UNIXError tmp("write", EBADF, LCoreError(LCORE_GET_COMPILERINFO()));
00789          setErrorIn(ErrFatal, tmp);
00790          setErrorIn(ErrRead, tmp);
00791       }
00792    }
00793 //   cerr << fd_ << ": setting ErrRead to " << EBADF << "\n";
00794 //     setErrorIn(ErrRead, EBADF);
00795 //     setErrorIn(ErrWrite, EBADF);
00796 //     setErrorIn(ErrFatal, EBADF);
00797 }
00798 
00799 StreamFDModule::StreamFDModule(int fd, Dispatcher &disp,
00800                                UnixEventRegistry &ureg,
00801                                IOCheckFlags checkmask)
00802      : fd_(fd),
00803        plug_(*this),
00804        curbuflist_(*(new BufferList)),
00805        max_block_size_(4096),
00806        read_since_read_posted_(0),
00807        written_since_write_posted_(0),
00808        errorinfo_(*(new ErrorInfo)),
00809        disp_(disp),
00810        ureg_(ureg)
00811 {
00812    for (unsigned int i = 0;
00813         i < (sizeof(parenttrackers_) / sizeof(parenttrackers_[0]));
00814         ++i)
00815    {
00816       parenttrackers_[i] = 0;
00817    }
00818    setMaxToBest();
00819    flags_.plugmade = flags_.readeof = flags_.eofwritten = flags_.chunkeof = false;
00820 
00821    // This seems a bit odd, but if you claim to be checking read and write when
00822    // you really aren't, then the class will never actually register the event
00823    // with the poll manager that does the read and write checking.
00824    flags_.checkingrd = flags_.checkingwr = true;
00825 
00826    // Just here to limit the scope of some variables.
00827    {
00828       // Assignments done in a careful order to try to be exception safe.
00829       FDPollRdEv *readev = new FDPollRdEv(*this);
00830       readev_ = readev;
00831       FDPollWrEv *writeev = new FDPollWrEv(*this);
00832       writeev_ = writeev;
00833       FDPollErEv *errorev = new FDPollErEv(*this);
00834       errorev_ = errorev;
00835       ResumeReadEv *rread = new ResumeReadEv(*this);
00836       resumeread_ = rread;
00837       ResumeWriteEv *rwrite = new ResumeWriteEv(*this);
00838       resumewrite_ = rwrite;
00839 
00840       // Set up these pointers so there's a set of things to call the
00841       // 'parentGone' method on.
00842       parenttrackers_[0] = readev;
00843       parenttrackers_[1] = writeev;
00844       parenttrackers_[2] = errorev;
00845       parenttrackers_[3] = rread;
00846       parenttrackers_[4] = rwrite;
00847    }
00848 
00849    {
00850       UnixEventRegistry::FDCondSet
00851          allerrors(UnixEventRegistry::FD_Error,
00852                    UnixEventRegistry::FD_Closed,
00853                    UnixEventRegistry::FD_Invalid);
00854       ureg_.registerFDCond(fd_, allerrors, errorev_);
00855    }
00856    if (fd_ >= 0)
00857    {
00858       if ((checkmask == CheckBoth) || (checkmask == CheckRead))
00859       {
00860          flags_.checkingrd = false;
00861          doReadFD();
00862          if (buffed_read_)
00863          {
00864             setReadableFlagFor(&plug_, true);
00865          }
00866       }
00867       if ((checkmask == CheckBoth) || (checkmask == CheckWrite))
00868       {
00869          flags_.checkingwr = false;
00870          setWriteableFlagFor(&plug_, true);
00871       }
00872    }
00873 }
00874 
00875 StreamFDModule::~StreamFDModule()
00876 {
00877    for (unsigned int i = 0;
00878         i < (sizeof(parenttrackers_) / sizeof(parenttrackers_[0]));
00879         ++i)
00880    {
00881       if (parenttrackers_[i])
00882       {
00883          parenttrackers_[i]->parentGone();
00884          parenttrackers_[i] = 0;
00885       }
00886    }
00887    if (fd_ >= 0)
00888    {
00889       ::close(fd_);
00890       ureg_.freeFD(fd_);
00891       fd_ = -1;
00892    }
00893    for (int i = ErrRead; i <= ErrFatal; ++i)
00894    {
00895       ErrorType err = static_cast<ErrorType>(i);
00896       if (errorinfo_.used_[err])
00897       {
00898          (reinterpret_cast<UNIXError *>(errorinfo_.errdata_[i]))->~UNIXError();
00899          errorinfo_.used_.reset(err);
00900       }
00901    }
00902    delete &curbuflist_;
00903    delete &errorinfo_;
00904 }
00905 
00906 };  // End namespace strmod
00907 };  // End namespace strmod

Generated on Wed Jan 29 00:32:44 2003 for libNet by doxygen1.3-rc1