WIP: add uv::File

This commit is contained in:
falsycat 2023-08-17 12:18:33 +09:00
parent 2cbfc8e64a
commit 617a8dcd5e
4 changed files with 255 additions and 0 deletions

View File

@ -22,6 +22,8 @@ target_sources(nf7_core
uv/clock.hh uv/clock.hh
uv/concurrency.hh uv/concurrency.hh
uv/context.hh uv/context.hh
uv/file.cc
uv/file.hh
uv/parallelism.hh uv/parallelism.hh
clock.hh clock.hh
logger.hh logger.hh
@ -38,6 +40,7 @@ target_sources(nf7_core_test
luajit/thread_test.hh luajit/thread_test.hh
uv/context_test.hh uv/context_test.hh
uv/concurrency_test.cc uv/concurrency_test.cc
uv/file_test.cc
uv/parallelism_test.cc uv/parallelism_test.cc
clock_test.cc clock_test.cc
) )

128
core/uv/file.cc Normal file
View File

@ -0,0 +1,128 @@
// No copyright
#include "core/uv/file.hh"
#include <cassert>
#include <utility>
#include "iface/common/exception.hh"
#include "core/logger.hh"
namespace nf7::core::uv {
class File::Finite final : public data::FiniteBuffer {
public:
explicit Finite(std::shared_ptr<File>&& f) noexcept
: data::FiniteBuffer("nf7::core::uv::File::Finite"),
base_(std::move(f)) { }
Future<uint64_t> size() const noexcept override {
Future<uint64_t>::Completer comp;
base_->QueueTask([base = base_.get(), comp]() mutable {
if (base->dead_) {
comp.Throw(std::make_exception_ptr(Exception {"open failure"}));
return false;
}
base->on_success_ = [comp](auto& e) mutable {
comp.Complete(static_cast<uint64_t>(e.stat.st_size));
};
base->on_error_ = [comp](auto&) mutable {
comp.Throw(std::make_exception_ptr(Exception {"stat error"}));
};
base->file_->data(base->file_);
base->file_->stat();
return true;
});
return comp.future();
}
private:
const std::shared_ptr<File> base_;
};
std::shared_ptr<data::FiniteBuffer> File::MakeFinite()
try {
return std::make_shared<Finite>(shared_from_this());
} catch (const std::bad_alloc&) {
throw Exception {"memory shortage"};
}
std::shared_ptr<File> File::Make(
Env& env, std::string_view path, uvw::file_req::file_open_flags flags) {
class A : public File {
public:
explicit A(Env& env) : File(env) { }
};
auto self = std::make_shared<A>(env);
std::weak_ptr<File> wself {self};
self->file_->on<uvw::fs_event>([wself](auto& e, auto& req) {
if (auto self = wself.lock()) {
if (self->on_success_) { self->on_success_(e); }
self->on_success_ = {};
self->on_error_ = {};
self->working_ = false;
self->ConsumeTask();
}
req.data(nullptr);
});
self->file_->on<uvw::error_event>([wself](auto& e, auto& req) {
if (auto self = wself.lock()) {
if (self->on_error_) { self->on_error_(e); }
self->on_success_ = {};
self->on_error_ = {};
self->working_ = false;
self->ConsumeTask();
}
req.data(nullptr);
});
self->QueueTask([self = self.get(), path = std::string {path}, flags]() {
self->on_error_ = [self](auto&) {
self->logger_->Error("failed to open file");
self->dead_ = true;
};
self->file_->open(path, flags, 0666);
return true;
});
return self;
}
File::File(Env& env)
try : logger_(env.GetOr<subsys::Logger>(NullLogger::instance())),
delete_(env.Get<Context>()->Make<uvw::async_handle>()),
file_(env.Get<Context>()->Make<uvw::file_req>()) {
delete_->unreference();
delete_->on<uvw::async_event>([f = file_](auto&, auto& self) {
f->data(f);
f->cancel();
self.close();
});
} catch (const std::bad_alloc&) {
throw Exception {"memory shortage"};
}
void File::QueueTask(Task&& task) noexcept
try {
tasks_.push_back(std::move(task));
ConsumeTask();
} catch (const std::bad_alloc&) {
logger_->Error("failed to queue a file task");
}
void File::ConsumeTask() noexcept {
while (!working_ && !tasks_.empty()) {
auto task = std::move(tasks_.front());
tasks_.pop_front();
try {
working_ = task();
} catch (const Exception&) {
logger_->Error("a file task threw an exception");
}
}
}
} // namespace nf7::core:uv

68
core/uv/file.hh Normal file
View File

@ -0,0 +1,68 @@
// No copyright
#pragma once
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <uvw.hpp>
#include "iface/common/future.hh"
#include "iface/data/buffer.hh"
#include "iface/subsys/logger.hh"
#include "iface/env.hh"
#include "core/uv/context.hh"
namespace nf7::core::uv {
class File : public std::enable_shared_from_this<File> {
private:
class Finite;
class Resizable;
class Readable;
class Writable;
public:
static std::shared_ptr<File> Make(
Env&, std::string_view path, uvw::file_req::file_open_flags);
protected:
explicit File(Env&);
public:
virtual ~File() noexcept {
delete_->reference();
delete_->send();
}
public:
std::shared_ptr<data::FiniteBuffer> MakeFinite();
std::shared_ptr<data::ResizableBuffer> MakeResizable();
std::shared_ptr<data::ReadableBuffer> MakeReadable();
std::shared_ptr<data::WritableBuffer> MakeWritable();
private:
// it's guaranteed that File is alive while a task is executed
using Task = std::function<bool()>;
void QueueTask(Task&&) noexcept;
void ConsumeTask() noexcept;
private:
const std::shared_ptr<subsys::Logger> logger_;
const std::shared_ptr<uvw::async_handle> delete_;
const std::shared_ptr<uvw::file_req> file_;
std::function<void(const uvw::fs_event&)> on_success_;
std::function<void(const uvw::error_event&)> on_error_;
std::deque<Task> tasks_;
bool working_ = false;
bool dead_ = false;
};
} // namespace nf7::core::uv

56
core/uv/file_test.cc Normal file
View File

@ -0,0 +1,56 @@
// No copyright
#include "core/uv/file.hh"
#include <gtest/gtest.h>
#include <filesystem>
#include <fstream>
#include "core/uv/context_test.hh"
class UV_File : public nf7::core::uv::test::ContextFixture {
protected:
static constexpr const char* kTempFile =
"./.this_is_temporary_file_for_uv_file_unittest";
protected:
void TearDown() override {
ContextFixture::TearDown();
std::filesystem::remove(kTempFile);
}
};
TEST_F(UV_File, Open) {
auto sut = nf7::core::uv::File::Make(
*env_, kTempFile,
uvw::file_req::file_open_flags::RDWR
| uvw::file_req::file_open_flags::CREAT);
ctx_->Run();
EXPECT_TRUE(std::filesystem::exists(kTempFile));
}
TEST_F(UV_File, CancelOpen) {
auto sut = nf7::core::uv::File::Make(
*env_, kTempFile,
uvw::file_req::file_open_flags::RDWR
| uvw::file_req::file_open_flags::CREAT);
}
TEST_F(UV_File, FiniteSize) {
{
std::ofstream f {kTempFile};
f << "helloworld";
f.flush();
}
auto sut = nf7::core::uv::File::Make(
*env_, kTempFile,
uvw::file_req::file_open_flags::RDONLY);
sut->MakeFinite()->size()
.Then([](auto& v) { EXPECT_EQ(v, 10); })
.Catch([](const auto& e) { FAIL() << e; });
ctx_->Run();
}