diff options
author | madmaxoft@gmail.com <madmaxoft@gmail.com@0a769ca7-a7f5-676a-18bf-c427514a06d6> | 2012-02-07 08:49:00 +0100 |
---|---|---|
committer | madmaxoft@gmail.com <madmaxoft@gmail.com@0a769ca7-a7f5-676a-18bf-c427514a06d6> | 2012-02-07 08:49:00 +0100 |
commit | 16feb0924e2b872d9c6f24d97647591ec36b63c7 (patch) | |
tree | e27e7fbbd3e283261ad97e80c722e53122b75df1 /source | |
parent | Slight cleanup (diff) | |
download | cuberite-16feb0924e2b872d9c6f24d97647591ec36b63c7.tar cuberite-16feb0924e2b872d9c6f24d97647591ec36b63c7.tar.gz cuberite-16feb0924e2b872d9c6f24d97647591ec36b63c7.tar.bz2 cuberite-16feb0924e2b872d9c6f24d97647591ec36b63c7.tar.lz cuberite-16feb0924e2b872d9c6f24d97647591ec36b63c7.tar.xz cuberite-16feb0924e2b872d9c6f24d97647591ec36b63c7.tar.zst cuberite-16feb0924e2b872d9c6f24d97647591ec36b63c7.zip |
Diffstat (limited to 'source')
-rw-r--r-- | source/cSocketThreads.cpp | 472 | ||||
-rw-r--r-- | source/cSocketThreads.h | 147 |
2 files changed, 619 insertions, 0 deletions
diff --git a/source/cSocketThreads.cpp b/source/cSocketThreads.cpp new file mode 100644 index 000000000..f5d007b66 --- /dev/null +++ b/source/cSocketThreads.cpp @@ -0,0 +1,472 @@ +
+// cSocketThreads.cpp
+
+// Implements the cSocketThreads class representing the heart of MCS's client networking.
+// This object takes care of network communication, groups sockets into threads and uses as little threads as possible for full read / write support
+// For more detail, see http://forum.mc-server.org/showthread.php?tid=327
+
+#include "Globals.h"
+#include "cSocketThreads.h"
+#include "cClientHandle.h"
+#include "packets/cPacket_RelativeEntityMoveLook.h"
+
+
+
+
+
+///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// cSocketThreads:
+
+cSocketThreads::cSocketThreads(void)
+{
+}
+
+
+
+
+
+cSocketThreads::~cSocketThreads()
+{
+ for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
+ {
+ delete *itr;
+ } // for itr - m_Threads[]
+ m_Threads.clear();
+}
+
+
+
+
+
+
+void cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client)
+{
+ // Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client
+
+ // Try to add to existing threads:
+ cCSLock Lock(m_CS);
+ for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
+ {
+ if ((*itr)->HasEmptySlot())
+ {
+ (*itr)->AddClient(a_Socket, a_Client);
+ return;
+ }
+ }
+
+ // No thread has free space, create a new one:
+ cSocketThread * Thread = new cSocketThread(this);
+ Thread->Start();
+ Thread->AddClient(a_Socket, a_Client);
+ m_Threads.push_back(Thread);
+}
+
+
+
+
+
+void cSocketThreads::RemoveClient(cSocket * a_Socket)
+{
+ // Remove the socket (and associated client) from processing
+
+ cCSLock Lock(m_CS);
+ for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
+ {
+ if ((*itr)->RemoveSocket(a_Socket))
+ {
+ return;
+ }
+ }
+}
+
+
+
+
+
+void cSocketThreads::RemoveClient(cCallback * a_Client)
+{
+ // Remove the associated socket and the client from processing
+
+ cCSLock Lock(m_CS);
+ for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
+ {
+ if ((*itr)->RemoveClient(a_Client))
+ {
+ return;
+ }
+ }
+}
+
+
+
+
+
+void cSocketThreads::NotifyWrite(cCallback * a_Client)
+{
+ // Notifies the thread responsible for a_Client that the client has something to write
+
+ cCSLock Lock(m_CS);
+ for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
+ {
+ if ((*itr)->NotifyWrite(a_Client))
+ {
+ return;
+ }
+ }
+}
+
+
+
+
+
+////////////////////////////////////////////////////////////////////////////////
+// cSocketThreads::cSocketThread:
+
+cSocketThreads::cSocketThread::cSocketThread(cSocketThreads * a_Parent) :
+ cIsThread("cSocketThread"),
+ m_Parent(a_Parent),
+ m_NumSlots(0)
+{
+ // Nothing needed yet
+}
+
+
+
+
+
+void cSocketThreads::cSocketThread::AddClient(cSocket * a_Socket, cCallback * a_Client)
+{
+ assert(m_NumSlots < MAX_SLOTS); // Use HasEmptySlot() to check before adding
+
+ m_Slots[m_NumSlots].m_Client = a_Client;
+ m_Slots[m_NumSlots].m_Socket = a_Socket;
+ m_Slots[m_NumSlots].m_Outgoing.clear();
+ m_NumSlots++;
+
+ // Notify the thread of the change:
+ assert(m_ControlSocket2.IsValid());
+ m_ControlSocket2.Send("a", 1);
+}
+
+
+
+
+
+bool cSocketThreads::cSocketThread::RemoveClient(cCallback * a_Client)
+{
+ // Returns true if removed, false if not found
+
+ if (m_NumSlots == 0)
+ {
+ return false;
+ }
+
+ for (int i = m_NumSlots - 1; i > 0 ; --i)
+ {
+ if (m_Slots[i].m_Client != a_Client)
+ {
+ continue;
+ }
+
+ // Found, remove it:
+ m_Slots[i] = m_Slots[m_NumSlots - 1];
+ m_NumSlots--;
+
+ // Notify the thread of the change:
+ assert(m_ControlSocket2.IsValid());
+ m_ControlSocket2.Send("r", 1);
+ return true;
+ } // for i - m_Slots[]
+
+ // Not found
+ return false;
+}
+
+
+
+
+
+bool cSocketThreads::cSocketThread::RemoveSocket(cSocket * a_Socket)
+{
+ // Returns true if removed, false if not found
+
+ if (m_NumSlots == 0)
+ {
+ return false;
+ }
+
+ for (int i = m_NumSlots - 1; i > 0 ; --i)
+ {
+ if (m_Slots[i].m_Socket != a_Socket)
+ {
+ continue;
+ }
+
+ // Found, remove it:
+ m_Slots[i] = m_Slots[m_NumSlots - 1];
+ m_NumSlots--;
+
+ // Notify the thread of the change:
+ assert(m_ControlSocket2.IsValid());
+ m_ControlSocket2.Send("r", 1);
+ return true;
+ } // for i - m_Slots[]
+
+ // Not found
+ return false;
+}
+
+
+
+
+
+bool cSocketThreads::cSocketThread::NotifyWrite(cCallback * a_Client)
+{
+ if (HasClient(a_Client))
+ {
+ // Notify the thread that there's another packet in the queue:
+ assert(m_ControlSocket2.IsValid());
+ m_ControlSocket2.Send("q", 1);
+ return true;
+ }
+ return false;
+}
+
+
+
+
+
+bool cSocketThreads::cSocketThread::HasClient(cCallback * a_Client) const
+{
+ for (int i = m_NumSlots - 1; i >= 0; --i)
+ {
+ if (m_Slots[i].m_Client == a_Client)
+ {
+ return true;
+ }
+ } // for i - m_Slots[]
+ return false;
+}
+
+
+
+
+
+bool cSocketThreads::cSocketThread::HasSocket(cSocket * a_Socket) const
+{
+ for (int i = m_NumSlots - 1; i >= 0; --i)
+ {
+ if (m_Slots[i].m_Socket == a_Socket)
+ {
+ return true;
+ }
+ } // for i - m_Slots[]
+ return false;
+}
+
+
+
+
+
+bool cSocketThreads::cSocketThread::Start(void)
+{
+ // Create the control socket listener
+ m_ControlSocket1 = cSocket::CreateSocket();
+ if (!m_ControlSocket1.IsValid())
+ {
+ LOGERROR("Cannot create a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
+ return false;
+ }
+ cSocket::SockAddr_In Addr;
+ Addr.Family = cSocket::ADDRESS_FAMILY_INTERNET;
+ Addr.Address = cSocket::INTERNET_ADDRESS_LOCALHOST;
+ Addr.Port = 0; // Any free port is okay
+ if (m_ControlSocket1.Bind(Addr) != 0)
+ {
+ LOGERROR("Cannot bind a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
+ m_ControlSocket1.CloseSocket();
+ return false;
+ }
+ if (m_ControlSocket1.GetPort() == 0)
+ {
+ LOGERROR("Cannot determine Control socket port (\"%s\"); conitnuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
+ m_ControlSocket1.CloseSocket();
+ return false;
+ }
+
+ // Start the thread
+ if (!super::Start())
+ {
+ m_ControlSocket1.CloseSocket();
+ return false;
+ }
+
+ // Finish connecting the control socket by accepting connection from the thread's socket
+ cSocket tmp = m_ControlSocket1.Accept();
+ if (!tmp.IsValid())
+ {
+ LOGERROR("Cannot link Control sockets for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
+ m_ControlSocket1.CloseSocket();
+ return false;
+ }
+ m_ControlSocket1.CloseSocket();
+ m_ControlSocket1 = tmp;
+
+ return true;
+}
+
+
+
+
+
+void cSocketThreads::cSocketThread::Execute(void)
+{
+ // Connect the "client" part of the Control socket:
+ cSocket::SockAddr_In Addr;
+ Addr.Family = cSocket::ADDRESS_FAMILY_INTERNET;
+ Addr.Address = cSocket::INTERNET_ADDRESS_LOCALHOST;
+ Addr.Port = m_ControlSocket1.GetPort();
+ assert(Addr.Port != 0); // We checked in the Start() method, but let's be sure
+ if (m_ControlSocket2.Connect(Addr) != 0)
+ {
+ LOGERROR("Cannot connect Control sockets for a cSocketThread (\"%s\"); continuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str());
+ m_ControlSocket1.CloseSocket();
+ return;
+ }
+
+ // The main thread loop:
+ while (!mShouldTerminate)
+ {
+ // Put all sockets into the Read set:
+ fd_set fdRead;
+ cSocket::xSocket Highest = 0;
+
+ PrepareSet(&fdRead, Highest);
+
+ // Wait for the sockets:
+ if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1)
+ {
+ LOGWARNING("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
+ break;
+ }
+
+ ReadFromSockets(&fdRead);
+
+ // Test sockets for writing:
+ fd_set fdWrite;
+ Highest = 0;
+ PrepareSet(&fdWrite, Highest);
+ timeval Timeout;
+ Timeout.tv_sec = 0;
+ Timeout.tv_usec = 0;
+ if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1)
+ {
+ LOGWARNING("select(W) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
+ break;
+ }
+
+ WriteToSockets(&fdWrite);
+ } // while (!mShouldTerminate)
+
+ LOG("cSocketThread %p is terminating", this);
+}
+
+
+
+
+
+void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest)
+{
+ FD_ZERO(a_Set);
+ FD_SET(m_ControlSocket1.GetSocket(), a_Set);
+
+ cCSLock Lock(m_Parent->m_CS);
+ for (int i = m_NumSlots - 1; i > 0; --i)
+ {
+ if (!m_Slots[i].m_Socket->IsValid())
+ {
+ continue;
+ }
+ cSocket::xSocket s = m_Slots[i].m_Socket->GetSocket();
+ FD_SET(s, a_Set);
+ if (s > a_Highest)
+ {
+ a_Highest = s;
+ }
+ } // for i - m_Slots[]
+}
+
+
+
+
+
+void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read)
+{
+ // Read on available sockets:
+ cCSLock Lock(m_Parent->m_CS);
+ for (int i = m_NumSlots - 1; i > 0; --i)
+ {
+ if (!FD_ISSET(m_Slots[i].m_Socket->GetSocket(), a_Read))
+ {
+ continue;
+ }
+ char Buffer[1024];
+ int Received = m_Slots[i].m_Socket->Receive(Buffer, ARRAYCOUNT(Buffer), 0);
+ if (Received == 0)
+ {
+ // The socket has been closed by the remote party, close our socket and let it be removed after we process all reading
+ m_Slots[i].m_Socket->CloseSocket();
+ m_Slots[i].m_Client->SocketClosed();
+ }
+ else if (Received > 0)
+ {
+ m_Slots[i].m_Client->DataReceived(Buffer, Received);
+ }
+ else
+ {
+ // The socket has encountered an error, close it and let it be removed after we process all reading
+ m_Slots[i].m_Socket->CloseSocket();
+ m_Slots[i].m_Client->SocketClosed();
+ }
+ } // for i - m_Slots[]
+}
+
+
+
+
+
+void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
+{
+ cCSLock Lock(m_Parent->m_CS);
+ for (int i = m_NumSlots - 1; i > 0; --i)
+ {
+ if (!FD_ISSET(m_Slots[i].m_Socket->GetSocket(), a_Write))
+ {
+ continue;
+ }
+ if (m_Slots[i].m_Outgoing.empty())
+ {
+ // Request another chunk of outgoing data:
+ m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing);
+ if (m_Slots[i].m_Outgoing.empty())
+ {
+ // Nothing ready yet
+ continue;
+ }
+ } // if (outgoing data is empty)
+
+ int Sent = m_Slots[i].m_Socket->Send(m_Slots[i].m_Outgoing.data(), m_Slots[i].m_Outgoing.size());
+ if (Sent < 0)
+ {
+ LOGWARNING("Error while writing to client \"%s\", disconnecting", m_Slots[i].m_Socket->GetIPString().c_str());
+ m_Slots[i].m_Socket->CloseSocket();
+ m_Slots[i].m_Client->SocketClosed();
+ return;
+ }
+ m_Slots[i].m_Outgoing.erase(0, Sent);
+ }
+}
+
+
+
+
diff --git a/source/cSocketThreads.h b/source/cSocketThreads.h new file mode 100644 index 000000000..b43d693ba --- /dev/null +++ b/source/cSocketThreads.h @@ -0,0 +1,147 @@ +
+// cSocketThreads.h
+
+// Interfaces to the cSocketThreads class representing the heart of MCS's client networking.
+// This object takes care of network communication, groups sockets into threads and uses as little threads as possible for full read / write support
+// For more detail, see http://forum.mc-server.org/showthread.php?tid=327
+
+
+
+
+
+/// How many clients should one thread handle? (must be less than FD_SETSIZE - 1 for your platform)
+#define MAX_SLOTS 1
+
+
+
+
+
+#pragma once
+#ifndef CSOCKETTHREADS_H_INCLUDED
+#define CSOCKETTHREADS_H_INCLUDED
+
+#include "packets/cPacket.h"
+#include "cIsThread.h"
+
+
+
+
+// Check MAX_SLOTS:
+#if MAX_SLOTS >= FD_SETSIZE
+ #error "MAX_SLOTS must be less than FD_SETSIZE - 1 for your platform! (otherwise select() won't work)"
+#endif
+
+
+
+
+
+// fwd:
+class cSocket;
+class cClientHandle;
+
+
+
+
+
+class cSocketThreads
+{
+public:
+
+ // Clients of cSocketThreads must implement this interface to be able to communicate
+ class cCallback
+ {
+ public:
+ /// Called when data is received from the remote party
+ virtual void DataReceived(const char * a_Data, int a_Size) = 0;
+
+ /// Called when data can be sent to remote party; the function is supposed to append outgoing data to a_Data
+ virtual void GetOutgoingData(AString & a_Data) = 0;
+
+ /// Called when the socket has been closed for any reason
+ virtual void SocketClosed(void) = 0;
+ } ;
+
+
+ cSocketThreads(void);
+ ~cSocketThreads();
+
+ /// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client
+ void AddClient(cSocket * a_Socket, cCallback * a_Client);
+
+ /// Remove the socket (and associated client) from processing
+ void RemoveClient(cSocket * a_Socket);
+
+ /// Remove the associated socket and the client from processing
+ void RemoveClient(cCallback * a_Client);
+
+ /// Notify the thread responsible for a_Client that the client has something to write
+ void NotifyWrite(cCallback * a_Client);
+
+private:
+
+ class cSocketThread :
+ public cIsThread
+ {
+ typedef cIsThread super;
+
+ public:
+
+ cSocketThread(cSocketThreads * a_Parent);
+
+ // All these methods assume parent's m_CS is locked
+ bool HasEmptySlot(void) const {return m_NumSlots < MAX_SLOTS; }
+ bool IsEmpty (void) const {return m_NumSlots == 0; }
+
+ void AddClient (cSocket * a_Socket, cCallback * a_Client);
+ bool RemoveClient(cCallback * a_Client); // Returns true if removed, false if not found
+ bool RemoveSocket(cSocket * a_Socket); // Returns true if removed, false if not found
+ bool HasClient (cCallback * a_Client) const;
+ bool HasSocket (cSocket * a_Socket) const;
+ bool NotifyWrite (cCallback * a_Client); // Returns true if client handled by this thread
+
+ bool Start(void); // Hide the cIsThread's Start method, we need to provide our own startup to create the control socket
+
+ private:
+
+ cSocketThreads * m_Parent;
+
+ // Two ends of the control socket, the first is select()-ed, the second is written to for notifications
+ cSocket m_ControlSocket1;
+ cSocket m_ControlSocket2;
+
+ // Socket-client-packetqueues triplets.
+ // Manipulation with these assumes that the parent's m_CS is locked
+ struct sSlot
+ {
+ cSocket * m_Socket;
+ cCallback * m_Client;
+ AString m_Outgoing; // If sending writes only partial data, the rest is stored here for another send
+ } ;
+ sSlot m_Slots[MAX_SLOTS];
+ int m_NumSlots; // Number of slots actually used
+
+ virtual void Execute(void) override;
+
+ void AddOrUpdatePacket(int a_Slot, cPacket * a_Packet); // Adds the packet to the specified slot, or updates an existing packet in that queue (EntityMoveLook filtering)
+
+ void PrepareSet (fd_set * a_Set, cSocket::xSocket & a_Highest); // Puts all sockets into the set, along with m_ControlSocket1
+ void ReadFromSockets(fd_set * a_Read); // Reads from sockets indicated in a_Read
+ void WriteToSockets (fd_set * a_Write); // Writes to sockets indicated in a_Write
+ } ;
+
+ typedef std::list<cSocketThread *> cSocketThreadList;
+
+
+ cCriticalSection m_CS;
+ cSocketThreadList m_Threads;
+} ;
+
+
+
+
+
+#endif // CSOCKETTHREADS_H_INCLUDED
+
+
+
+
|