CSocketMultiplexer.cpp

00001 /*
00002  * synergy -- mouse and keyboard sharing utility
00003  * Copyright (C) 2004 Chris Schoeneman
00004  * 
00005  * This package is free software; you can redistribute it and/or
00006  * modify it under the terms of the GNU General Public License
00007  * found in the file COPYING that should have accompanied this file.
00008  * 
00009  * This package is distributed in the hope that it will be useful,
00010  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  * GNU General Public License for more details.
00013  */
00014 
00015 #include "CSocketMultiplexer.h"
00016 #include "ISocketMultiplexerJob.h"
00017 #include "CCondVar.h"
00018 #include "CLock.h"
00019 #include "CMutex.h"
00020 #include "CThread.h"
00021 #include "CLog.h"
00022 #include "TMethodJob.h"
00023 #include "CArch.h"
00024 #include "XArch.h"
00025 #include "stdvector.h"
00026 
00027 //
00028 // CSocketMultiplexer
00029 //
00030 
00031 CSocketMultiplexer*     CSocketMultiplexer::s_instance = NULL;
00032 
00033 CSocketMultiplexer::CSocketMultiplexer() :
00034     m_mutex(new CMutex),
00035     m_thread(NULL),
00036     m_update(false),
00037     m_jobsReady(new CCondVar<bool>(m_mutex, false)),
00038     m_jobListLock(new CCondVar<bool>(m_mutex, false)),
00039     m_jobListLockLocked(new CCondVar<bool>(m_mutex, false)),
00040     m_jobListLocker(NULL),
00041     m_jobListLockLocker(NULL)
00042 {
00043     assert(s_instance == NULL);
00044 
00045     // this pointer just has to be unique and not NULL.  it will
00046     // never be dereferenced.  it's used to identify cursor nodes
00047     // in the jobs list.
00048     m_cursorMark = reinterpret_cast<ISocketMultiplexerJob*>(this);
00049 
00050     // start thread
00051     m_thread = new CThread(new TMethodJob<CSocketMultiplexer>(
00052                                 this, &CSocketMultiplexer::serviceThread));
00053 
00054     s_instance = this;
00055 }
00056 
00057 CSocketMultiplexer::~CSocketMultiplexer()
00058 {
00059     m_thread->cancel();
00060     m_thread->unblockPollSocket();
00061     m_thread->wait();
00062     delete m_thread;
00063     delete m_jobsReady;
00064     delete m_jobListLock;
00065     delete m_jobListLockLocked;
00066     delete m_jobListLocker;
00067     delete m_jobListLockLocker;
00068     delete m_mutex;
00069 
00070     // clean up jobs
00071     for (CSocketJobMap::iterator i = m_socketJobMap.begin();
00072                         i != m_socketJobMap.end(); ++i) {
00073         delete *(i->second);
00074     }
00075 
00076     s_instance = NULL;
00077 }
00078 
00079 CSocketMultiplexer*
00080 CSocketMultiplexer::getInstance()
00081 {
00082     return s_instance;
00083 }
00084 
00085 void
00086 CSocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job)
00087 {
00088     assert(socket != NULL);
00089     assert(job    != NULL);
00090 
00091     // prevent other threads from locking the job list
00092     lockJobListLock();
00093 
00094     // break thread out of poll
00095     m_thread->unblockPollSocket();
00096 
00097     // lock the job list
00098     lockJobList();
00099 
00100     // insert/replace job
00101     CSocketJobMap::iterator i = m_socketJobMap.find(socket);
00102     if (i == m_socketJobMap.end()) {
00103         // we *must* put the job at the end so the order of jobs in
00104         // the list continue to match the order of jobs in pfds in
00105         // serviceThread().
00106         CJobCursor j = m_socketJobs.insert(m_socketJobs.end(), job);
00107         m_update     = true;
00108         m_socketJobMap.insert(std::make_pair(socket, j));
00109     }
00110     else {
00111         CJobCursor j = i->second;
00112         if (*j != job) {
00113             delete *j;
00114             *j = job;
00115         }
00116         m_update = true;
00117     }
00118 
00119     // unlock the job list
00120     unlockJobList();
00121 }
00122 
00123 void
00124 CSocketMultiplexer::removeSocket(ISocket* socket)
00125 {
00126     assert(socket != NULL);
00127 
00128     // prevent other threads from locking the job list
00129     lockJobListLock();
00130 
00131     // break thread out of poll
00132     m_thread->unblockPollSocket();
00133 
00134     // lock the job list
00135     lockJobList();
00136 
00137     // remove job.  rather than removing it from the map we put NULL
00138     // in the list instead so the order of jobs in the list continues
00139     // to match the order of jobs in pfds in serviceThread().
00140     CSocketJobMap::iterator i = m_socketJobMap.find(socket);
00141     if (i != m_socketJobMap.end()) {
00142         if (*(i->second) != NULL) {
00143             delete *(i->second);
00144             *(i->second) = NULL;
00145             m_update     = true;
00146         }
00147     }
00148 
00149     // unlock the job list
00150     unlockJobList();
00151 }
00152 
00153 void
00154 CSocketMultiplexer::serviceThread(void*)
00155 {
00156     std::vector<IArchNetwork::CPollEntry> pfds;
00157     IArchNetwork::CPollEntry pfd;
00158 
00159     // service the connections
00160     for (;;) {
00161         CThread::testCancel();
00162 
00163         // wait until there are jobs to handle
00164         {
00165             CLock lock(m_mutex);
00166             while (!(bool)*m_jobsReady) {
00167                 m_jobsReady->wait();
00168             }
00169         }
00170 
00171         // lock the job list
00172         lockJobListLock();
00173         lockJobList();
00174 
00175         // collect poll entries
00176         if (m_update) {
00177             m_update = false;
00178             pfds.clear();
00179             pfds.reserve(m_socketJobMap.size());
00180 
00181             CJobCursor cursor    = newCursor();
00182             CJobCursor jobCursor = nextCursor(cursor);
00183             while (jobCursor != m_socketJobs.end()) {
00184                 ISocketMultiplexerJob* job = *jobCursor;
00185                 if (job != NULL) {
00186                     pfd.m_socket = job->getSocket();
00187                     pfd.m_events = 0;
00188                     if (job->isReadable()) {
00189                         pfd.m_events |= IArchNetwork::kPOLLIN;
00190                     }
00191                     if (job->isWritable()) {
00192                         pfd.m_events |= IArchNetwork::kPOLLOUT;
00193                     }
00194                     pfds.push_back(pfd);
00195                 }               
00196                 jobCursor = nextCursor(cursor);
00197             }
00198             deleteCursor(cursor);
00199         }
00200 
00201         int status;
00202         try {
00203             // check for status
00204             if (!pfds.empty()) {
00205                 status = ARCH->pollSocket(&pfds[0], pfds.size(), -1);
00206             }
00207             else {
00208                 status = 0;
00209             }
00210         }
00211         catch (XArchNetwork& e) {
00212             LOG((CLOG_WARN "error in socket multiplexer: %s", e.what().c_str()));
00213             status = 0;
00214         }
00215 
00216         if (status != 0) {
00217             // iterate over socket jobs, invoking each and saving the
00218             // new job.
00219             UInt32 i             = 0;
00220             CJobCursor cursor    = newCursor();
00221             CJobCursor jobCursor = nextCursor(cursor);
00222             while (i < pfds.size() && jobCursor != m_socketJobs.end()) {
00223                 if (*jobCursor != NULL) {
00224                     // get poll state
00225                     unsigned short revents = pfds[i].m_revents;
00226                     bool read  = ((revents & IArchNetwork::kPOLLIN) != 0);
00227                     bool write = ((revents & IArchNetwork::kPOLLOUT) != 0);
00228                     bool error = ((revents & (IArchNetwork::kPOLLERR |
00229                                               IArchNetwork::kPOLLNVAL)) != 0);
00230 
00231                     // run job
00232                     ISocketMultiplexerJob* job    = *jobCursor;
00233                     ISocketMultiplexerJob* newJob = job->run(read, write, error);
00234 
00235                     // save job, if different
00236                     if (newJob != job) {
00237                         CLock lock(m_mutex);
00238                         delete job;
00239                         *jobCursor = newJob;
00240                         m_update   = true;
00241                     }
00242                     ++i;
00243                 }
00244 
00245                 // next job
00246                 jobCursor = nextCursor(cursor);
00247             }
00248             deleteCursor(cursor);
00249         }
00250 
00251         // delete any removed socket jobs
00252         for (CSocketJobMap::iterator i = m_socketJobMap.begin();
00253                             i != m_socketJobMap.end();) {
00254             if (*(i->second) == NULL) {
00255                 m_socketJobMap.erase(i++);
00256                 m_update = true;
00257             }
00258             else {
00259                 ++i;
00260             }
00261         }
00262 
00263         // unlock the job list
00264         unlockJobList();
00265     }
00266 }
00267 
00268 CSocketMultiplexer::CJobCursor
00269 CSocketMultiplexer::newCursor()
00270 {
00271     CLock lock(m_mutex);
00272     return m_socketJobs.insert(m_socketJobs.begin(), m_cursorMark);
00273 }
00274 
00275 CSocketMultiplexer::CJobCursor
00276 CSocketMultiplexer::nextCursor(CJobCursor cursor)
00277 {
00278     CLock lock(m_mutex);
00279     CJobCursor j = m_socketJobs.end();
00280     CJobCursor i = cursor;
00281     while (++i != m_socketJobs.end()) {
00282         if (*i != m_cursorMark) {
00283             // found a real job (as opposed to a cursor)
00284             j = i;
00285 
00286             // move our cursor just past the job
00287             m_socketJobs.splice(++i, m_socketJobs, cursor);
00288             break;
00289         }
00290     }
00291     return j;
00292 }
00293 
00294 void
00295 CSocketMultiplexer::deleteCursor(CJobCursor cursor)
00296 {
00297     CLock lock(m_mutex);
00298     m_socketJobs.erase(cursor);
00299 }
00300 
00301 void
00302 CSocketMultiplexer::lockJobListLock()
00303 {
00304     CLock lock(m_mutex);
00305 
00306     // wait for the lock on the lock
00307     while (*m_jobListLockLocked) {
00308         m_jobListLockLocked->wait();
00309     }
00310 
00311     // take ownership of the lock on the lock
00312     *m_jobListLockLocked = true;
00313     m_jobListLockLocker  = new CThread(CThread::getCurrentThread());
00314 }
00315 
00316 void
00317 CSocketMultiplexer::lockJobList()
00318 {
00319     CLock lock(m_mutex);
00320 
00321     // make sure we're the one that called lockJobListLock()
00322     assert(*m_jobListLockLocker == CThread::getCurrentThread());
00323 
00324     // wait for the job list lock
00325     while (*m_jobListLock) {
00326         m_jobListLock->wait();
00327     }
00328 
00329     // take ownership of the lock
00330     *m_jobListLock      = true;
00331     m_jobListLocker     = m_jobListLockLocker;
00332     m_jobListLockLocker = NULL;
00333 
00334     // release the lock on the lock
00335     *m_jobListLockLocked = false;
00336     m_jobListLockLocked->broadcast();
00337 }
00338 
00339 void
00340 CSocketMultiplexer::unlockJobList()
00341 {
00342     CLock lock(m_mutex);
00343 
00344     // make sure we're the one that called lockJobList()
00345     assert(*m_jobListLocker == CThread::getCurrentThread());
00346 
00347     // release the lock
00348     delete m_jobListLocker;
00349     m_jobListLocker = NULL;
00350     *m_jobListLock  = false;
00351     m_jobListLock->signal();
00352 
00353     // set new jobs ready state
00354     bool isReady = !m_socketJobMap.empty();
00355     if (*m_jobsReady != isReady) {
00356         *m_jobsReady = isReady;
00357         m_jobsReady->signal();
00358     }
00359 }

Generated on Fri Nov 6 00:18:45 2009 for synergy-plus by  doxygen 1.4.7