diff options
Diffstat (limited to '')
-rw-r--r-- | source/OSSupport/SocketThreads.cpp | 99 |
1 files changed, 44 insertions, 55 deletions
diff --git a/source/OSSupport/SocketThreads.cpp b/source/OSSupport/SocketThreads.cpp index ae23b4496..dd961770d 100644 --- a/source/OSSupport/SocketThreads.cpp +++ b/source/OSSupport/SocketThreads.cpp @@ -37,7 +37,7 @@ cSocketThreads::~cSocketThreads() -bool cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client) +bool cSocketThreads::AddClient(const cSocket & a_Socket, cCallback * a_Client) { // Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client @@ -71,6 +71,7 @@ bool cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client) +/* void cSocketThreads::RemoveClient(const cSocket * a_Socket) { // Remove the socket (and associated client) from processing @@ -87,6 +88,7 @@ void cSocketThreads::RemoveClient(const cSocket * a_Socket) // Cannot assert here, this may actually happen legally, since cClientHandle has to clean up the socket and it may have already closed in the meantime // ASSERT(!"Removing an unknown socket"); } +*/ @@ -133,20 +135,13 @@ void cSocketThreads::NotifyWrite(const cCallback * a_Client) -void cSocketThreads::Write(const cSocket * a_Socket, const AString & a_Data) +void cSocketThreads::Write(const cCallback * a_Client, const AString & a_Data) { - // Puts a_Data into outgoing data queue for a_Socket - - if (!a_Socket->IsValid()) - { - // Socket already closed, ignore the request - return; - } - + // Puts a_Data into outgoing data queue for a_Client cCSLock Lock(m_CS); for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) { - if ((*itr)->Write(a_Socket, a_Data)) + if ((*itr)->Write(a_Client, a_Data)) { return; } @@ -181,24 +176,20 @@ void cSocketThreads::StopReading(const cCallback * a_Client) /// Queues the socket for closing, as soon as its outgoing data is sent -void cSocketThreads::QueueClose(const cSocket * a_Socket) +void cSocketThreads::QueueClose(const cCallback * a_Client) { - if (!a_Socket->IsValid()) - { - // Already closed, ignore the request - return; - } + LOGD("QueueClose(client %p)", a_Client); cCSLock Lock(m_CS); for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) { - if ((*itr)->QueueClose(a_Socket)) + if ((*itr)->QueueClose(a_Client)) { return; } } // for itr - m_Threads[] - ASSERT(!"Queueing close of an unknown socket"); + ASSERT(!"Queueing close of an unknown client"); } @@ -240,13 +231,15 @@ cSocketThreads::cSocketThread::~cSocketThread() -void cSocketThreads::cSocketThread::AddClient(cSocket * a_Socket, cCallback * a_Client) +void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallback * a_Client) { ASSERT(m_NumSlots < MAX_SLOTS); // Use HasEmptySlot() to check before adding m_Slots[m_NumSlots].m_Client = a_Client; m_Slots[m_NumSlots].m_Socket = a_Socket; m_Slots[m_NumSlots].m_Outgoing.clear(); + m_Slots[m_NumSlots].m_ShouldClose = false; + m_Slots[m_NumSlots].m_ShouldCallClient = true; m_NumSlots++; // Notify the thread of the change: @@ -297,7 +290,7 @@ bool cSocketThreads::cSocketThread::RemoveSocket(const cSocket * a_Socket) for (int i = m_NumSlots - 1; i >= 0 ; --i) { - if (m_Slots[i].m_Socket != a_Socket) + if (m_Slots[i].m_Socket != *a_Socket) { continue; } @@ -339,7 +332,7 @@ bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const { for (int i = m_NumSlots - 1; i >= 0; --i) { - if (m_Slots[i].m_Socket->GetSocket() == a_Socket->GetSocket()) + if (m_Slots[i].m_Socket == *a_Socket) { return true; } @@ -367,12 +360,12 @@ bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client) -bool cSocketThreads::cSocketThread::Write(const cSocket * a_Socket, const AString & a_Data) +bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AString & a_Data) { // Returns true if socket handled by this thread for (int i = m_NumSlots - 1; i >= 0; --i) { - if (m_Slots[i].m_Socket == a_Socket) + if (m_Slots[i].m_Client == a_Client) { m_Slots[i].m_Outgoing.append(a_Data); @@ -397,13 +390,7 @@ bool cSocketThreads::cSocketThread::StopReading (const cCallback * a_Client) { if (m_Slots[i].m_Client == a_Client) { - m_Slots[i].m_Client = NULL; - m_Slots[i].m_ShouldClose = false; - - // Notify the thread that there's a stop reading request: - ASSERT(m_ControlSocket2.IsValid()); - m_ControlSocket2.Send("s", 1); - + m_Slots[i].m_ShouldCallClient = false; return true; } } // for i - m_Slots[] @@ -414,14 +401,13 @@ bool cSocketThreads::cSocketThread::StopReading (const cCallback * a_Client) -bool cSocketThreads::cSocketThread::QueueClose(const cSocket * a_Socket) +bool cSocketThreads::cSocketThread::QueueClose(const cCallback * a_Client) { // Returns true if socket handled by this thread for (int i = m_NumSlots - 1; i >= 0; --i) { - if (m_Slots[i].m_Socket == a_Socket) + if (m_Slots[i].m_Client == a_Client) { - ASSERT(m_Slots[i].m_Client == NULL); // Should have stopped reading first m_Slots[i].m_ShouldClose = true; // Notify the thread that there's a close queued (in case its conditions are already met): @@ -561,11 +547,11 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; --i) { - if (!m_Slots[i].m_Socket->IsValid()) + if (!m_Slots[i].m_Socket.IsValid()) { continue; } - cSocket::xSocket s = m_Slots[i].m_Socket->GetSocket(); + cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket(); FD_SET(s, a_Set); if (s > a_Highest) { @@ -593,24 +579,24 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; --i) { - if (!FD_ISSET(m_Slots[i].m_Socket->GetSocket(), a_Read)) + if (!FD_ISSET(m_Slots[i].m_Socket.GetSocket(), a_Read)) { continue; } char Buffer[1024]; - int Received = m_Slots[i].m_Socket->Receive(Buffer, ARRAYCOUNT(Buffer), 0); + int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0); if (Received == 0) { // The socket has been closed by the remote party, close our socket and let it be removed after we process all reading - m_Slots[i].m_Socket->CloseSocket(); - if (m_Slots[i].m_Client != NULL) + m_Slots[i].m_Socket.CloseSocket(); + if (m_Slots[i].m_ShouldCallClient) { m_Slots[i].m_Client->SocketClosed(); } } else if (Received > 0) { - if (m_Slots[i].m_Client != NULL) + if (m_Slots[i].m_ShouldCallClient) { m_Slots[i].m_Client->DataReceived(Buffer, Received); } @@ -618,8 +604,8 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) else { // The socket has encountered an error, close it and let it be removed after we process all reading - m_Slots[i].m_Socket->CloseSocket(); - if (m_Slots[i].m_Client != NULL) + m_Slots[i].m_Socket.CloseSocket(); + if (m_Slots[i].m_ShouldCallClient) { m_Slots[i].m_Client->SocketClosed(); } @@ -637,38 +623,41 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; --i) { - cSocket Socket(*(m_Slots[i].m_Socket)); - if (!Socket.IsValid() || !FD_ISSET(Socket.GetSocket(), a_Write)) + if (!m_Slots[i].m_Socket.IsValid() || !FD_ISSET(m_Slots[i].m_Socket.GetSocket(), a_Write)) { continue; } if (m_Slots[i].m_Outgoing.empty()) { // Request another chunk of outgoing data: - if (m_Slots[i].m_Client != NULL) + if (m_Slots[i].m_ShouldCallClient) { m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); } if (m_Slots[i].m_Outgoing.empty()) { // Nothing ready - if ((m_Slots[i].m_Client == NULL) && m_Slots[i].m_ShouldClose) + if (m_Slots[i].m_ShouldClose) { // Socket was queued for closing and there's no more data to send, close it now: - m_Slots[i].m_Socket->CloseSocket(); - m_Slots[i] = m_Slots[--m_NumSlots]; + + // DEBUG + LOGD("Socket was queued for closing, closing now. Slot %d, client %p, socket %d", i, m_Slots[i].m_Client, m_Slots[i].m_Socket.GetSocket()); + + m_Slots[i].m_Socket.CloseSocket(); + // The slot must be freed actively by the client, using RemoveClient() } continue; } } // if (outgoing data is empty) - int Sent = m_Slots[i].m_Socket->Send(m_Slots[i].m_Outgoing.data(), m_Slots[i].m_Outgoing.size()); + int Sent = m_Slots[i].m_Socket.Send(m_Slots[i].m_Outgoing.data(), m_Slots[i].m_Outgoing.size()); if (Sent < 0) { int Err = cSocket::GetLastError(); - LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket->GetIPString().c_str(), cSocket::GetErrorString(Err).c_str()); - m_Slots[i].m_Socket->CloseSocket(); - if (m_Slots[i].m_Client != NULL) + LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), cSocket::GetErrorString(Err).c_str()); + m_Slots[i].m_Socket.CloseSocket(); + if (m_Slots[i].m_ShouldCallClient) { m_Slots[i].m_Client->SocketClosed(); } @@ -695,12 +684,12 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) void cSocketThreads::cSocketThread::RemoveClosedSockets(void) { - // Removes sockets that have closed from m_Slots[] + // Removes sockets that have been queued for closing from m_Slots[] cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; --i) { - if (m_Slots[i].m_Socket->IsValid()) + if (!m_Slots[i].m_ShouldClose || !m_Slots[i].m_Outgoing.empty()) { continue; } |