llarp/ev/libuv.hpp
Source code
#pragma once
#ifdef WITH_LIBUV
#include "ev.hpp"
#include "udp_handle.hpp"
#include <llarp/util/thread/queue.hpp>
#include <llarp/util/meta/memfn.hpp>
#include <uvw/loop.h>
#include <uvw/async.h>
#include <uvw/poll.h>
#include <functional>
#include <map>
#include <vector>
#include <unordered_map>
namespace llarp::uv
{
class UVWakeup;
class UVRepeater;
class UDPHandle;
class TCPConnectionPoolImpl;
class TCPAcceptorImpl;
class Loop : public llarp::EventLoop
{
public:
friend UDPHandle;
friend TCPConnectionPoolImpl;
friend TCPAcceptorImpl;
using Callback = std::function<void()>;
Loop(size_t queue_size, size_t worker_num_threads);
~Loop() override;
virtual void
run() override;
bool
running() const override;
llarp_time_t
time_now() const override
{
return m_Impl->now();
}
void
call_later(llarp_time_t delay_ms, std::function<void(void)> callback) override;
void
tick_event_loop();
void
stop() override;
bool
add_ticker(std::function<void(void)> ticker) override;
bool
add_network_interface(
std::shared_ptr<llarp::vpn::NetworkInterface> netif,
std::function<void(llarp::net::IPPacket)> handler) override;
void
call_soon(std::function<void(void)> f) override;
std::shared_ptr<llarp::EventLoopWakeup>
make_waker(std::function<void()> callback) override;
std::shared_ptr<EventLoopRepeater>
make_repeater() override;
virtual std::shared_ptr<llarp::UDPHandle>
make_udp(UDPReceiveFunc on_recv) override;
std::shared_ptr<EventLoopPoller>
add_poller(int fd, std::function<void()> callback) override;
void
FlushLogic();
std::shared_ptr<uvw::Loop>
MaybeGetUVWLoop() override;
bool
inEventLoop() const override;
void
queue_work(std::unique_ptr<EventLoopWork> work) override;
void
queue_slow_work(std::unique_ptr<EventLoopWork> work) override;
size_t
num_worker_threads() const override;
void
add_closer(std::function<void(void)> f);
TCPConnectionPool&
connection_pool() override;
protected:
std::shared_ptr<uvw::Loop> m_Impl;
std::optional<std::thread::id> m_EventLoopThreadID;
void
io_cycle_complete();
private:
std::shared_ptr<uvw::AsyncHandle> m_WakeUp;
std::atomic<bool> m_Run;
using AtomicQueue_t = llarp::thread::Queue<std::function<void(void)>>;
AtomicQueue_t m_LogicCalls;
AtomicQueue_t m_DiskCalls;
AtomicQueue_t m_WorkCalls;
std::unique_ptr<std::thread> m_DiskThread;
std::vector<std::thread> m_WorkThreads;
std::vector<std::function<void()>> m_closers, m_tickers;
std::shared_ptr<TCPConnectionPool> m_ConnectionPool;
#ifdef LOKINET_DEBUG
uint64_t last_time;
uint64_t loop_run_count;
#endif
std::atomic<uint32_t> m_nextID;
std::map<uint32_t, Callback> m_pendingCalls;
std::unordered_map<int, std::shared_ptr<uvw::PollHandle>> m_Polls;
void
wakeup() override;
};
} // namespace llarp::uv
#endif
Updated on 2026-04-01 at 23:35:40 +0000