diff --git a/src/connector.c b/src/connector.c index d329114f..f0272999 100644 --- a/src/connector.c +++ b/src/connector.c @@ -137,7 +137,7 @@ struct connector_data { int clients_generated; int dead_generated; - int64_t client_id; + int64_t client_ids; /* client message process queue */ ckmsgq_t *cmpq; @@ -246,6 +246,20 @@ static void recycle_client(cdata_t *cdata, client_instance_t *client) ck_wunlock(&cdata->lock); } +/* Allows the stratifier to get a unique local virtualid for subclients */ +int64_t connector_newclientid(ckpool_t *ckp) +{ + int64_t ret; + + cdata_t *cdata = ckp->cdata; + + ck_wlock(&cdata->lock); + ret = cdata->client_ids++; + ck_wunlock(&cdata->lock); + + return ret; +} + /* Accepts incoming connections on the server socket and generates client * instances */ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) @@ -313,7 +327,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) cdata->nfds, fd, no_clients, client->address_name, port); ck_wlock(&cdata->lock); - client->id = cdata->client_id++; + client->id = cdata->client_ids++; HASH_ADD_I64(cdata->clients, id, client); cdata->nfds++; ck_wunlock(&cdata->lock); @@ -1657,7 +1671,7 @@ void *connector(void *arg) cdata->nfds = 0; /* Set the client id to the highest serverurl count to distinguish * them from the server fds in epoll. */ - cdata->client_id = ckp->serverurls; + cdata->client_ids = ckp->serverurls; mutex_init(&cdata->sender_lock); cond_init(&cdata->sender_cond); create_pthread(&cdata->pth_sender, sender, cdata); diff --git a/src/connector.h b/src/connector.h index 4bb79bd0..be945ef3 100644 --- a/src/connector.h +++ b/src/connector.h @@ -10,6 +10,7 @@ #ifndef CONNECTOR_H #define CONNECTOR_H +int64_t connector_newclientid(ckpool_t *ckp); void connector_upstream_msg(ckpool_t *ckp, char *msg); void connector_add_message(ckpool_t *ckp, json_t *val); char *connector_stats(void *data, const int runtime); diff --git a/src/stratifier.c b/src/stratifier.c index 934a108c..b21b59a6 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -85,7 +85,13 @@ typedef struct pool_stats pool_stats_t; struct workbase { /* Hash table data */ UT_hash_handle hh; + + /* The next two fields need to be consecutive as both of them are + * used as the key for their hashtable entry in remote_workbases */ int64_t id; + /* The client id this workinfo came from if remote */ + int64_t client_id; + char idstring[20]; /* How many readers we currently have of this workbase, set @@ -94,8 +100,6 @@ struct workbase { /* The id a remote workinfo is mapped to locally */ int64_t mapped_id; - /* The client id this remote workinfo came from */ - int64_t client_id; ts_t gentime; tv_t retired; @@ -267,6 +271,9 @@ struct stratum_instance { UT_hash_handle hh; int64_t id; + /* Virtualid used as unique local id for passthrough clients */ + int64_t virtualid; + stratum_instance_t *next; stratum_instance_t *prev; @@ -1188,8 +1195,8 @@ static void generate_userwbs(sdata_t *sdata, workbase_t *wb) * pool mode but unique to each subproxy in proxy mode */ static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_block) { - workbase_t *tmp, *tmpa, *aged = NULL; sdata_t *ckp_sdata = ckp->sdata; + workbase_t *tmp, *tmpa; int len, ret; ts_realtime(&wb->gentime); @@ -1227,6 +1234,11 @@ static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_bl if (ckp->logshares) sprintf(wb->logdir, "%s%08x/%s", ckp->logdir, wb->height, wb->idstring); + HASH_ADD_I64(sdata->workbases, id, wb); + if (sdata->current_workbase) + tv_time(&sdata->current_workbase->retired); + sdata->current_workbase = wb; + /* Is this long enough to ensure we don't dereference a workbase * immediately? Should be unless clock changes 10 minutes so we use * ts_realtime */ @@ -1235,19 +1247,21 @@ static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_bl break; if (wb == tmp) continue; - if (wb->readcount) + if (tmp->readcount) continue; /* Age old workbases older than 10 minutes old */ if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) { HASH_DEL(sdata->workbases, tmp); - aged = tmp; - break; + ck_wunlock(&sdata->workbase_lock); + + /* Drop lock to avoid recursive locks */ + send_ageworkinfo(ckp, tmp->id); + age_share_hashtable(sdata, tmp->id); + clear_workbase(ckp, tmp); + + ck_wlock(&sdata->workbase_lock); } } - HASH_ADD_I64(sdata->workbases, id, wb); - if (sdata->current_workbase) - tv_time(&sdata->current_workbase->retired); - sdata->current_workbase = wb; ck_wunlock(&sdata->workbase_lock); /* This wb can't be pulled out from under us so no workbase lock is @@ -1260,14 +1274,6 @@ static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_bl if (!ckp->passthrough) send_workinfo(ckp, sdata, wb); - - /* Send the aged work message to ckdb once we have dropped the workbase lock - * to prevent taking recursive locks */ - if (aged) { - send_ageworkinfo(ckp, aged->id); - age_share_hashtable(sdata, aged->id); - clear_workbase(ckp, aged); - } } static void broadcast_ping(sdata_t *sdata); @@ -1865,6 +1871,14 @@ out: return ret; } +/* Remote workbases are keyed by the combined values of wb->id and + * wb->client_id to prevent collisions in the unlikely event two remote + * servers are generating the same workbase ids. */ +static void __add_to_remote_workbases(sdata_t *sdata, workbase_t *wb) +{ + HASH_ADD(hh, sdata->remote_workbases, id, sizeof(int64_t) * 2, wb); +} + static void check_incomplete_wbs(ckpool_t *ckp, sdata_t *sdata) { workbase_t *wb, *tmp; @@ -1886,7 +1900,7 @@ static void check_incomplete_wbs(ckpool_t *ckp, sdata_t *sdata) /* Readd it to the hashlist */ ck_wlock(&sdata->workbase_lock); - HASH_ADD_I64(sdata->remote_workbases, id, wb); + __add_to_remote_workbases(sdata, wb); } ck_wunlock(&sdata->workbase_lock); @@ -1927,7 +1941,7 @@ static void add_remote_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb) ck_wlock(&sdata->workbase_lock); } } - HASH_ADD_I64(sdata->remote_workbases, id, wb); + __add_to_remote_workbases(sdata, wb); ck_wunlock(&sdata->workbase_lock); val = generate_workinfo(ckp, wb, __func__); @@ -2239,12 +2253,21 @@ static workbase_t *get_workbase(sdata_t *sdata, const int64_t id) return wb; } -static workbase_t *get_remote_workbase(sdata_t *sdata, const int64_t id) +static workbase_t *__find_remote_workbase(sdata_t *sdata, const int64_t id, const int64_t client_id) +{ + int64_t lookup[2] = {id, client_id}; + workbase_t *wb; + + HASH_FIND(hh, sdata->remote_workbases, lookup, sizeof(int64_t) * 2, wb); + return wb; +} + +static workbase_t *get_remote_workbase(sdata_t *sdata, const int64_t id, const int64_t client_id) { workbase_t *wb; ck_wlock(&sdata->workbase_lock); - HASH_FIND_I64(sdata->remote_workbases, &id, wb); + wb = __find_remote_workbase(sdata, id, client_id); if (wb) { if (wb->incomplete) wb = NULL; @@ -3458,7 +3481,7 @@ static stratum_instance_t *__recruit_stratum_instance(sdata_t *sdata) return client; } -/* Enter with write instance_lock held */ +/* Enter with write instance_lock held, drops and grabs it again */ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, const char *address, int server) { @@ -3467,6 +3490,8 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, con int64_t pass_id; client = __recruit_stratum_instance(sdata); + ck_wunlock(&sdata->instance_lock); + client->start_time = time(NULL); client->id = id; client->session_id = ++sdata->session_id; @@ -3478,7 +3503,6 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, con client->diff = client->old_diff = ckp->startdiff; client->ckp = ckp; tv_time(&client->ldc); - HASH_ADD_I64(sdata->stratum_instances, id, client); /* Points to ckp sdata in ckpool mode, but is changed later in proxy * mode . */ client->sdata = sdata; @@ -3499,8 +3523,14 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, con sprintf(client->identity, "passthrough:%"PRId64" subclient:%"PRId64, pass_id, id); } - } else + client->virtualid = connector_newclientid(ckp); + } else { sprintf(client->identity, "%"PRId64, id); + client->virtualid = id; + } + + ck_wlock(&sdata->instance_lock); + HASH_ADD_I64(sdata->stratum_instances, id, client); return client; } @@ -3809,7 +3839,7 @@ static json_t *user_stats(const user_instance_t *user) } /* Adjust workinfo id to virtual value for remote trusted workinfos */ -static void remap_workinfo_id(sdata_t *sdata, json_t *val) +static void remap_workinfo_id(sdata_t *sdata, json_t *val, const int64_t client_id) { int64_t mapped_id, id; workbase_t *wb; @@ -3817,7 +3847,7 @@ static void remap_workinfo_id(sdata_t *sdata, json_t *val) json_get_int64(&id, val, "workinfoid"); ck_rlock(&sdata->workbase_lock); - HASH_FIND_I64(sdata->remote_workbases, &id, wb); + wb = __find_remote_workbase(sdata, id, client_id); if (likely(wb)) mapped_id = wb->mapped_id; else @@ -4583,8 +4613,6 @@ static void get_uptime(sdata_t *sdata, int *sockd) send_api_response(val, *sockd); } -static void srecv_process(ckpool_t *ckp, json_t *val); - /* For emergency use only, flushes all pending ckdbq messages */ static void ckdbq_flush(sdata_t *sdata) { @@ -6049,23 +6077,28 @@ test_blocksolve(const stratum_instance_t *client, const workbase_t *wb, const uc send_node_block(sdata, client->enonce1, nonce, nonce2, ntime32, wb->id, diff, client->id); - JSON_CPACK(val, "{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,sf,ss,ss,ss,ss}", - "height", wb->height, - "blockhash", blockhash, - "confirmed", "n", - "workinfoid", wb->id, - "username", client->user_instance->username, - "workername", client->workername, - "clientid", client->id, - "enonce1", client->enonce1, - "nonce2", nonce2, - "nonce", nonce, - "reward", wb->coinbasevalue, - "diff", diff, - "createdate", cdfield, - "createby", "code", - "createcode", __func__, - "createinet", ckp->serverurl[client->server]); + val = json_object(); + // JSON_CPACK(val, "{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,sf,ss,ss,ss,ss}", + json_set_int(val, "height", wb->height); + json_set_string(val,"blockhash", blockhash); + json_set_string(val,"confirmed", "n"); + json_set_int64(val, "workinfoid", wb->id); + json_set_string(val, "username", client->user_instance->username); + json_set_string(val, "workername", client->workername); + if (ckp->remote) + json_set_int64(val, "clientid", client->virtualid); + else + json_set_int64(val, "clientid", client->id); + json_set_string(val, "enonce1", client->enonce1); + json_set_string(val, "nonce2", nonce2); + json_set_string(val, "nonce", nonce); + json_set_int64(val, "reward", wb->coinbasevalue); + json_set_double(val, "diff", diff); + json_set_string(val, "createdate", cdfield); + json_set_string(val, "createby", "code"); + json_set_string(val, "createcode", __func__); + json_set_string(val, "createinet", ckp->serverurl[client->server]); + val_copy = json_deep_copy(val); if (ckp->remote) { @@ -6449,7 +6482,10 @@ out_nowb: /* Now write to the pool's sharelog. */ val = json_object(); json_set_int(val, "workinfoid", id); - json_set_int(val, "clientid", client->id); + if (ckp->remote) + json_set_int64(val, "clientid", client->virtualid); + else + json_set_int64(val, "clientid", client->id); json_set_string(val, "enonce1", client->enonce1); if (!CKP_STANDALONE(ckp)) json_set_string(val, "secondaryuserid", user->secondaryuserid); @@ -6519,7 +6555,10 @@ out: if (!share) { if (!CKP_STANDALONE(ckp) || ckp->remote) { val = json_object(); - json_set_int(val, "clientid", client->id); + if (ckp->remote) + json_set_int64(val, "clientid", client->virtualid); + else + json_set_int64(val, "clientid", client->id); if (user->secondaryuserid) json_set_string(val, "secondaryuserid", user->secondaryuserid); json_set_string(val, "enonce1", client->enonce1); @@ -6893,13 +6932,13 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie if (unlikely(cmdmatch(method, "mining.passthrough"))) { char buf[256]; - if (ckp->proxy || ckp->node || ckp->remote) { + if (ckp->proxy || ckp->node ) { LOGNOTICE("Dropping client %s %s trying to connect as passthrough on unsupported server %d", client->identity, client->address, client->server); connector_drop_client(ckp, client_id); drop_client(ckp, sdata, client_id); } else { - /* We need is a passthrough and to manage its messages + /*Flag this as a passthrough and manage its messages * accordingly. No data from this client id should ever * come directly back to this stratifier. */ LOGNOTICE("Adding passthrough client %s %s", client->identity, client->address); @@ -7096,7 +7135,8 @@ static user_instance_t *generate_remote_user(ckpool_t *ckp, const char *workerna return user; } -static void parse_remote_share(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf) +static void parse_remote_share(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf, + const int64_t client_id) { json_t *workername_val = json_object_get(val, "workername"); worker_instance_t *worker; @@ -7144,12 +7184,13 @@ static void parse_remote_share(ckpool_t *ckp, sdata_t *sdata, json_t *val, const val = json_deep_copy(val); if (likely(user->secondaryuserid)) json_set_string(val, "secondaryuserid", user->secondaryuserid); - remap_workinfo_id(sdata, val); + remap_workinfo_id(sdata, val, client_id); ckdbq_add(ckp, ID_SHARES, val); } -static void parse_remote_shareerr(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf) +static void parse_remote_shareerr(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf, + const int64_t client_id) { user_instance_t *user = NULL; const char *workername; @@ -7167,7 +7208,7 @@ static void parse_remote_shareerr(ckpool_t *ckp, sdata_t *sdata, json_t *val, co val = json_deep_copy(val); if (likely(user->secondaryuserid)) json_set_string(val, "secondaryuserid", user->secondaryuserid); - remap_workinfo_id(sdata, val); + remap_workinfo_id(sdata, val, client_id); ckdbq_add(ckp, ID_SHAREERR, val); } @@ -7198,6 +7239,31 @@ static void send_auth_failure(sdata_t *sdata, stratum_instance_t *client) stratum_send_message(sdata, client, "Failed authorisation :("); } +/* For finding a client by its virtualid instead of client->id. This is an + * inefficient lookup but only occurs once on parsing a remote auth from the + * upstream pool on passthrough subclients. */ +static stratum_instance_t *ref_instance_by_virtualid(sdata_t *sdata, int64_t *client_id) +{ + stratum_instance_t *client, *ret = NULL; + + ck_wlock(&sdata->instance_lock); + for (client = sdata->stratum_instances; client; client = client->hh.next) { + if (likely(client->virtualid != *client_id)) + continue; + if (likely(!client->dropped)) { + ret = client; + __inc_instance_ref(ret); + /* Replace the client_id with the correct one, allowing + * us to send the response to the correct client */ + *client_id = client->id; + } + break; + } + ck_wunlock(&sdata->instance_lock); + + return ret; +} + void parse_upstream_auth(ckpool_t *ckp, json_t *val) { json_t *id_val = NULL, *err_val = NULL; @@ -7215,6 +7281,9 @@ void parse_upstream_auth(ckpool_t *ckp, json_t *val) goto out; err_val = json_object_get(val, "error"); client = ref_instance_by_id(sdata, client_id); + /* Is this client_id a virtualid from a passthrough subclient */ + if (!client) + client = ref_instance_by_virtualid(sdata, &client_id); if (!client) { LOGINFO("Failed to find client id %"PRId64" in parse_upstream_auth", client_id); @@ -7345,7 +7414,7 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const json_get_double(&diff, val, "diff"); if (likely(id && coinbasehex && swaphex && cblen)) - wb = get_remote_workbase(sdata, id); + wb = get_remote_workbase(sdata, id, client_id); if (unlikely(!wb)) LOGWARNING("Inadequate data locally to attempt submit of remote block"); @@ -7360,9 +7429,11 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const sha256(swap, 80, hash1); sha256(hash1, 32, hash); gbt_block = process_block(wb, coinbase, cblen, swap, hash, flip32, blockhash); - /* Ignore the return value of local_block_submit here as we rely - * on the remote server to give us the ID_BLOCK responses */ - local_block_submit(ckp, gbt_block, flip32, wb->height); + /* We rely on the remote server to give us the ID_BLOCK + * responses, so only use this response to determine if we + * should reset the best shares. */ + if (local_block_submit(ckp, gbt_block, flip32, wb->height)) + reset_bestshares(sdata); put_remote_workbase(sdata, wb); } @@ -7380,7 +7451,7 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const out_add: /* Make a duplicate for use by ckdbq_add */ val = json_deep_copy(val); - remap_workinfo_id(sdata, val); + remap_workinfo_id(sdata, val, client_id); if (!ckp->remote) downstream_json(sdata, val, client_id, SSEND_PREPEND); @@ -7534,7 +7605,7 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratu } if (likely(!safecmp(method, stratum_msgs[SM_SHARE]))) - parse_remote_share(ckp, sdata, val, buf); + parse_remote_share(ckp, sdata, val, buf, client->id); else if (!safecmp(method, stratum_msgs[SM_TRANSACTIONS])) add_node_txns(ckp, sdata, val); else if (!safecmp(method, stratum_msgs[SM_WORKERSTATS])) @@ -7544,7 +7615,7 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratu else if (!safecmp(method, stratum_msgs[SM_AUTH])) parse_remote_auth(ckp, sdata, val, client, client->id); else if (!safecmp(method, stratum_msgs[SM_SHAREERR])) - parse_remote_shareerr(ckp, sdata, val, buf); + parse_remote_shareerr(ckp, sdata, val, buf, client->id); else if (!safecmp(method, stratum_msgs[SM_BLOCK])) parse_remote_block(ckp, sdata, val, buf, client->id); else if (!safecmp(method, stratum_msgs[SM_REQTXNS])) @@ -7884,7 +7955,7 @@ static void upstream_auth(ckpool_t *ckp, stratum_instance_t *client, json_params json_set_string(val, "useragent", client->useragent ? : ""); json_set_string(val, "enonce1", client->enonce1 ? : ""); json_set_string(val, "address", client->address); - json_set_int(val, "clientid", client->id); + json_set_int64(val, "clientid", client->virtualid); msg = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER | JSON_COMPACT | JSON_EOL); json_decref(val); connector_upstream_msg(ckp, msg);