add core::uv::File

This commit is contained in:
falsycat 2023-08-26 11:45:59 +09:00
parent cd1d06ad6e
commit 177f0d66f1
4 changed files with 490 additions and 0 deletions

View File

@ -22,6 +22,8 @@ target_sources(nf7_core
uv/clock.hh
uv/concurrency.hh
uv/context.hh
uv/file.cc
uv/file.hh
uv/parallelism.hh
clock.hh
logger.hh
@ -39,6 +41,7 @@ target_sources(nf7_core_test
luajit/thread_test.hh
uv/context_test.hh
uv/concurrency_test.cc
uv/file_test.cc
uv/parallelism_test.cc
clock_test.cc
)

209
core/uv/file.cc Normal file
View File

@ -0,0 +1,209 @@
// No copyright
#include "core/uv/file.hh"
#include <cassert>
#include <format>
#include <limits>
#include <utility>
#include "iface/common/exception.hh"
#include "iface/common/void.hh"
#include "core/logger.hh"
namespace nf7::core::uv {
std::shared_ptr<File> File::Make(
Env& env, std::string_view path, uvw::file_req::file_open_flags flags) {
class A : public File {
public:
A(Env& env, std::string_view path, uvw::file_req::file_open_flags flags)
: File(env, path, flags) { }
};
auto self = std::make_shared<A>(env, path, flags);
std::weak_ptr<File> wself {self};
self->file_->on<uvw::fs_event>([wself](auto& e, auto& req) {
auto req_ = req.template data<uvw::file_req>();
req.data(nullptr);
if (auto self = wself.lock()) {
if (auto comp = std::exchange(self->comp_, std::nullopt)) {
self->fs_event_ = std::move(e);
comp->Complete(&*self->fs_event_);
}
}
});
self->file_->on<uvw::error_event>([wself](auto& e, auto& req) {
auto req_ = req.template data<uvw::file_req>();
req.data(nullptr);
if (auto self = wself.lock()) {
if (auto comp = std::exchange(self->comp_, std::nullopt)) {
(void) e; // TODO
comp->Throw(Exception::MakePtr("fs error"));
}
}
});
return self;
}
File::File(Env& env,
std::string_view path,
uvw::file_req::file_open_flags open_flags)
try : data::FiniteBuffer("nf7::core::uv::File::Finite"),
data::ResizableBuffer("nf7::core::uv::File::Resizable"),
data::ReadableBuffer("nf7::core::uv::File::Readable"),
data::WritableBuffer("nf7::core::uv::File::Writable"),
logger_(env.GetOr<subsys::Logger>(NullLogger::instance())),
delete_(env.Get<Context>()->Make<uvw::async_handle>()),
path_(path),
open_flags_(open_flags),
file_(env.Get<Context>()->Make<uvw::file_req>()) {
delete_->unreference();
delete_->on<uvw::async_event>([f = file_](auto&, auto& self) {
f->data(f);
f->cancel();
self.close();
});
} catch (const std::bad_alloc&) {
throw Exception {"memory shortage"};
}
Future<Void> File::Open() noexcept
try {
Future<Void>::Completer comp;
mtx_.LockEx().Then([this, comp](const auto& k) mutable {
Open(comp, k);
});
return comp.future();
} catch (const Exception&) {
return Future<Void> {std::current_exception()};
}
Future<Void> File::Open(const nf7::Mutex::SharedToken& k) noexcept {
Future<Void>::Completer comp;
Open(comp, k);
return comp.future();
}
void File::Open(Future<Void>::Completer& comp,
const nf7::Mutex::SharedToken& k) noexcept
try {
comp_.emplace();
comp_->future()
.Then([this, comp, k](auto&) mutable {
logger_->Trace(std::format("file open ({})", path_));
comp.Complete({});
})
.Catch([this, comp, k](auto& e) mutable {
logger_->Trace(std::format("failed to open file ({})", path_));
comp.Throw(std::make_exception_ptr(e));
});
file_->open(path_, open_flags_, 0666);
} catch (const Exception&) {
comp.Throw();
}
Future<uint64_t> File::FetchSize() noexcept
try {
Future<uint64_t>::Completer comp;
mtx_.LockEx().Then([this, comp](auto& k) mutable {
Open(k)
.ThenAnd([this, comp, k](auto&) mutable {
comp_.emplace();
file_->stat();
return comp_->future();
})
.Chain(comp, [k](auto& e) mutable {
return static_cast<uint64_t>(e->stat.st_size);
});
});
return comp.future();
} catch (const Exception&) {
return Future<uint64_t> {std::current_exception()};
}
Future<Void> File::Truncate(uint64_t n) noexcept
try {
Future<Void>::Completer comp;
if (n > std::numeric_limits<int64_t>::max()) {
throw Exception {"size too huge"};
}
mtx_.LockEx().Then([this, comp, n](auto& k) mutable {
Open(k)
.ThenAnd([this, comp, n, k](auto&) mutable {
comp_.emplace();
file_->truncate(static_cast<int64_t>(n));
return comp_->future();
})
.Chain(comp, [k](auto&) { return Void {}; });
});
return comp.future();
} catch (const Exception&) {
return Future<Void> {std::current_exception()};
}
Future<File::ReadResult> File::Read(
uint64_t offset, uint64_t n) noexcept
try {
Future<ReadResult>::Completer comp;
if (offset > std::numeric_limits<int64_t>::max()) {
throw Exception {"offset too huge"};
}
if (n > std::numeric_limits<unsigned int>::max()) {
throw Exception {"size too huge"};
}
mtx_.LockEx().Then([this, comp, offset, n](auto& k) mutable {
Open(k)
.ThenAnd([this, comp, offset, n, k](auto&) mutable {
comp_.emplace();
file_->read(static_cast<int64_t>(offset),
static_cast<unsigned int>(n));
return comp_->future();
})
.Chain(comp, [k](auto& e) {
return ReadResult {
std::reinterpret_pointer_cast<const uint8_t[]>(
std::shared_ptr<const char[]>(std::move(e->read.data))),
static_cast<uint64_t>(e->result),
};
});
});
return comp.future();
} catch (const Exception&) {
return Future<ReadResult> {std::current_exception()};
}
Future<uint64_t> File::Write(
uint64_t offset, const uint8_t* buf, uint64_t n) noexcept
try {
Future<uint64_t>::Completer comp;
if (offset > std::numeric_limits<int64_t>::max()) {
throw Exception {"offset too huge"};
}
if (n > std::numeric_limits<unsigned int>::max()) {
throw Exception {"size too huge"};
}
mtx_.LockEx().Then([this, comp, offset, n, buf](auto& k) mutable {
Open(k)
.ThenAnd([this, comp, offset, n, buf, k](auto&) mutable {
comp_.emplace();
file_->write(reinterpret_cast<char*>(const_cast<uint8_t*>(buf)),
static_cast<unsigned int>(n),
static_cast<int64_t>(offset));
return comp_->future();
})
.Chain(comp, [k](const auto& e) {
return static_cast<uint64_t>(e->result);
});
});
return comp.future();
} catch (const Exception&) {
return Future<uint64_t> {std::current_exception()};
}
} // namespace nf7::core::uv

92
core/uv/file.hh Normal file
View File

@ -0,0 +1,92 @@
// No copyright
#pragma once
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <uvw.hpp>
#include "iface/common/future.hh"
#include "iface/common/mutex.hh"
#include "iface/common/void.hh"
#include "iface/data/buffer.hh"
#include "iface/subsys/logger.hh"
#include "iface/env.hh"
#include "core/uv/context.hh"
namespace nf7::core::uv {
class File :
public std::enable_shared_from_this<File>,
public data::FiniteBuffer,
public data::ResizableBuffer,
public data::ReadableBuffer,
public data::WritableBuffer {
private:
class Finite;
class Resizable;
class Readable;
class Writable;
public:
using ReadResult = data::ReadableBuffer::Result;
public:
static std::shared_ptr<File> Make(
Env&, std::string_view path, uvw::file_req::file_open_flags);
protected:
File(Env&, std::string_view path, uvw::file_req::file_open_flags);
public:
virtual ~File() noexcept {
delete_->reference();
delete_->send();
}
public:
Future<Void> Open() noexcept;
Future<uint64_t> FetchSize() noexcept;
Future<Void> Truncate(uint64_t) noexcept;
Future<ReadResult> Read(uint64_t offset, uint64_t n) noexcept override;
Future<uint64_t> Write(
uint64_t offset, const uint8_t* buf, uint64_t n) noexcept override;
private:
Future<Void> Open(const nf7::Mutex::SharedToken&) noexcept;
void Open(Future<Void>::Completer&, const nf7::Mutex::SharedToken&) noexcept;
Future<uint64_t> size() const noexcept override {
return const_cast<File&>(*this).FetchSize();
}
Future<Void> Resize(uint64_t n) noexcept override { return Truncate(n); }
public:
std::shared_ptr<data::FiniteBuffer> MakeFinite();
std::shared_ptr<data::ResizableBuffer> MakeResizable();
std::shared_ptr<data::ReadableBuffer> MakeReadable();
std::shared_ptr<data::WritableBuffer> MakeWritable();
private:
const std::shared_ptr<subsys::Logger> logger_;
const std::shared_ptr<uvw::async_handle> delete_;
const std::string path_;
const uvw::file_req::file_open_flags open_flags_;
const std::shared_ptr<uvw::file_req> file_;
std::optional<Future<uvw::fs_event*>::Completer> comp_;
std::optional<uvw::fs_event> fs_event_;
mutable nf7::Mutex mtx_;
};
} // namespace nf7::core::uv

186
core/uv/file_test.cc Normal file
View File

@ -0,0 +1,186 @@
// No copyright
#include "core/uv/file.hh"
#include <gtest/gtest.h>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <utility>
#include "core/uv/context_test.hh"
using namespace std::literals;
class UV_File :
public nf7::core::uv::test::ContextFixture,
public ::testing::WithParamInterface<bool> {
protected:
static constexpr const char* kTempFile =
"./.this_is_temporary_file_for_uv_file_unittest";
protected:
void TearDown() override {
ContextFixture::TearDown();
std::filesystem::remove(kTempFile);
}
bool isCompleteTest() const noexcept { return GetParam(); }
void PrepareFile(const char* text) {
std::ofstream f {kTempFile};
f << text;
}
};
TEST_P(UV_File, Open) {
auto sut = nf7::core::uv::File::Make(
*env_, kTempFile,
uvw::file_req::file_open_flags::RDWR
| uvw::file_req::file_open_flags::CREAT);
auto result = sut->Open();
if (isCompleteTest()) {
ctx_->Run();
EXPECT_TRUE(result.done());
EXPECT_TRUE(std::filesystem::exists(kTempFile));
} else {
result.Then([](auto&) { FAIL(); });
}
}
TEST_P(UV_File, FetchSize) {
PrepareFile("helloworld");
auto sut = nf7::core::uv::File::Make(
*env_, kTempFile,
uvw::file_req::file_open_flags::RDONLY);
auto result = sut->FetchSize();
if (isCompleteTest()) {
ctx_->Run();
EXPECT_TRUE(result.done());
EXPECT_EQ(result.value(), 10);
} else {
result.Then([](auto&) { FAIL(); });
}
}
TEST_P(UV_File, FetchSizeFail) {
auto sut = nf7::core::uv::File::Make(
*env_, kTempFile,
uvw::file_req::file_open_flags::RDONLY);
auto result = sut->FetchSize();
if (isCompleteTest()) {
ctx_->Run();
}
result.Then([](auto&) { FAIL(); });
}
TEST_P(UV_File, Truncate) {
auto sut = nf7::core::uv::File::Make(
*env_, kTempFile,
uvw::file_req::file_open_flags::RDWR
| uvw::file_req::file_open_flags::CREAT);
auto result = sut->Truncate(256);
if (isCompleteTest()) {
ctx_->Run();
EXPECT_TRUE(result.done());
EXPECT_EQ(std::filesystem::file_size(kTempFile), 256);
} else {
result.Then([](auto&) { FAIL(); });
}
}
TEST_P(UV_File, TruncateFail) {
auto sut = nf7::core::uv::File::Make(
*env_, kTempFile,
uvw::file_req::file_open_flags::RDONLY);
auto result = sut->Truncate(256);
if (isCompleteTest()) {
ctx_->Run();
}
result.Then([](auto&) { FAIL(); });
}
TEST_P(UV_File, Read) {
PrepareFile("helloworld");
auto sut = nf7::core::uv::File::Make(
*env_, kTempFile,
uvw::file_req::file_open_flags::RDONLY);
auto result = sut->Read(1, 3);
if (isCompleteTest()) {
ctx_->Run();
ASSERT_TRUE(result.done());
const auto [ptr, size] = result.value();
const auto cptr = reinterpret_cast<const char*>(ptr.get());
const std::string_view text {cptr, cptr+size};
EXPECT_EQ(text, "ell");
} else {
result.Then([](auto&) { FAIL(); });
}
}
TEST_P(UV_File, ReadFail) {
PrepareFile("helloworld");
auto sut = nf7::core::uv::File::Make(
*env_, kTempFile,
uvw::file_req::file_open_flags::WRONLY);
auto result = sut->Read(1, 3);
if (isCompleteTest()) {
ctx_->Run();
}
result.Then([](auto&) { FAIL(); });
}
TEST_P(UV_File, Write) {
PrepareFile("helloworld");
auto sut = nf7::core::uv::File::Make(
*env_, kTempFile,
uvw::file_req::file_open_flags::WRONLY);
auto result = sut->Write(5, reinterpret_cast<const uint8_t*>("universe"), 8);
if (isCompleteTest()) {
ctx_->Run();
ASSERT_TRUE(result.done());
EXPECT_EQ(result.value(), 8);
std::ifstream fs {kTempFile};
const std::string text {
std::istreambuf_iterator<char> {fs},
std::istreambuf_iterator<char> {},
};
EXPECT_EQ(text, "hellouniverse");
} else {
result.Then([](auto&) { FAIL(); });
}
}
TEST_P(UV_File, WriteFail) {
PrepareFile("helloworld");
auto sut = nf7::core::uv::File::Make(
*env_, kTempFile,
uvw::file_req::file_open_flags::RDONLY);
auto result = sut->Write(5, reinterpret_cast<const uint8_t*>("universe"), 8);
if (isCompleteTest()) {
ctx_->Run();
}
result.Then([](auto&) { FAIL(); });
}
INSTANTIATE_TEST_SUITE_P(
CompleteOrCancel, UV_File,
testing::Values(true, false));