内容纲要
  1. uv_tcp_init 初始化 TCP 监视器.
  2. uv_tcp_bind 绑定.
  3. 在指定的监视器上调用 uv_listen 来设置回调函数, 当有新的客户端连接到来时, libuv 就会调用设置的回调函数.
  4. uv_accept 接受连接.
  5. 使用 stream operations 与客户端进行通信.

httpserver.c

#include <stdio.h>
#include <stdlib.h>

#include "libuv/include/uv.h"
#include "libuv/test/task.h"
#include "http-parser/http_parser.h"

const static char* HOST     = "0.0.0.0"; /* IPv4 wildcard address: accepts connections on every local interface, not only localhost */
const static int   PORT     = 8888;      /* TCP port handed to uv_ip4_addr() */
const static int   BACKLOG  = 128;       /* pending-connection queue length handed to uv_listen() */

/*
 * Canned HTTP response written to every client once its request headers have
 * been parsed. Content-Length is 14 because the body "Hello, World!\n" is
 * exactly 14 bytes long.
 */
#define RESPONSE                  \
    "HTTP/1.1 200 OK\r\n"           \
    "Content-Type: text/plain\r\n"  \
    "Content-Length: 14\r\n"        \
    "\r\n"                          \
    "Hello, World!\n"

/* types */
/*
 * Per-connection state, heap-allocated in connection_cb() and released in
 * close_cb(). handle.data and parser.data both point back at the owning
 * client_t so that libuv and http-parser callbacks can recover it.
 */
typedef struct {
    uv_tcp_t handle;    /* libuv TCP handle of the accepted connection */
    http_parser parser; /* request parser fed from read_cb() */
} client_t;

/* variables */
/* Listening socket, initialised and bound in server_init(). */
static uv_tcp_t server;
/* Parser callbacks shared by every connection; filled in by main(). */
static http_parser_settings parser_settings;

/* forward declaration */
/* libuv callbacks */
static void close_cb(uv_handle_t *);
static void shutdown_cb(uv_shutdown_t *, int);
static void alloc_cb(uv_handle_t*, size_t, uv_buf_t*);
static void connection_cb(uv_stream_t *, int);
static void read_cb(uv_stream_t*, ssize_t, const uv_buf_t*);
static void write_cb(uv_write_t*, int);

/* http-parser callback */
static int headers_complete_cb(http_parser*);

/* tcp callbacks */
/*
 * Close callback: the handle's data pointer refers to the client_t that
 * embeds the handle, so freeing it releases the whole connection state.
 */
void close_cb(uv_handle_t *handle)
{
    free(handle->data);
}

/*
 * Shutdown callback: closes the connection once the write side has been shut
 * down, then releases the request allocated by the caller of uv_shutdown().
 *
 * The handle may already be closing when this runs (for example write_cb()
 * closed it while the shutdown was still pending); uv_close() must not be
 * called twice on the same handle, so that case is skipped.
 */
void shutdown_cb(uv_shutdown_t *shutdown_req, int status)
{
    uv_handle_t *handle = (uv_handle_t *) shutdown_req->handle;

    (void) status; /* the connection is torn down regardless of the outcome */

    if (!uv_is_closing(handle)) {
        uv_close(handle, close_cb);
    }

    free(shutdown_req);
}

/*
 * Allocation callback: hands libuv a freshly malloc'ed buffer of the size it
 * suggested. read_cb() frees the buffer after each read.
 */
void alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
    char *storage = malloc(suggested_size);

    ASSERT(storage != NULL);
    buf->base = storage;
    buf->len = suggested_size;
}

/*
 * Connection callback: invoked by libuv when the listening socket has a
 * pending connection (or failed to obtain one).
 *
 * Allocates a client_t, accepts the connection into its handle, initialises
 * the HTTP request parser and starts reading. A failure for a single
 * connection is logged and that connection is dropped; the handle is released
 * through uv_close() so that close_cb() frees the client_t.
 */
void connection_cb(uv_stream_t *server, int status)
{
    client_t *client;
    int r;

    if (status != 0) {
        LOGF("connection: %s\n", uv_strerror(status));
        return;
    }

    client = malloc(sizeof(client_t));
    ASSERT(client != NULL);

    r = uv_tcp_init(server->loop, &client->handle);
    ASSERT(r == 0);
    client->handle.data = client;

    r = uv_accept(server, (uv_stream_t *) &client->handle);
    if (r != 0) {
        /* The handle was never connected, so there is nothing to shut down:
         * close it directly and stop using the client. */
        LOGF("accept: %s\n", uv_strerror(r));
        uv_close((uv_handle_t *) &client->handle, close_cb);
        return;
    }

    http_parser_init(&client->parser, HTTP_REQUEST);
    client->parser.data = client;

    r = uv_read_start((uv_stream_t *) &client->handle, alloc_cb, read_cb);
    if (r != 0) {
        LOGF("read start: %s\n", uv_strerror(r));
        uv_close((uv_handle_t *) &client->handle, close_cb);
    }
}

/*
 * Read callback: feeds received bytes to the HTTP parser, or tears the
 * connection down on EOF / read error. The buffer handed out by alloc_cb()
 * is always freed before returning.
 *
 * On EOF or error the write side is shut down first so queued data can still
 * be flushed. uv_shutdown() can fail (libuv may already consider the stream
 * unwritable after a read error); in that case the request is released and
 * the handle is closed directly instead of aborting.
 */
void read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{
    client_t *client = (client_t *) handle->data;

    if (nread >= 0) {
        size_t received = (size_t) nread;
        size_t parsed = http_parser_execute(&client->parser, &parser_settings, buf->base, received);

        /* http-parser stops consuming input as soon as it hits an error. */
        if (parsed < received) {
            LOG("parse error\n");
            uv_close((uv_handle_t *) handle, close_cb);
        }
    } else {
        uv_shutdown_t *shutdown_req;

        if (nread != UV_EOF) {
            LOGF("read: %s\n", uv_strerror((int) nread));
        }

        shutdown_req = malloc(sizeof(uv_shutdown_t));
        if (shutdown_req == NULL || uv_shutdown(shutdown_req, handle, shutdown_cb) != 0) {
            free(shutdown_req);

            if (!uv_is_closing((uv_handle_t *) handle)) {
                uv_close((uv_handle_t *) handle, close_cb);
            }
        }
    }

    free(buf->base);
}

/*
 * Write callback: the single response has been written (or the write failed
 * or was cancelled), so the connection is closed and the request released.
 *
 * When read_cb() has already closed the handle, libuv still completes the
 * pending write with an error status; uv_close() must not be called a second
 * time in that case.
 */
void write_cb(uv_write_t* write_req, int status)
{
    uv_handle_t *handle = (uv_handle_t *) write_req->handle;

    (void) status; /* the connection is closed whether or not the write succeeded */

    if (!uv_is_closing(handle)) {
        uv_close(handle, close_cb);
    }

    free(write_req);
}

/* http callback */
/*
 * http-parser callback, fired once all request headers have been parsed:
 * queues the canned RESPONSE on the client's connection.
 *
 * Returns 1 on success, which tells http-parser not to expect a body.
 * Returns -1 when the response could not be queued; http-parser treats that
 * as a callback error and stops parsing (presumably surfacing as a parse
 * error in read_cb(), which closes the connection -- confirm against the
 * http-parser version in use).
 */
int headers_complete_cb(http_parser* parser)
{
    client_t *client = (client_t *) parser->data;
    uv_write_t *write_req = malloc(sizeof(uv_write_t));
    uv_buf_t buf;
    int r;

    if (write_req == NULL) {
        return -1;
    }

    /* sizeof counts the string literal's terminating NUL; it must not be sent,
     * otherwise the body is one byte longer than the advertised Content-Length. */
    buf = uv_buf_init(RESPONSE, sizeof(RESPONSE) - 1);

    r = uv_write(write_req, (uv_stream_t *) &client->handle, &buf, 1, write_cb);
    if (r != 0) {
        /* write_cb() is not invoked when uv_write() fails synchronously. */
        LOGF("write: %s\n", uv_strerror(r));
        free(write_req);
        return -1;
    }

    return 1;
}

/* server */
/*
 * Sets up the listening socket on `loop`: initialises the global server
 * handle, binds it to HOST:PORT and starts listening with connection_cb as
 * the new-connection callback. Every step is asserted to succeed.
 */
void server_init(uv_loop_t *loop)
{
    struct sockaddr_in listen_addr;
    int rc;

    rc = uv_tcp_init(loop, &server);
    ASSERT(rc == 0);

    rc = uv_ip4_addr(HOST, PORT, &listen_addr);
    ASSERT(rc == 0);

    rc = uv_tcp_bind(&server, (struct sockaddr *) &listen_addr, 0);
    ASSERT(rc == 0);

    rc = uv_listen((uv_stream_t *) &server, BACKLOG, connection_cb);
    ASSERT(rc == 0);
}

/*
 * Entry point: registers the HTTP parser callback, starts the server on the
 * default loop and runs the loop until no more events remain.
 */
int main()
{
    uv_loop_t *loop;
    int rc;

    parser_settings.on_headers_complete = headers_complete_cb;

    loop = uv_default_loop();
    server_init(loop);

    rc = uv_run(loop, UV_RUN_DEFAULT);
    ASSERT(rc == 0);

    /* If no more events here, cleanup loop */
    MAKE_VALGRIND_HAPPY();
    return 0;
}

uv_packet.cpp

#include "uv_packet.h"
#include "uv_service.h"
#include "uv_session.h"

namespace network
{
/// Allocates the fixed-size head buffer and an initial body buffer.
///
/// The second allocation must go to m_body: assigning m_head twice leaked the
/// head buffer and left m_body uninitialised, although the destructor frees it.
/// If an allocation fails the buffer is left empty (null base, zero length).
uv_packet::uv_packet()
{
    m_head = uv_buf_init(static_cast<char*>(malloc(PACKET_HEAD_LENGTH)), PACKET_HEAD_LENGTH);
    if (m_head.base == nullptr) {
        m_head.len = 0;
    }

    m_body = uv_buf_init(static_cast<char*>(malloc(PACKET_BUFFER_SIZE)), PACKET_BUFFER_SIZE);
    if (m_body.base == nullptr) {
        m_body.len = 0;
    }

    // Nothing has been received yet; the destructor resets to the same value.
    m_length = 0;
}

/// Releases both buffers and resets the bookkeeping fields.
uv_packet::~uv_packet()
{
    SAFE_FREE(m_head.base);
    m_head.len = 0;

    SAFE_FREE(m_body.base);
    m_body.len = 0;

    m_length = 0;
}

void uv_packet::clear()
{
    if (m_head.base != nullptr) {
        memset(m_head.base, 0, m_head.len);
    }

    if (m_body.base != nullptr) {
        memset(m_body.base, 0, m_body.len);
    }
}

/// Clears the session's packet and (re)arms reading so that the next data is
/// delivered to on_receive_head() into the head buffer.
///
/// This is also called from inside read callbacks while the stream is still
/// reading. uv_read_start() on a stream that is already reading fails with
/// UV_EALREADY (libuv >= 1.38, and always on Windows) without replacing the
/// callbacks, so reading is stopped first.
///
/// @return the result of uv_read_start() (0 on success, a libuv error code otherwise).
int uv_packet::receive_head(uv_session* session)
{
    session->packet().clear();

    uv_stream_t* stream = (uv_stream_t*)session->tcp();
    uv_read_stop(stream);
    return uv_read_start(stream, on_alloc_head, on_receive_head);
}
/// (Re)arms reading so that the next data is delivered to on_receive_body()
/// into a body buffer provided by on_alloc_body().
///
/// This is called from on_receive_head() while the stream is still reading.
/// uv_read_start() on a stream that is already reading fails with UV_EALREADY
/// (libuv >= 1.38, and always on Windows) without replacing the callbacks, so
/// reading is stopped first.
///
/// @return the result of uv_read_start() (0 on success, a libuv error code otherwise).
int uv_packet::receive_body(uv_session* session)
{
    uv_stream_t* stream = (uv_stream_t*)session->tcp();
    uv_read_stop(stream);
    return uv_read_start(stream, on_alloc_body, on_receive_body);
}

/// Allocation callback for the head phase: hands libuv the session packet's
/// head buffer. The handle's data pointer is expected to hold the uv_session.
void uv_packet::on_alloc_head(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
    void* const owner = handle->data;
    ASSERT(owner != nullptr);

    *buf = static_cast<uv_session*>(owner)->packet().head();
}

/// Allocation callback for the body phase: replaces the packet's body buffer
/// with one sized exactly for the announced body length and hands it to libuv.
///
/// `*buf` is always written. When no body is expected (head not complete, body
/// length 0) or the allocation fails, an empty buffer is returned, for which
/// libuv reports UV_ENOBUFS to the read callback instead of reading into an
/// uninitialised buffer.
void uv_packet::on_alloc_body(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
    ASSERT(handle->data != nullptr);
    uv_session* session = (uv_session*)handle->data;
    uv_packet&  packet  = session->packet();
    uv_buf_t&   bodybuf = packet.body();

    *buf = uv_buf_init(nullptr, 0);

    // Drop the previous body buffer; never leave a dangling pointer behind.
    SAFE_FREE(bodybuf.base);
    bodybuf.base = nullptr;
    bodybuf.len = 0;

    if (packet.length() != uv_packet::PACKET_HEAD_LENGTH) {
        return;
    }

    const size_t bodylength = packet.bodylength();
    if (bodylength == 0) {
        return;
    }

    char* storage = static_cast<char*>(malloc(bodylength));
    if (storage == nullptr) {
        return;
    }

    bodybuf.base = storage;
    bodybuf.len = bodylength;
    *buf = bodybuf;
}

/// Read callback for the head phase.
///
/// A read of exactly head().len bytes is treated as a complete head: the body
/// phase is started when a body is announced, otherwise the head alone is
/// delivered to the service and the next head is awaited. Any other read
/// clears the packet; a negative `nread` additionally closes the session.
///
/// NOTE(review): a short positive read (fewer bytes than the head length) is
/// discarded rather than accumulated -- confirm this is intended, since TCP
/// does not preserve message boundaries.
void uv_packet::on_receive_head(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{
    ASSERT(handle->data != nullptr);
    uv_session* const owner = (uv_session*)handle->data;
    uv_service* const svc   = owner->service();
    uv_packet&        pkt   = owner->packet();

    if (nread != pkt.head().len) {
        pkt.clear();

        if (nread >= 0) {
            return;
        }

        const int id = owner->id();
        svc->closesession(id);

        if (nread == UV_EOF) {
            fprintf(stdout, "client %d disconnected, close it.\n", id);
        } else if (nread == UV_ECONNRESET) {
            fprintf(stdout, "client %d disconnected unusually, close it.\n", id);
        } else {
            // Any other error code is unexpected: this assertion always fails here.
            ASSERT(nread >= 0);
        }

        return;
    }

    pkt.length(nread);
    const int bodylength = pkt.bodylength();

    if (bodylength > 0) {
        receive_body(owner);
        return;
    }

    // Head-only packet: deliver a copy of the head, then wait for the next one.
    const std::string data(pkt.head().base, pkt.head().len);
    svc->on_tcpreceive(owner, data.c_str(), data.size());
    pkt.clear();
    receive_head(owner);
}
/// Read callback for the body phase.
///
/// A read of exactly bodylength() bytes completes the packet: head and body
/// are concatenated, delivered to the service, and the next head is awaited.
/// Any other read clears the packet; a negative `nread` additionally closes
/// the session. Does nothing when the handle carries no session.
void uv_packet::on_receive_body(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{
    if (handle->data == nullptr) {
        return;
    }

    uv_session* const owner = (uv_session*)handle->data;
    uv_service* const svc   = owner->service();
    uv_packet&        pkt   = owner->packet();

    if (nread <= 0 || nread != pkt.bodylength()) {
        pkt.clear();

        if (nread >= 0) {
            return;
        }

        const int id = owner->id();
        svc->closesession(id);

        if (nread == UV_EOF) {
            fprintf(stdout, "client %d disconnected, close it.\n", id);
        } else if (nread == UV_ECONNRESET) {
            fprintf(stdout, "client %d disconnected unusually, close it.\n", id);
        } else {
            // Any other error code is unexpected: this assertion always fails here.
            ASSERT(nread >= 0);
        }

        return;
    }

    pkt.length(pkt.length() + pkt.bodylength());

    // Deliver head + body as one contiguous message.
    std::string data(pkt.head().base, pkt.head().len);
    data.append(pkt.body().base, pkt.bodylength());
    svc->on_tcpreceive(owner, data.c_str(), data.size());

    pkt.clear();
    receive_head(owner);
}
}

Miner.cpp

/* XMRig
 * Copyright 2010      Jeff Garzik <jgarzik@pobox.com>
 * Copyright 2012-2014 pooler      <pooler@litecoinpool.org>
 * Copyright 2014      Lucas Jones <https://github.com/lucasjones>
 * Copyright 2014-2016 Wolf9466    <https://github.com/OhGodAPet>
 * Copyright 2016      Jay D Dee   <jayddee246@gmail.com>
 * Copyright 2016-2017 XMRig       <support@xmrig.com>
 *
 *
 *   This program is free software: you can redistribute it and/or modify
 *   it under the terms of the GNU General Public License as published by
 *   the Free Software Foundation, either version 3 of the License, or
 *   (at your option) any later version.
 *
 *   This program is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <inttypes.h>
#include <stdio.h>
#include <string.h>

#include "log/Log.h"
#include "net/Job.h"
#include "proxy/Counters.h"
#include "proxy/Error.h"
#include "proxy/Events.h"
#include "proxy/events/CloseEvent.h"
#include "proxy/events/LoginEvent.h"
#include "proxy/events/SubmitEvent.h"
#include "proxy/JobResult.h"
#include "proxy/LoginRequest.h"
#include "proxy/Miner.h"
#include "proxy/Uuid.h"
#include "rapidjson/document.h"
#include "rapidjson/error/en.h"

// Last miner id handed out; the Miner constructor pre-increments it, so the first miner gets id 1.
static int64_t nextId = 0;

/// Creates a miner in WaitLoginState with a fresh id and RPC id, a login
/// deadline of now + kLoginTimeout, and an initialised (not yet accepted)
/// TCP handle on the default loop. Increments the connection counter.
Miner::Miner() :
    m_id(++nextId),
    m_loginId(0),
    m_recvBufPos(0),
    m_mapperId(-1),
    m_state(WaitLoginState),
    m_customDiff(0),
    m_diff(0),
    m_expire(uv_now(uv_default_loop()) + kLoginTimeout),
    m_rx(0),
    m_timestamp(uv_now(uv_default_loop())),
    m_tx(0),
    m_fixedByte(0)
{
    // The receive buffer descriptor spans the whole inline m_buf array.
    m_recvBuf.len  = sizeof(m_buf);
    m_recvBuf.base = m_buf;

    Uuid::create(m_rpcId, sizeof(m_rpcId));
    memset(m_ip, 0, sizeof(m_ip));

    // Back-pointer used by the static libuv callbacks to find this object.
    m_socket.data = this;
    uv_tcp_init(uv_default_loop(), &m_socket);

    ++Counters::connections;
}

/// Decrements the connection counter and detaches the object from its socket handle.
Miner::~Miner()
{
    --Counters::connections;

    m_socket.data = nullptr;
}

/// Accepts the pending connection on `server` into this miner's socket,
/// records the peer's IPv4 address in m_ip and starts reading.
///
/// m_ip is only written when the peer address could be obtained and really is
/// an IPv4 address; otherwise it keeps its previous (zeroed) contents instead
/// of being filled by interpreting a failed or non-IPv4 result as sockaddr_in.
///
/// @param server listening stream with a pending connection.
/// @return false when uv_accept() fails, true otherwise.
bool Miner::accept(uv_stream_t *server)
{
    uv_stream_t *stream = reinterpret_cast<uv_stream_t*>(&m_socket);

    const int rt = uv_accept(server, stream);
    if (rt < 0) {
        LOG_ERR("[miner] accept error: \"%s\"", uv_strerror(rt));
        return false;
    }

    sockaddr_storage addr = { 0 };
    int size = sizeof(addr);

    if (uv_tcp_getpeername(&m_socket, reinterpret_cast<sockaddr*>(&addr), &size) == 0 && addr.ss_family == AF_INET) {
        // Bound the write by the real buffer size rather than a magic constant.
        uv_ip4_name(reinterpret_cast<sockaddr_in*>(&addr), m_ip, sizeof(m_ip));
    }

    uv_read_start(stream, Miner::onAllocBuffer, Miner::onRead);

    return true;
}

/// Formats a JSON-RPC error response (code -1) for request `id` into the send
/// buffer and hands it to send().
void Miner::replyWithError(int64_t id, const char *message)
{
    const int size = snprintf(m_sendBuf, sizeof(m_sendBuf),
                              "{\"id\":%" PRId64 ",\"jsonrpc\":\"2.0\",\"error\":{\"code\":-1,\"message\":\"%s\"}}\n",
                              id, message);

    send(size);
}

/// Sends `job` to the miner.
///
/// The miner's fixed byte is written as two hex characters into the job blob
/// at offset 84 (the blob is modified in place). When a custom difficulty
/// lower than the job difficulty is set, a target derived from it replaces the
/// job's own target. While the miner is in WaitReadyState the job is sent as
/// the login result and the miner becomes ready; otherwise it is sent as a
/// "job" notification.
void Miner::setJob(Job &job)
{
    // m_sendBuf doubles as scratch space for the two hex digits.
    snprintf(m_sendBuf, 4, "%02hhx", m_fixedByte);
    memcpy(job.rawBlob() + 84, m_sendBuf, 2);

    m_diff = job.diff();

    char target[9];
    const bool customDiff = m_customDiff && m_customDiff < m_diff;
    if (customDiff) {
        const uint64_t t = 0xFFFFFFFFFFFFFFFFULL / m_customDiff;
        Job::toHex(reinterpret_cast<const unsigned char *>(&t) + 4, 4, target);
        target[8] = '\0';
    }

    const char *targetHex = customDiff ? target : job.rawTarget();

    int size = 0;
    if (m_state != WaitReadyState) {
        size = snprintf(m_sendBuf, sizeof(m_sendBuf), "{\"jsonrpc\":\"2.0\",\"method\":\"job\",\"params\":{\"blob\":\"%s\",\"job_id\":\"%s%02hhx0\",\"target\":\"%s\"}}\n",
                        job.rawBlob(), job.id().data(), m_fixedByte, targetHex);
    }
    else {
        // First job after login: it answers the pending login request.
        setState(ReadyState);
        size = snprintf(m_sendBuf, sizeof(m_sendBuf), "{\"id\":%" PRId64 ",\"jsonrpc\":\"2.0\",\"result\":{\"id\":\"%s\",\"job\":{\"blob\":\"%s\",\"job_id\":\"%s%02hhx0\",\"target\":\"%s\"},\"status\":\"OK\"}}\n",
                        m_loginId, m_rpcId, job.rawBlob(), job.id().data(), m_fixedByte, targetHex);
    }

    send(size);
}

/// Formats a JSON-RPC success response carrying `status` for request `id`
/// into the send buffer and hands it to send().
void Miner::success(int64_t id, const char *status)
{
    const int size = snprintf(m_sendBuf, sizeof(m_sendBuf),
                              "{\"id\":%" PRId64 ",\"jsonrpc\":\"2.0\",\"error\":null,\"result\":{\"status\":\"%s\"}}\n",
                              id, status);

    send(size);
}

/// Dispatches one decoded JSON-RPC request from the miner.
///
/// The request comes from the network, so every field is validated before it
/// is read: a missing or non-string member is never passed to GetString().
///
/// @param id      request id, echoed in the response.
/// @param method  method name, may be nullptr.
/// @param params  the request's "params" value.
/// @return true when the request was handled (including by an error reply),
///         false when it is malformed or not allowed in the current state; the
///         caller shuts the connection down in that case.
bool Miner::parseRequest(int64_t id, const char *method, const rapidjson::Value &params)
{
    if (!method || !params.IsObject()) {
        return false;
    }

    // Returns the string member `key` of params, or nullptr when it is absent or not a string.
    const auto stringField = [&params](const char *key) -> const char * {
        const auto it = params.FindMember(key);
        return (it != params.MemberEnd() && it->value.IsString()) ? it->value.GetString() : nullptr;
    };

    if (m_state == WaitLoginState) {
        if (strcmp(method, "login") != 0) {
            return false;
        }

        const char *login = stringField("login");
        if (!login) {
            return false;
        }

        // "pass" and "agent" are treated as optional; an empty string stands in when absent.
        const char *pass  = stringField("pass");
        const char *agent = stringField("agent");

        setState(WaitReadyState);
        m_loginId = id;

        LoginEvent::create(this, id, login, pass ? pass : "", agent ? agent : "")->start();
        return true;
    }

    if (m_state == WaitReadyState) {
        return false;
    }

    if (strcmp(method, "submit") == 0) {
        heartbeat();

        const char *rpcId = stringField("id");
        if (!rpcId || strncmp(m_rpcId, rpcId, sizeof(m_rpcId)) != 0) {
            replyWithError(id, Error::toString(Error::Unauthenticated));
            return true;
        }

        const char *jobId  = stringField("job_id");
        const char *nonce  = stringField("nonce");
        const char *result = stringField("result");
        if (!jobId || !nonce || !result) {
            return false;
        }

        SubmitEvent *event = SubmitEvent::create(this, id, jobId, nonce, result);

        if (!event->request.isValid() || event->request.actualDiff() < diff()) {
            event->reject(Error::LowDifficulty);
        }
        else if (!event->request.isCompatible(m_fixedByte)) {
            event->reject(Error::InvalidNonce);
        }

        // Shares that only satisfy the miner's custom difficulty are acknowledged locally.
        if (m_customDiff && event->request.actualDiff() < m_diff) {
            success(id, "OK");
            return true;
        }

        if (!event->start()) {
            replyWithError(id, event->message());
        }

        return true;
    }

    if (strcmp(method, "keepalived") == 0) {
        heartbeat();
        success(id, "KEEPALIVED");
        return true;
    }

    replyWithError(id, Error::toString(Error::InvalidMethod));
    return true;
}

/// Pushes the miner's expiry time to kSocketTimeout after the loop's current time.
void Miner::heartbeat()
{
    const uint64_t now = uv_now(uv_default_loop());

    m_expire = now + kSocketTimeout;
}

/// Parses one received line as a JSON-RPC request and dispatches it.
///
/// The line's last byte (the '\n') is overwritten with a terminator and the
/// text is parsed in place. Anything that is not a well-formed request object
/// with an integer "id" and a "params" member shuts the connection down. The
/// members are looked up with FindMember() because the input is untrusted and
/// operator[] must not be used on a member that may be missing.
///
/// @param line start of the line inside the receive buffer (modified).
/// @param len  length of the line including the trailing newline.
void Miner::parse(char *line, size_t len)
{
    if (m_state == ClosingState) {
        return;
    }

    if (len == 0) {
        return shutdown(true);
    }

    line[len - 1] = '\0';

    // Cast: the format expects an int, len is a size_t.
    LOG_DEBUG("[%s] received (%d bytes): \"%s\"", m_ip, static_cast<int>(len), line);

    if (len < 32 || line[0] != '{') {
        return shutdown(true);
    }

    rapidjson::Document doc;
    if (doc.ParseInsitu(line).HasParseError()) {
        LOG_ERR("[%s] JSON decode failed: \"%s\"", m_ip, rapidjson::GetParseError_En(doc.GetParseError()));

        return shutdown(true);
    }

    if (!doc.IsObject()) {
        return shutdown(true);
    }

    const auto none   = doc.MemberEnd();
    const auto id     = doc.FindMember("id");
    const auto method = doc.FindMember("method");
    const auto params = doc.FindMember("params");

    if (id == none || !id->value.IsInt64() || params == none) {
        return shutdown(true);
    }

    // parseRequest() rejects a null method name itself.
    const char *methodName = (method != none && method->value.IsString()) ? method->value.GetString() : nullptr;

    if (parseRequest(id->value.GetInt64(), methodName, params->value)) {
        return;
    }

    shutdown(true);
}

/// Writes the first `size` bytes of m_sendBuf to the miner.
///
/// Nothing is sent unless the miner is in ReadyState and the socket is
/// writable. `size` is normally the return value of snprintf(), which is the
/// length the text *would* have had: when it does not fit the buffer the
/// message is truncated and `size` exceeds the buffer, so such a message is
/// dropped instead of reading past the end of m_sendBuf.
///
/// NOTE(review): uv_try_write() may write fewer bytes than requested; the
/// remainder is not retried here.
///
/// @param size number of bytes to send; values <= 0 are ignored.
void Miner::send(int size)
{
    LOG_DEBUG("[%s] send (%d bytes): \"%s\"", m_ip, size, m_sendBuf);

    if (size <= 0 || m_state != ReadyState || uv_is_writable(reinterpret_cast<uv_stream_t*>(&m_socket)) == 0) {
        return;
    }

    if (static_cast<size_t>(size) >= sizeof(m_sendBuf)) {
        LOG_ERR("[%s] message of %d bytes does not fit the send buffer, dropped", m_ip, size);
        return;
    }

    uv_buf_t buf = uv_buf_init(m_sendBuf, static_cast<unsigned int>(size));
    const int rc = uv_try_write(reinterpret_cast<uv_stream_t*>(&m_socket), &buf, 1);

    if (rc < 0) {
        return shutdown(true);
    }
}

/// Moves the miner to `state`, doing nothing when it is already there.
/// Entering ReadyState refreshes the expiry time and registers the miner with
/// Counters; leaving ReadyState for ClosingState unregisters it.
void Miner::setState(State state)
{
    if (m_state == state) {
        return;
    }

    if (state == ReadyState) {
        heartbeat();
        Counters::add();
    }
    else if (state == ClosingState && m_state == ReadyState) {
        Counters::remove();
    }

    m_state = state;
}

void Miner::shutdown(bool had_error)
{
    if (m_state == ClosingState) {
        return;
    }

    setState(ClosingState);
    uv_read_stop(reinterpret_cast<uv_stream_t*>(&m_socket));

    uv_shutdown(new uv_shutdown_t, reinterpret_cast<uv_stream_t*>(&m_socket), [](uv_shutdown_t* req, int status) {

        if (uv_is_closing(reinterpret_cast<uv_handle_t*>(req->handle)) == 0) {
            uv_close(reinterpret_cast<uv_handle_t*>(req->handle), [](uv_handle_t *handle) {
                CloseEvent::start(getMiner(handle->data));
                delete static_cast<Miner*>(handle->data);
            });
        }

        delete req;
    });
}

/// Allocation callback: hands libuv the unused tail of the miner's receive
/// buffer, i.e. everything after the m_recvBufPos bytes already buffered.
void Miner::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
    auto self = getMiner(handle->data);
    const auto used = self->m_recvBufPos;

    buf->base = self->m_recvBuf.base + used;
    buf->len  = self->m_recvBuf.len - used;
}

/// Read callback: appends the received bytes to the line buffer, hands every
/// complete '\n'-terminated line to parse() and keeps an incomplete trailing
/// line at the start of the buffer for the next read.
///
/// A read error, EOF, or data that would leave fewer than 8 spare bytes in the
/// buffer shuts the connection down.
///
/// Lines are scanned from the start of the receive buffer, not from
/// `buf->base`: onAllocBuffer() hands out the buffer offset by the bytes
/// already buffered, so scanning m_recvBufPos bytes from `buf->base` would skip
/// the beginning of a line that arrived in pieces and read past the received
/// data.
void Miner::onRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
    (void) buf; // the data always lands in the miner's own receive buffer

    auto miner = getMiner(stream->data);
    if (nread < 0 || (size_t) nread > (sizeof(m_buf) - 8 - miner->m_recvBufPos)) {
        return miner->shutdown(nread != UV_EOF);
    }

    miner->m_rx += nread;
    miner->m_recvBufPos += nread;

    char* const base = miner->m_recvBuf.base;
    char* end;
    char* start = base;
    size_t remaining = miner->m_recvBufPos;

    while ((end = static_cast<char*>(memchr(start, '\n', remaining))) != nullptr) {
        end++;
        size_t len = end - start;
        miner->parse(start, len);

        remaining -= len;
        start = end;
    }

    if (remaining == 0) {
        miner->m_recvBufPos = 0;
        return;
    }

    // No complete line consumed: the partial line is already at the front.
    if (start == base) {
        return;
    }

    // Source and destination may overlap, so memmove rather than memcpy.
    memmove(base, start, remaining);
    miner->m_recvBufPos = remaining;
}

/// Timer callback: terminates the receive buffer at its last byte and shuts
/// the miner's connection down.
void Miner::onTimeout(uv_timer_t *handle)
{
    auto self = getMiner(handle->data);

    char* const lastByte = self->m_recvBuf.base + (sizeof(m_buf) - 1);
    *lastByte = '\0';

    self->shutdown(true);
}

boost 的拆包方式

boost::asio::read_until(sock, response, "\r\n");

发表评论

电子邮件地址不会被公开。 必填项已用*标注