From 6ccc4e36743485c50c58ec0f43d08181ba20958a Mon Sep 17 00:00:00 2001 From: "madmaxoft@gmail.com" Date: Thu, 27 Sep 2012 20:02:25 +0000 Subject: AnvilStats: initial project import, can do block and biome statistics git-svn-id: http://mc-server.googlecode.com/svn/trunk@895 0a769ca7-a7f5-676a-18bf-c427514a06d6 --- AnvilStats/Processor.cpp | 407 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 407 insertions(+) create mode 100644 AnvilStats/Processor.cpp (limited to 'AnvilStats/Processor.cpp') diff --git a/AnvilStats/Processor.cpp b/AnvilStats/Processor.cpp new file mode 100644 index 000000000..0d9bc4694 --- /dev/null +++ b/AnvilStats/Processor.cpp @@ -0,0 +1,407 @@ + +// Processor.cpp + +// Implements the cProcessor class representing the overall processor engine that manages threads, calls callbacks etc. + +#include "Globals.h" +#include "Processor.h" +#include "Callback.h" +#include "../source/WorldStorage/FastNBT.h" +#include "zlib.h" + + + + + +const int CHUNK_INFLATE_MAX = 1 MiB; +const int MAX_COMPRESSED_CHUNK_SIZE = 1 MiB; + + + + + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// cProcessor::cThread: + +cProcessor::cThread::cThread(cCallback & a_Callback, cProcessor & a_ParentProcessor) : + super("cProcessor::cThread"), + m_Callback(a_Callback), + m_ParentProcessor(a_ParentProcessor) +{ + super::Start(); +} + + + + + +void cProcessor::cThread::Execute(void) +{ + for (;;) + { + AString FileName = m_ParentProcessor.GetOneFileName(); + if (FileName.empty()) + { + // All done, terminate the thread + return; + } + ProcessFile(FileName); + } // for-ever +} + + + + + +void cProcessor::cThread::ProcessFile(const AString & a_FileName) +{ + LOG("Processing file \"%s\"", a_FileName.c_str()); + + size_t idx = a_FileName.rfind("r."); + if (idx == AString::npos) + { + LOG("Cannot parse filename \"%s\", skipping file.", a_FileName.c_str()); + return; + } + int RegionX = 0, RegionZ = 0; + if (sscanf_s(a_FileName.c_str() + idx, "r.%d.%d.mca", &RegionX, &RegionZ) != 2) + { + LOG("Cannot parse filename \"%s\" into coords, skipping file.", a_FileName.c_str()); + return; + } + + cFile f; + if (!f.Open(a_FileName, cFile::fmRead)) + { + LOG("Cannot open file \"%s\", skipping file.", a_FileName.c_str()); + return; + } + + int Header[2048]; + if (f.Read(Header, sizeof(Header)) != sizeof(Header)) + { + LOG("Cannot read header in file \"%s\", skipping file.", a_FileName.c_str()); + return; + } + + for (int i = 0; i < ARRAYCOUNT(Header); i++) + { + Header[i] = ntohl(Header[i]); + } + + int ChunkBaseX = RegionX * 32; + int ChunkBaseZ = RegionZ * 32; + for (int i = 0; i < 1024; i++) + { + unsigned Location = Header[i]; + unsigned Timestamp = Header[i + 1024]; + if ( + ((Location == 0) && (Timestamp == 0)) || // Official docs' "not present" + (Location >> 8 < 2) || // Logical - no chunk can start inside the header + ((Location & 0xff) == 0) // Logical - no chunk can be zero bytes + ) + { + // Chunk not present in the file + continue; + } + int ChunkX = ChunkBaseX + (i % 32); + int ChunkZ = ChunkBaseZ + (i / 32); + if (m_Callback.OnNewChunk(ChunkX, ChunkZ)) + { + continue; + } + ProcessChunk(f, ChunkX, ChunkZ, Location >> 8, Location & 0xff, Timestamp); + } // for i - chunk index +} + + + + + +void cProcessor::cThread::ProcessChunk(cFile & a_File, int a_ChunkX, int a_ChunkZ, unsigned a_SectorStart, unsigned a_SectorSize, unsigned a_TimeStamp) +{ + if (m_Callback.OnHeader(a_SectorStart * 4096, a_SectorSize, a_TimeStamp)) + { + return; + } + + if (a_File.Seek(a_SectorStart * 4096) < 0) + { + LOG("Seeking to sector %d failed, skipping chunk [%d, %d]", a_SectorStart, a_ChunkX, a_ChunkZ); + return; + } + + int ByteSize; + if (a_File.Read(&ByteSize, sizeof(ByteSize)) != sizeof(ByteSize)) + { + LOG("Cannot read bytesize at offset %d, skipping chunk [%d, %d].", a_SectorStart * 4096, a_ChunkX, a_ChunkZ); + return; + } + ByteSize = ntohl(ByteSize); + + char CompressionMethod; + if (a_File.Read(&CompressionMethod, sizeof(CompressionMethod)) != sizeof(CompressionMethod)) + { + LOG("Cannot read CompressionMethod at offset %d, skipping chunk [%d, %d].", a_SectorStart * 4096 + 4, a_ChunkX, a_ChunkZ); + return; + } + + if (m_Callback.OnCompressedDataSizePos(ByteSize, a_SectorStart * 4096 + 5, CompressionMethod)) + { + return; + } + + char CompressedData[MAX_COMPRESSED_CHUNK_SIZE]; + if (a_File.Read(CompressedData, ByteSize - 1) != ByteSize - 1) + { + LOG("Cannot read %d bytes of compressed data at offset %d, skipping chunk [%d, %d]", ByteSize - 1, a_SectorStart * 4096 + 5, a_ChunkX, a_ChunkZ); + return; + } + + ProcessCompressedChunkData(a_ChunkX, a_ChunkZ, CompressedData, ByteSize); +} + + + + + +void cProcessor::cThread::ProcessCompressedChunkData(int a_ChunkX, int a_ChunkZ, const char * a_CompressedData, int a_CompressedSize) +{ + char Decompressed[CHUNK_INFLATE_MAX]; + z_stream strm; + strm.zalloc = (alloc_func)NULL; + strm.zfree = (free_func)NULL; + strm.opaque = NULL; + inflateInit(&strm); + strm.next_out = (Bytef *)Decompressed; + strm.avail_out = sizeof(Decompressed); + strm.next_in = (Bytef *)a_CompressedData; + strm.avail_in = a_CompressedSize; + int res = inflate(&strm, Z_FINISH); + inflateEnd(&strm); + if (res != Z_STREAM_END) + { + LOG("Decompression failed, skipping chunk [%d, %d]", a_ChunkX, a_ChunkZ); + return; + } + + if (m_Callback.OnDecompressedData(Decompressed, strm.total_out)) + { + return; + } + + // Parse the NBT data: + cParsedNBT NBT(Decompressed, strm.total_out); + if (!NBT.IsValid()) + { + LOG("NBT Parsing failed, skipping chunk [%d, %d]", a_ChunkX, a_ChunkZ); + return; + } + + ProcessParsedChunkData(a_ChunkX, a_ChunkZ, NBT); +} + + + + + +void cProcessor::cThread::ProcessParsedChunkData(int a_ChunkX, int a_ChunkZ, cParsedNBT & a_NBT) +{ + int LevelTag = a_NBT.FindChildByName(0, "Level"); + if (LevelTag < 0) + { + LOG("Bad logical structure of the NBT, skipping chunk [%d, %d].", a_ChunkX, a_ChunkZ); + return; + } + int XPosTag = a_NBT.FindChildByName(LevelTag, "xPos"); + int ZPosTag = a_NBT.FindChildByName(LevelTag, "zPos"); + if ((XPosTag < 0) || (ZPosTag < 0)) + { + LOG("Pos tags missing in NTB, skipping chunk [%d, %d].", a_ChunkX, a_ChunkZ); + return; + } + if (m_Callback.OnRealCoords(a_NBT.GetInt(XPosTag), a_NBT.GetInt(ZPosTag))) + { + return; + } + + int LastUpdateTag = a_NBT.FindChildByName(LevelTag, "LastUpdate"); + if (LastUpdateTag > 0) + { + if (m_Callback.OnLastUpdate(a_NBT.GetLong(LastUpdateTag))) + { + return; + } + } + + int TerrainPopulatedTag = a_NBT.FindChildByName(LevelTag, "TerrainPopulated"); + bool TerrainPopulated = (TerrainPopulatedTag < 0) ? false : (a_NBT.GetByte(TerrainPopulatedTag) != 0); + if (m_Callback.OnTerrainPopulated(TerrainPopulated)) + { + return; + } + + int BiomesTag = a_NBT.FindChildByName(LevelTag, "Biomes"); + if (BiomesTag > 0) + { + if (m_Callback.OnBiomes((const unsigned char *)(a_NBT.GetData(BiomesTag)))) + { + return; + } + } + + int HeightMapTag = a_NBT.FindChildByName(LevelTag, "HeightMap"); + if (HeightMapTag > 0) + { + if (m_Callback.OnHeightMap((const int *)(a_NBT.GetData(HeightMapTag)))) + { + return; + } + } + + if (ProcessChunkSections(a_ChunkX, a_ChunkZ, a_NBT, LevelTag)) + { + return; + } + // TODO: entities, tile-entities etc. +} + + + + + +bool cProcessor::cThread::ProcessChunkSections(int a_ChunkX, int a_ChunkZ, cParsedNBT & a_NBT, int a_LevelTag) +{ + int Sections = a_NBT.FindChildByName(a_LevelTag, "Sections"); + if (Sections < 0) + { + return false; + } + + for (int Tag = a_NBT.GetFirstChild(Sections); Tag > 0; Tag = a_NBT.GetNextSibling(Tag)) + { + int YTag = a_NBT.FindChildByName(Tag, "Y"); + int BlocksTag = a_NBT.FindChildByName(Tag, "Blocks"); + int AddTag = a_NBT.FindChildByName(Tag, "Add"); + int DataTag = a_NBT.FindChildByName(Tag, "Data"); + int BlockLightTag = a_NBT.FindChildByName(Tag, "BlockLightTag"); + int SkyLightTag = a_NBT.FindChildByName(Tag, "SkyLight"); + + if ((YTag < 0) || (BlocksTag < 0) || (DataTag < 0)) + { + continue; + } + + if (m_Callback.OnSection( + a_NBT.GetByte(YTag), + (const BLOCKTYPE *) (a_NBT.GetData(BlocksTag)), + (AddTag > 0) ? (const NIBBLETYPE *)(a_NBT.GetData(AddTag)) : NULL, + (const NIBBLETYPE *)(a_NBT.GetData(DataTag)), + (BlockLightTag > 0) ? (const NIBBLETYPE *)(a_NBT.GetData(BlockLightTag)) : NULL, + (BlockLightTag > 0) ? (const NIBBLETYPE *)(a_NBT.GetData(BlockLightTag)) : NULL + )) + { + return true; + } + } // for Tag - Sections[] + + return false; +} + + + + + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// cProcessor: + +cProcessor::cProcessor(void) : + m_IsShuttingDown(false) +{ +} + + + + + +cProcessor::~cProcessor() +{ +} + + + + + +void cProcessor::ProcessWorld(const AString & a_WorldFolder, cCallbackFactory & a_CallbackFactory) +{ + PopulateFileQueue(a_WorldFolder); + + // Start as many threads as there are cores: + // Get number of cores by querying the system process affinity mask + DWORD Affinity, ProcAffinity; + GetProcessAffinityMask(GetCurrentProcess(), &ProcAffinity, &Affinity); + while (Affinity > 0) + { + if ((Affinity & 1) == 1) + { + cCallback * Callback = a_CallbackFactory.GetNewCallback(); + m_Threads.push_back(new cThread(*Callback, *this)); + } + Affinity >>= 1; + } // while (Affinity > 0) + if (m_Threads.size() == 0) + { + LOG("Zero cores detected - how am I running? Running in a single thread."); + cCallback * Callback = a_CallbackFactory.GetNewCallback(); + m_Threads.push_back(new cThread(*Callback, *this)); + } + + // Wait for all threads to finish + // simply by calling each thread's destructor sequentially + for (cThreads::iterator itr = m_Threads.begin(), end = m_Threads.end(); itr != end; ++itr) + { + delete *itr; + } // for itr - m_Threads[] +} + + + + + +void cProcessor::PopulateFileQueue(const AString & a_WorldFolder) +{ + LOG("Processing world in \"%s\"...", a_WorldFolder.c_str()); + + AString Path = a_WorldFolder; + Path.push_back(cFile::PathSeparator); + AStringList AllFiles = GetDirectoryContents(Path.c_str()); + for (AStringList::iterator itr = AllFiles.begin(), end = AllFiles.end(); itr != end; ++itr) + { + if (itr->rfind(".mca") != itr->length() - 4) + { + // Not a .mca file + continue; + } + m_FileQueue.push_back(Path + *itr); + } // for itr - AllFiles[] +} + + + + + +AString cProcessor::GetOneFileName(void) +{ + cCSLock Lock(m_CS); + if (m_FileQueue.empty()) + { + return ""; + } + AString res = m_FileQueue.back(); + m_FileQueue.pop_back(); + return res; +} + + + + -- cgit v1.2.3