| | | |
|---|---|---|
| author | Even Rouault <even.rouault@spatialys.com> | 2019-12-20 19:10:20 +0100 |
| committer | Even Rouault <even.rouault@spatialys.com> | 2019-12-22 23:10:43 +0100 |
| commit | ed92965937856ae697010e1461e82f6245d19909 (patch) | |
| tree | 3f61bf24002b02d8f9c2804b79e93ab9f46a00a6 | /src/filemanager.cpp |
| parent | f73527fa20c9250cfced6aa73a7e46f40dc2c214 (diff) | |
| download | PROJ-ed92965937856ae697010e1461e82f6245d19909.tar.gz, PROJ-ed92965937856ae697010e1461e82f6245d19909.zip | |
Network: add a memory cache and I/O chunking strategy
Diffstat (limited to 'src/filemanager.cpp')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | src/filemanager.cpp | 259 |

1 file changed, 243 insertions(+), 16 deletions(-)
```diff
diff --git a/src/filemanager.cpp b/src/filemanager.cpp
index 3a2acee4..2dfcd6b6 100644
--- a/src/filemanager.cpp
+++ b/src/filemanager.cpp
@@ -28,14 +28,34 @@
 #ifndef FROM_PROJ_CPP
 #define FROM_PROJ_CPP
 #endif
+#define LRU11_DO_NOT_DEFINE_OUT_OF_CLASS_METHODS
 
 #include <algorithm>
+#include <functional>
+#include <limits>
 
 #include "filemanager.hpp"
 #include "proj.h"
 #include "proj/internal/internal.hpp"
+#include "proj/internal/lru_cache.hpp"
 #include "proj_internal.h"
 
+#ifdef __MINGW32__
+// mingw32-win32 doesn't implement std::mutex
+namespace {
+class MyMutex {
+  public:
+    // cppcheck-suppress functionStatic
+    void lock() { pj_acquire_lock(); }
+    // cppcheck-suppress functionStatic
+    void unlock() { pj_release_lock(); }
+};
+}
+#else
+#include <mutex>
+#define MyMutex std::mutex
+#endif
+
 #ifdef CURL_ENABLED
 #include <curl/curl.h>
 #include <sqlite3.h> // for sqlite3_snprintf
@@ -177,17 +197,97 @@ std::unique_ptr<File> FileLegacyAdapter::open(PJ_CONTEXT *ctx,
 
 // ---------------------------------------------------------------------------
 
+constexpr size_t DOWNLOAD_CHUNK_SIZE = 16 * 1024;
+constexpr int MAX_CHUNKS = 64;
+
+class NetworkChunkCache {
+  public:
+    void insert(const std::string &url, unsigned long long chunkIdx,
+                std::vector<unsigned char> &&data);
+
+    std::shared_ptr<std::vector<unsigned char>>
+    get(const std::string &url, unsigned long long chunkIdx);
+
+    void clear();
+
+  private:
+    struct Key {
+        std::string url;
+        unsigned long long chunkIdx;
+
+        Key(const std::string &urlIn, unsigned long long chunkIdxIn)
+            : url(urlIn), chunkIdx(chunkIdxIn) {}
+        bool operator==(const Key &other) const {
+            return url == other.url && chunkIdx == other.chunkIdx;
+        }
+    };
+
+    struct KeyHasher {
+        std::size_t operator()(const Key &k) const {
+            return std::hash<std::string>{}(k.url) ^
+                   (std::hash<unsigned long long>{}(k.chunkIdx) << 1);
+        }
+    };
+
+    lru11::Cache<
+        Key, std::shared_ptr<std::vector<unsigned char>>, MyMutex,
+        std::unordered_map<
+            Key,
+            typename std::list<lru11::KeyValuePair<
+                Key, std::shared_ptr<std::vector<unsigned char>>>>::iterator,
+            KeyHasher>>
+        cache_{MAX_CHUNKS};
+};
+
+// ---------------------------------------------------------------------------
+
+void NetworkChunkCache::insert(const std::string &url,
+                               unsigned long long chunkIdx,
+                               std::vector<unsigned char> &&data) {
+    cache_.insert(
+        Key(url, chunkIdx),
+        std::make_shared<std::vector<unsigned char>>(std::move(data)));
+}
+
+// ---------------------------------------------------------------------------
+
+std::shared_ptr<std::vector<unsigned char>>
+NetworkChunkCache::get(const std::string &url, unsigned long long chunkIdx) {
+    std::shared_ptr<std::vector<unsigned char>> ret;
+    cache_.tryGet(Key(url, chunkIdx), ret);
+    return ret;
+}
+
+// ---------------------------------------------------------------------------
+
+void NetworkChunkCache::clear()
+{
+    cache_.clear();
+}
+
+// ---------------------------------------------------------------------------
+
+static NetworkChunkCache gNetworkChunkCache{};
+
+// ---------------------------------------------------------------------------
+
 class NetworkFile : public File {
     PJ_CONTEXT *m_ctx;
+    std::string m_url;
     PROJ_NETWORK_HANDLE *m_handle;
     unsigned long long m_pos = 0;
+    size_t m_nBlocksToDownload = 1;
+    unsigned long long m_lastDownloadedOffset;
 
     NetworkFile(const NetworkFile &) = delete;
     NetworkFile &operator=(const NetworkFile &) = delete;
 
   protected:
-    NetworkFile(PJ_CONTEXT *ctx, PROJ_NETWORK_HANDLE *handle)
-        : m_ctx(ctx), m_handle(handle) {}
+    NetworkFile(PJ_CONTEXT *ctx, const std::string &url,
+                PROJ_NETWORK_HANDLE *handle,
+                unsigned long long lastDownloadOffset)
+        : m_ctx(ctx), m_url(url), m_handle(handle),
+          m_lastDownloadedOffset(lastDownloadOffset) {}
 
   public:
     ~NetworkFile() override;
```
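For context, the cache above is keyed by a (url, chunk index) pair, so each 16 KiB chunk of a remote file gets its own LRU slot. The following standalone sketch is not part of the patch: it uses a plain std::unordered_map instead of lru11::Cache (so there is no eviction) and a made-up URL, just to show how the same Key/KeyHasher pair behaves.

```cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Same key shape as in the patch: one entry per (url, chunk index).
struct Key {
    std::string url;
    unsigned long long chunkIdx;
    bool operator==(const Key &o) const {
        return url == o.url && chunkIdx == o.chunkIdx;
    }
};

// Same hash combiner as KeyHasher in the patch.
struct KeyHasher {
    std::size_t operator()(const Key &k) const {
        return std::hash<std::string>{}(k.url) ^
               (std::hash<unsigned long long>{}(k.chunkIdx) << 1);
    }
};

int main() {
    // A plain map stands in for lru11::Cache; the real cache additionally
    // evicts the least recently used of its MAX_CHUNKS (64) entries.
    std::unordered_map<Key, std::vector<unsigned char>, KeyHasher> cache;
    cache[{"https://example.com/some_grid.tif", 0}] = {1, 2, 3};
    auto it = cache.find({"https://example.com/some_grid.tif", 0});
    std::cout << (it != cache.end() ? it->second.size() : 0)
              << " byte(s) cached for chunk 0\n";
}
```

The diff continues with the new open() and read() implementations.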
```diff
@@ -202,20 +302,125 @@ class NetworkFile : public File {
 // ---------------------------------------------------------------------------
 
 std::unique_ptr<File> NetworkFile::open(PJ_CONTEXT *ctx, const char *filename) {
-    std::vector<unsigned char> buffer(16 * 1024);
-    size_t size_read = 0;
-    auto handle = ctx->networking.open(ctx, filename, buffer.size(), &buffer[0],
-                                       &size_read, ctx->networking.user_data);
-    return std::unique_ptr<File>(handle ? new NetworkFile(ctx, handle)
-                                        : nullptr);
+    if (gNetworkChunkCache.get(filename, 0)) {
+        return std::unique_ptr<File>(
+            new NetworkFile(ctx, filename, nullptr,
+                            std::numeric_limits<unsigned long long>::max()));
+    } else {
+        std::vector<unsigned char> buffer(DOWNLOAD_CHUNK_SIZE);
+        size_t size_read = 0;
+        auto handle =
+            ctx->networking.open(ctx, filename, 0, buffer.size(), &buffer[0],
+                                 &size_read, ctx->networking.user_data);
+        buffer.resize(size_read);
+        gNetworkChunkCache.insert(filename, 0, std::move(buffer));
+        return std::unique_ptr<File>(
+            handle ? new NetworkFile(ctx, filename, handle, size_read)
+                   : nullptr);
+    }
 }
 
 // ---------------------------------------------------------------------------
 
 size_t NetworkFile::read(void *buffer, size_t sizeBytes) {
-    size_t nRead = m_ctx->networking.read_range(
-        m_ctx, m_handle, m_pos, sizeBytes, buffer, m_ctx->networking.user_data);
-    m_pos += nRead;
+
+    if (sizeBytes == 0)
+        return 0;
+
+    auto iterOffset = m_pos;
+    while (sizeBytes) {
+        const auto chunkIdxToDownload = iterOffset / DOWNLOAD_CHUNK_SIZE;
+        const auto offsetToDownload = chunkIdxToDownload * DOWNLOAD_CHUNK_SIZE;
+        std::vector<unsigned char> region;
+        auto pChunk = gNetworkChunkCache.get(m_url, chunkIdxToDownload);
+        if (pChunk != nullptr) {
+            region = *pChunk;
+        } else {
+            if (offsetToDownload == m_lastDownloadedOffset) {
+                // In case of consecutive reads (of small size), we use a
+                // heuristic that we will read the file sequentially, so
+                // we double the requested size to decrease the number of
+                // client/server roundtrips.
+                if (m_nBlocksToDownload < 100)
+                    m_nBlocksToDownload *= 2;
+            } else {
+                // Random reads. Cancel the above heuristics.
+                m_nBlocksToDownload = 1;
+            }
+
+            // Ensure that we will request at least the number of blocks
+            // to satisfy the remaining buffer size to read.
+            const auto endOffsetToDownload =
+                ((iterOffset + sizeBytes + DOWNLOAD_CHUNK_SIZE - 1) /
+                 DOWNLOAD_CHUNK_SIZE) *
+                DOWNLOAD_CHUNK_SIZE;
+            const auto nMinBlocksToDownload = static_cast<size_t>(
+                (endOffsetToDownload - offsetToDownload) / DOWNLOAD_CHUNK_SIZE);
+            if (m_nBlocksToDownload < nMinBlocksToDownload)
+                m_nBlocksToDownload = nMinBlocksToDownload;
+
+            // Avoid reading already cached data.
+            // Note: this might get evicted if concurrent reads are done, but
+            // this should not cause bugs. Just missed optimization.
+            for (size_t i = 1; i < m_nBlocksToDownload; i++) {
+                if (gNetworkChunkCache.get(m_url, chunkIdxToDownload + i) !=
+                    nullptr) {
+                    m_nBlocksToDownload = i;
+                    break;
+                }
+            }
+
+            if (m_nBlocksToDownload > MAX_CHUNKS)
+                m_nBlocksToDownload = MAX_CHUNKS;
+
+            region.resize(m_nBlocksToDownload * DOWNLOAD_CHUNK_SIZE);
+            size_t nRead = 0;
+            if (!m_handle) {
+                m_handle = m_ctx->networking.open(
+                    m_ctx, m_url.c_str(), offsetToDownload,
+                    m_nBlocksToDownload * DOWNLOAD_CHUNK_SIZE, &region[0],
+                    &nRead, m_ctx->networking.user_data);
+                if (!m_handle) {
+                    return 0;
+                }
+            } else {
+                nRead = m_ctx->networking.read_range(
+                    m_ctx, m_handle, offsetToDownload,
+                    m_nBlocksToDownload * DOWNLOAD_CHUNK_SIZE, &region[0],
+                    m_ctx->networking.user_data);
+            }
+            if (nRead == 0) {
+                return 0;
+            }
+            region.resize(nRead);
+            m_lastDownloadedOffset = offsetToDownload + nRead;
+
+            const auto nChunks =
+                (region.size() + DOWNLOAD_CHUNK_SIZE - 1) / DOWNLOAD_CHUNK_SIZE;
+            for (size_t i = 0; i < nChunks; i++) {
+                std::vector<unsigned char> chunk(
+                    region.data() + i * DOWNLOAD_CHUNK_SIZE,
+                    region.data() +
+                        std::min((i + 1) * DOWNLOAD_CHUNK_SIZE, region.size()));
+                gNetworkChunkCache.insert(m_url, chunkIdxToDownload + i,
+                                          std::move(chunk));
+            }
+        }
+        const size_t nToCopy = static_cast<size_t>(
+            std::min(static_cast<unsigned long long>(sizeBytes),
+                     region.size() - (iterOffset - offsetToDownload)));
+        memcpy(buffer, region.data() + iterOffset - offsetToDownload, nToCopy);
+        buffer = static_cast<char *>(buffer) + nToCopy;
+        iterOffset += nToCopy;
+        sizeBytes -= nToCopy;
+        if (region.size() < static_cast<size_t>(DOWNLOAD_CHUNK_SIZE) &&
+            sizeBytes != 0) {
+            break;
+        }
+    }
+
+    size_t nRead = static_cast<size_t>(iterOffset - m_pos);
+    m_pos = iterOffset;
     return nRead;
 }
```
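The growth heuristic in read() is easiest to see in isolation. Below is a minimal standalone sketch, not an excerpt of the patch (all names are local to the example): whenever a download starts exactly where the previous one ended, the number of 16 KiB chunks requested doubles, capped at MAX_CHUNKS, and any non-sequential access resets it to a single chunk.

```cpp
#include <algorithm>
#include <cstdio>

int main() {
    constexpr unsigned long long CHUNK = 16 * 1024; // DOWNLOAD_CHUNK_SIZE
    constexpr unsigned long long MAX_CHUNKS = 64;

    unsigned long long lastDownloadedOffset = CHUNK; // open() fetched chunk 0
    unsigned long long nBlocks = 1;
    unsigned long long offset = CHUNK; // next uncached byte

    // Simulate a client scanning the file strictly sequentially.
    for (int request = 1; request <= 6; request++) {
        if (offset == lastDownloadedOffset) {
            nBlocks = std::min(nBlocks * 2, MAX_CHUNKS); // sequential: grow
        } else {
            nBlocks = 1; // random access: reset
        }
        std::printf("request %d: %llu chunk(s) (%llu bytes) at offset %llu\n",
                    request, nBlocks, nBlocks * CHUNK, offset);
        lastDownloadedOffset = offset + nBlocks * CHUNK;
        offset = lastDownloadedOffset; // keep reading where we left off
    }
}
```

Successive requests therefore grow as 2, 4, 8, 16, 32 and finally 64 chunks (1 MiB), which is what bounds the number of client/server roundtrips for a long sequential scan. The diff continues below.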
```diff
@@ -229,6 +434,16 @@ bool NetworkFile::seek(unsigned long long offset, int whence) {
     } else {
         if (offset != 0)
             return false;
+        if (!m_handle) {
+            size_t nRead = 0;
+            char dummy;
+            m_handle =
+                m_ctx->networking.open(m_ctx, m_url.c_str(), 0, 1, &dummy,
+                                       &nRead, m_ctx->networking.user_data);
+            if (!m_handle) {
+                return false;
+            }
+        }
         const auto filesize = m_ctx->networking.get_file_size(
             m_ctx, m_handle, m_ctx->networking.user_data);
         if (filesize == 0)
@@ -245,7 +460,9 @@ unsigned long long NetworkFile::tell() { return m_pos; }
 // ---------------------------------------------------------------------------
 
 NetworkFile::~NetworkFile() {
-    m_ctx->networking.close(m_ctx, m_handle, m_ctx->networking.user_data);
+    if (m_handle) {
+        m_ctx->networking.close(m_ctx, m_handle, m_ctx->networking.user_data);
+    }
 }
 
 // ---------------------------------------------------------------------------
@@ -301,6 +518,7 @@ static size_t pj_curl_write_func(void *buffer, size_t count, size_t nmemb,
 // ---------------------------------------------------------------------------
 
 static PROJ_NETWORK_HANDLE *pj_curl_open(PJ_CONTEXT *, const char *url,
+                                         unsigned long long offset,
                                          size_t size_to_read, void *buffer,
                                          size_t *out_size_read, void *) {
     CURL *hCurlHandle = curl_easy_init();
@@ -326,7 +544,8 @@ static PROJ_NETWORK_HANDLE *pj_curl_open(PJ_CONTEXT *, const char *url,
     }
 
     char szBuffer[128];
-    sqlite3_snprintf(sizeof(szBuffer), szBuffer, "0-%llu", size_to_read - 1);
+    sqlite3_snprintf(sizeof(szBuffer), szBuffer, "%llu-%llu", offset,
+                     offset + size_to_read - 1);
     curl_easy_setopt(hCurlHandle, CURLOPT_RANGE, szBuffer);
 
     std::string headers;
@@ -413,9 +632,10 @@ static size_t pj_curl_read_range(PJ_CONTEXT *, PROJ_NETWORK_HANDLE *raw_handle,
 static PROJ_NETWORK_HANDLE *
 no_op_network_open(PJ_CONTEXT *ctx, const char * /* url */,
-                   size_t,   /* size to read */
-                   void *,   /* buffer to update with bytes read*/
-                   size_t *, /* output: size actually read */
+                   unsigned long long, /* offset */
+                   size_t,             /* size to read */
+                   void *,             /* buffer to update with bytes read*/
+                   size_t *,           /* output: size actually read */
                    void * /*user_data*/) {
     pj_log(ctx, PJ_LOG_DEBUG_MAJOR, "Network functionality not available");
     return nullptr;
 }
```
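With the new offset parameter, pj_curl_open() requests an arbitrary byte window instead of always starting at zero. As a quick illustration of the value handed to CURLOPT_RANGE, here is a standalone sketch (plain snprintf instead of sqlite3_snprintf, and made-up offsets):

```cpp
#include <cstdio>

int main() {
    // Same formula as in the patch: size_to_read bytes starting at offset.
    const unsigned long long offset = 3 * 16 * 1024;       // chunk index 3
    const unsigned long long size_to_read = 2 * 16 * 1024; // two chunks
    char szBuffer[128];
    std::snprintf(szBuffer, sizeof(szBuffer), "%llu-%llu", offset,
                  offset + size_to_read - 1);
    // curl turns this CURLOPT_RANGE value into the request header
    // "Range: bytes=49152-81919".
    std::printf("Range: bytes=%s\n", szBuffer);
}
```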
```diff
@@ -452,6 +672,13 @@ void FileManager::fillDefaultNetworkInterface(PJ_CONTEXT *ctx) {
 }
 
 // ---------------------------------------------------------------------------
 
+void FileManager::clearCache()
+{
+    gNetworkChunkCache.clear();
+}
+
+// ---------------------------------------------------------------------------
+
 NS_PROJ_END
 
 //! @endcond
```
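To round out the picture, read() re-slices whatever a request returned into DOWNLOAD_CHUNK_SIZE pieces before inserting them into the cache that clearCache() above empties. Below is a standalone sketch of that slicing step, with only the final piece allowed to be short (all names are local to the example, and the 40 KiB region is invented):

```cpp
#include <algorithm>
#include <cstdio>
#include <vector>

int main() {
    constexpr size_t CHUNK = 16 * 1024; // DOWNLOAD_CHUNK_SIZE
    // Pretend one network request returned 40 KiB starting at chunk 5.
    const std::vector<unsigned char> region(40 * 1024, 0xAB);
    const size_t firstChunkIdx = 5;

    // Ceil-divide the region into CHUNK-sized pieces, as read() does
    // before caching each of them individually.
    const size_t nChunks = (region.size() + CHUNK - 1) / CHUNK;
    for (size_t i = 0; i < nChunks; i++) {
        std::vector<unsigned char> chunk(
            region.data() + i * CHUNK,
            region.data() + std::min((i + 1) * CHUNK, region.size()));
        std::printf("would cache chunk %zu: %zu bytes\n", firstChunkIdx + i,
                    chunk.size());
    }
}
```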
