Merge branch 'master' into solobtc

This commit is contained in:
Con Kolivas 2017-02-02 12:06:30 +11:00
commit 6f32c43f8b
3 changed files with 151 additions and 65 deletions

View File

@ -137,7 +137,7 @@ struct connector_data {
int clients_generated;
int dead_generated;
int64_t client_id;
int64_t client_ids;
/* client message process queue */
ckmsgq_t *cmpq;
@ -246,6 +246,20 @@ static void recycle_client(cdata_t *cdata, client_instance_t *client)
ck_wunlock(&cdata->lock);
}
/* Allows the stratifier to get a unique local virtualid for subclients */
int64_t connector_newclientid(ckpool_t *ckp)
{
int64_t ret;
cdata_t *cdata = ckp->cdata;
ck_wlock(&cdata->lock);
ret = cdata->client_ids++;
ck_wunlock(&cdata->lock);
return ret;
}
/* Accepts incoming connections on the server socket and generates client
* instances */
static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
@ -313,7 +327,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
cdata->nfds, fd, no_clients, client->address_name, port);
ck_wlock(&cdata->lock);
client->id = cdata->client_id++;
client->id = cdata->client_ids++;
HASH_ADD_I64(cdata->clients, id, client);
cdata->nfds++;
ck_wunlock(&cdata->lock);
@ -1657,7 +1671,7 @@ void *connector(void *arg)
cdata->nfds = 0;
/* Set the client id to the highest serverurl count to distinguish
* them from the server fds in epoll. */
cdata->client_id = ckp->serverurls;
cdata->client_ids = ckp->serverurls;
mutex_init(&cdata->sender_lock);
cond_init(&cdata->sender_cond);
create_pthread(&cdata->pth_sender, sender, cdata);

View File

@ -10,6 +10,7 @@
#ifndef CONNECTOR_H
#define CONNECTOR_H
int64_t connector_newclientid(ckpool_t *ckp);
void connector_upstream_msg(ckpool_t *ckp, char *msg);
void connector_add_message(ckpool_t *ckp, json_t *val);
char *connector_stats(void *data, const int runtime);

View File

@ -85,7 +85,13 @@ typedef struct pool_stats pool_stats_t;
struct workbase {
/* Hash table data */
UT_hash_handle hh;
/* The next two fields need to be consecutive as both of them are
* used as the key for their hashtable entry in remote_workbases */
int64_t id;
/* The client id this workinfo came from if remote */
int64_t client_id;
char idstring[20];
/* How many readers we currently have of this workbase, set
@ -94,8 +100,6 @@ struct workbase {
/* The id a remote workinfo is mapped to locally */
int64_t mapped_id;
/* The client id this remote workinfo came from */
int64_t client_id;
ts_t gentime;
tv_t retired;
@ -267,6 +271,9 @@ struct stratum_instance {
UT_hash_handle hh;
int64_t id;
/* Virtualid used as unique local id for passthrough clients */
int64_t virtualid;
stratum_instance_t *next;
stratum_instance_t *prev;
@ -1188,8 +1195,8 @@ static void generate_userwbs(sdata_t *sdata, workbase_t *wb)
* pool mode but unique to each subproxy in proxy mode */
static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_block)
{
workbase_t *tmp, *tmpa, *aged = NULL;
sdata_t *ckp_sdata = ckp->sdata;
workbase_t *tmp, *tmpa;
int len, ret;
ts_realtime(&wb->gentime);
@ -1227,6 +1234,11 @@ static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_bl
if (ckp->logshares)
sprintf(wb->logdir, "%s%08x/%s", ckp->logdir, wb->height, wb->idstring);
HASH_ADD_I64(sdata->workbases, id, wb);
if (sdata->current_workbase)
tv_time(&sdata->current_workbase->retired);
sdata->current_workbase = wb;
/* Is this long enough to ensure we don't dereference a workbase
* immediately? Should be unless clock changes 10 minutes so we use
* ts_realtime */
@ -1235,19 +1247,21 @@ static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_bl
break;
if (wb == tmp)
continue;
if (wb->readcount)
if (tmp->readcount)
continue;
/* Age old workbases older than 10 minutes old */
if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) {
HASH_DEL(sdata->workbases, tmp);
aged = tmp;
break;
ck_wunlock(&sdata->workbase_lock);
/* Drop lock to avoid recursive locks */
send_ageworkinfo(ckp, tmp->id);
age_share_hashtable(sdata, tmp->id);
clear_workbase(ckp, tmp);
ck_wlock(&sdata->workbase_lock);
}
}
HASH_ADD_I64(sdata->workbases, id, wb);
if (sdata->current_workbase)
tv_time(&sdata->current_workbase->retired);
sdata->current_workbase = wb;
ck_wunlock(&sdata->workbase_lock);
/* This wb can't be pulled out from under us so no workbase lock is
@ -1260,14 +1274,6 @@ static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_bl
if (!ckp->passthrough)
send_workinfo(ckp, sdata, wb);
/* Send the aged work message to ckdb once we have dropped the workbase lock
* to prevent taking recursive locks */
if (aged) {
send_ageworkinfo(ckp, aged->id);
age_share_hashtable(sdata, aged->id);
clear_workbase(ckp, aged);
}
}
static void broadcast_ping(sdata_t *sdata);
@ -1865,6 +1871,14 @@ out:
return ret;
}
/* Remote workbases are keyed by the combined values of wb->id and
* wb->client_id to prevent collisions in the unlikely event two remote
* servers are generating the same workbase ids. */
static void __add_to_remote_workbases(sdata_t *sdata, workbase_t *wb)
{
HASH_ADD(hh, sdata->remote_workbases, id, sizeof(int64_t) * 2, wb);
}
static void check_incomplete_wbs(ckpool_t *ckp, sdata_t *sdata)
{
workbase_t *wb, *tmp;
@ -1886,7 +1900,7 @@ static void check_incomplete_wbs(ckpool_t *ckp, sdata_t *sdata)
/* Readd it to the hashlist */
ck_wlock(&sdata->workbase_lock);
HASH_ADD_I64(sdata->remote_workbases, id, wb);
__add_to_remote_workbases(sdata, wb);
}
ck_wunlock(&sdata->workbase_lock);
@ -1927,7 +1941,7 @@ static void add_remote_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb)
ck_wlock(&sdata->workbase_lock);
}
}
HASH_ADD_I64(sdata->remote_workbases, id, wb);
__add_to_remote_workbases(sdata, wb);
ck_wunlock(&sdata->workbase_lock);
val = generate_workinfo(ckp, wb, __func__);
@ -2239,12 +2253,21 @@ static workbase_t *get_workbase(sdata_t *sdata, const int64_t id)
return wb;
}
static workbase_t *get_remote_workbase(sdata_t *sdata, const int64_t id)
static workbase_t *__find_remote_workbase(sdata_t *sdata, const int64_t id, const int64_t client_id)
{
int64_t lookup[2] = {id, client_id};
workbase_t *wb;
HASH_FIND(hh, sdata->remote_workbases, lookup, sizeof(int64_t) * 2, wb);
return wb;
}
static workbase_t *get_remote_workbase(sdata_t *sdata, const int64_t id, const int64_t client_id)
{
workbase_t *wb;
ck_wlock(&sdata->workbase_lock);
HASH_FIND_I64(sdata->remote_workbases, &id, wb);
wb = __find_remote_workbase(sdata, id, client_id);
if (wb) {
if (wb->incomplete)
wb = NULL;
@ -3458,7 +3481,7 @@ static stratum_instance_t *__recruit_stratum_instance(sdata_t *sdata)
return client;
}
/* Enter with write instance_lock held */
/* Enter with write instance_lock held, drops and grabs it again */
static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, const char *address,
int server)
{
@ -3467,6 +3490,8 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, con
int64_t pass_id;
client = __recruit_stratum_instance(sdata);
ck_wunlock(&sdata->instance_lock);
client->start_time = time(NULL);
client->id = id;
client->session_id = ++sdata->session_id;
@ -3478,7 +3503,6 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, con
client->diff = client->old_diff = ckp->startdiff;
client->ckp = ckp;
tv_time(&client->ldc);
HASH_ADD_I64(sdata->stratum_instances, id, client);
/* Points to ckp sdata in ckpool mode, but is changed later in proxy
* mode . */
client->sdata = sdata;
@ -3499,8 +3523,14 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, con
sprintf(client->identity, "passthrough:%"PRId64" subclient:%"PRId64,
pass_id, id);
}
} else
client->virtualid = connector_newclientid(ckp);
} else {
sprintf(client->identity, "%"PRId64, id);
client->virtualid = id;
}
ck_wlock(&sdata->instance_lock);
HASH_ADD_I64(sdata->stratum_instances, id, client);
return client;
}
@ -3809,7 +3839,7 @@ static json_t *user_stats(const user_instance_t *user)
}
/* Adjust workinfo id to virtual value for remote trusted workinfos */
static void remap_workinfo_id(sdata_t *sdata, json_t *val)
static void remap_workinfo_id(sdata_t *sdata, json_t *val, const int64_t client_id)
{
int64_t mapped_id, id;
workbase_t *wb;
@ -3817,7 +3847,7 @@ static void remap_workinfo_id(sdata_t *sdata, json_t *val)
json_get_int64(&id, val, "workinfoid");
ck_rlock(&sdata->workbase_lock);
HASH_FIND_I64(sdata->remote_workbases, &id, wb);
wb = __find_remote_workbase(sdata, id, client_id);
if (likely(wb))
mapped_id = wb->mapped_id;
else
@ -4583,8 +4613,6 @@ static void get_uptime(sdata_t *sdata, int *sockd)
send_api_response(val, *sockd);
}
static void srecv_process(ckpool_t *ckp, json_t *val);
/* For emergency use only, flushes all pending ckdbq messages */
static void ckdbq_flush(sdata_t *sdata)
{
@ -6049,23 +6077,28 @@ test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uc
send_node_block(sdata, client->enonce1, nonce, nonce2, ntime32, wb->id,
diff, client->id);
JSON_CPACK(val, "{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,sf,ss,ss,ss,ss}",
"height", wb->height,
"blockhash", blockhash,
"confirmed", "n",
"workinfoid", wb->id,
"username", client->user_instance->username,
"workername", client->workername,
"clientid", client->id,
"enonce1", client->enonce1,
"nonce2", nonce2,
"nonce", nonce,
"reward", wb->coinbasevalue,
"diff", diff,
"createdate", cdfield,
"createby", "code",
"createcode", __func__,
"createinet", ckp->serverurl[client->server]);
val = json_object();
// JSON_CPACK(val, "{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,sf,ss,ss,ss,ss}",
json_set_int(val, "height", wb->height);
json_set_string(val,"blockhash", blockhash);
json_set_string(val,"confirmed", "n");
json_set_int64(val, "workinfoid", wb->id);
json_set_string(val, "username", client->user_instance->username);
json_set_string(val, "workername", client->workername);
if (ckp->remote)
json_set_int64(val, "clientid", client->virtualid);
else
json_set_int64(val, "clientid", client->id);
json_set_string(val, "enonce1", client->enonce1);
json_set_string(val, "nonce2", nonce2);
json_set_string(val, "nonce", nonce);
json_set_int64(val, "reward", wb->coinbasevalue);
json_set_double(val, "diff", diff);
json_set_string(val, "createdate", cdfield);
json_set_string(val, "createby", "code");
json_set_string(val, "createcode", __func__);
json_set_string(val, "createinet", ckp->serverurl[client->server]);
val_copy = json_deep_copy(val);
if (ckp->remote) {
@ -6449,7 +6482,10 @@ out_nowb:
/* Now write to the pool's sharelog. */
val = json_object();
json_set_int(val, "workinfoid", id);
json_set_int(val, "clientid", client->id);
if (ckp->remote)
json_set_int64(val, "clientid", client->virtualid);
else
json_set_int64(val, "clientid", client->id);
json_set_string(val, "enonce1", client->enonce1);
if (!CKP_STANDALONE(ckp))
json_set_string(val, "secondaryuserid", user->secondaryuserid);
@ -6519,7 +6555,10 @@ out:
if (!share) {
if (!CKP_STANDALONE(ckp) || ckp->remote) {
val = json_object();
json_set_int(val, "clientid", client->id);
if (ckp->remote)
json_set_int64(val, "clientid", client->virtualid);
else
json_set_int64(val, "clientid", client->id);
if (user->secondaryuserid)
json_set_string(val, "secondaryuserid", user->secondaryuserid);
json_set_string(val, "enonce1", client->enonce1);
@ -6893,13 +6932,13 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
if (unlikely(cmdmatch(method, "mining.passthrough"))) {
char buf[256];
if (ckp->proxy || ckp->node || ckp->remote) {
if (ckp->proxy || ckp->node ) {
LOGNOTICE("Dropping client %s %s trying to connect as passthrough on unsupported server %d",
client->identity, client->address, client->server);
connector_drop_client(ckp, client_id);
drop_client(ckp, sdata, client_id);
} else {
/* We need is a passthrough and to manage its messages
/*Flag this as a passthrough and manage its messages
* accordingly. No data from this client id should ever
* come directly back to this stratifier. */
LOGNOTICE("Adding passthrough client %s %s", client->identity, client->address);
@ -7096,7 +7135,8 @@ static user_instance_t *generate_remote_user(ckpool_t *ckp, const char *workerna
return user;
}
static void parse_remote_share(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf)
static void parse_remote_share(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf,
const int64_t client_id)
{
json_t *workername_val = json_object_get(val, "workername");
worker_instance_t *worker;
@ -7144,12 +7184,13 @@ static void parse_remote_share(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
val = json_deep_copy(val);
if (likely(user->secondaryuserid))
json_set_string(val, "secondaryuserid", user->secondaryuserid);
remap_workinfo_id(sdata, val);
remap_workinfo_id(sdata, val, client_id);
ckdbq_add(ckp, ID_SHARES, val);
}
static void parse_remote_shareerr(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf)
static void parse_remote_shareerr(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf,
const int64_t client_id)
{
user_instance_t *user = NULL;
const char *workername;
@ -7167,7 +7208,7 @@ static void parse_remote_shareerr(ckpool_t *ckp, sdata_t *sdata, json_t *val, co
val = json_deep_copy(val);
if (likely(user->secondaryuserid))
json_set_string(val, "secondaryuserid", user->secondaryuserid);
remap_workinfo_id(sdata, val);
remap_workinfo_id(sdata, val, client_id);
ckdbq_add(ckp, ID_SHAREERR, val);
}
@ -7198,6 +7239,31 @@ static void send_auth_failure(sdata_t *sdata, stratum_instance_t *client)
stratum_send_message(sdata, client, "Failed authorisation :(");
}
/* For finding a client by its virtualid instead of client->id. This is an
* inefficient lookup but only occurs once on parsing a remote auth from the
* upstream pool on passthrough subclients. */
static stratum_instance_t *ref_instance_by_virtualid(sdata_t *sdata, int64_t *client_id)
{
stratum_instance_t *client, *ret = NULL;
ck_wlock(&sdata->instance_lock);
for (client = sdata->stratum_instances; client; client = client->hh.next) {
if (likely(client->virtualid != *client_id))
continue;
if (likely(!client->dropped)) {
ret = client;
__inc_instance_ref(ret);
/* Replace the client_id with the correct one, allowing
* us to send the response to the correct client */
*client_id = client->id;
}
break;
}
ck_wunlock(&sdata->instance_lock);
return ret;
}
void parse_upstream_auth(ckpool_t *ckp, json_t *val)
{
json_t *id_val = NULL, *err_val = NULL;
@ -7215,6 +7281,9 @@ void parse_upstream_auth(ckpool_t *ckp, json_t *val)
goto out;
err_val = json_object_get(val, "error");
client = ref_instance_by_id(sdata, client_id);
/* Is this client_id a virtualid from a passthrough subclient */
if (!client)
client = ref_instance_by_virtualid(sdata, &client_id);
if (!client) {
LOGINFO("Failed to find client id %"PRId64" in parse_upstream_auth",
client_id);
@ -7345,7 +7414,7 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
json_get_double(&diff, val, "diff");
if (likely(id && coinbasehex && swaphex && cblen))
wb = get_remote_workbase(sdata, id);
wb = get_remote_workbase(sdata, id, client_id);
if (unlikely(!wb))
LOGWARNING("Inadequate data locally to attempt submit of remote block");
@ -7360,9 +7429,11 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
sha256(swap, 80, hash1);
sha256(hash1, 32, hash);
gbt_block = process_block(wb, coinbase, cblen, swap, hash, flip32, blockhash);
/* Ignore the return value of local_block_submit here as we rely
* on the remote server to give us the ID_BLOCK responses */
local_block_submit(ckp, gbt_block, flip32, wb->height);
/* We rely on the remote server to give us the ID_BLOCK
* responses, so only use this response to determine if we
* should reset the best shares. */
if (local_block_submit(ckp, gbt_block, flip32, wb->height))
reset_bestshares(sdata);
put_remote_workbase(sdata, wb);
}
@ -7380,7 +7451,7 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
out_add:
/* Make a duplicate for use by ckdbq_add */
val = json_deep_copy(val);
remap_workinfo_id(sdata, val);
remap_workinfo_id(sdata, val, client_id);
if (!ckp->remote)
downstream_json(sdata, val, client_id, SSEND_PREPEND);
@ -7534,7 +7605,7 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratu
}
if (likely(!safecmp(method, stratum_msgs[SM_SHARE])))
parse_remote_share(ckp, sdata, val, buf);
parse_remote_share(ckp, sdata, val, buf, client->id);
else if (!safecmp(method, stratum_msgs[SM_TRANSACTIONS]))
add_node_txns(ckp, sdata, val);
else if (!safecmp(method, stratum_msgs[SM_WORKERSTATS]))
@ -7544,7 +7615,7 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratu
else if (!safecmp(method, stratum_msgs[SM_AUTH]))
parse_remote_auth(ckp, sdata, val, client, client->id);
else if (!safecmp(method, stratum_msgs[SM_SHAREERR]))
parse_remote_shareerr(ckp, sdata, val, buf);
parse_remote_shareerr(ckp, sdata, val, buf, client->id);
else if (!safecmp(method, stratum_msgs[SM_BLOCK]))
parse_remote_block(ckp, sdata, val, buf, client->id);
else if (!safecmp(method, stratum_msgs[SM_REQTXNS]))
@ -7884,7 +7955,7 @@ static void upstream_auth(ckpool_t *ckp, stratum_instance_t *client, json_params
json_set_string(val, "useragent", client->useragent ? : "");
json_set_string(val, "enonce1", client->enonce1 ? : "");
json_set_string(val, "address", client->address);
json_set_int(val, "clientid", client->id);
json_set_int64(val, "clientid", client->virtualid);
msg = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER | JSON_COMPACT | JSON_EOL);
json_decref(val);
connector_upstream_msg(ckp, msg);