Add electrum config for server version, ping strategy and reconnection

This commit is contained in:
Overtorment 2019-01-05 12:07:35 +00:00
parent d54b794583
commit 232e113b35
4 changed files with 116 additions and 47 deletions

View File

@ -1,42 +1,47 @@
'use strict'
const EventEmitter = require('events').EventEmitter
const util = require('./util')
const initSocket = require('./init_socket')
const connectSocket = require('./connect_socket')
class Client{
constructor(port, host, protocol = 'tcp', options = void 0){
class Client {
constructor(port, host, protocol, options) {
this.id = 0;
this.port = port
this.host = host
this.callback_message_queue = {}
this.subscribe = new EventEmitter()
this.conn = initSocket(this, protocol, options)
this.port = port;
this.host = host;
this.callback_message_queue = {};
this.subscribe = new EventEmitter();
this.mp = new util.MessageParser((body, n) => {
this.onMessage(body, n)
})
this.status = 0
});
this.initSocketConnection(protocol, options);
}
connect(){
if(this.status) {
return Promise.resolve()
initSocketConnection(protocol = 'tcp', options = void 0) {
this.conn = initSocket(this, protocol, options);
this.status = 0;
}
connect() {
if (this.status === 1) {
return Promise.resolve();
}
this.status = 1
return connectSocket(this.conn, this.port, this.host)
this.status = 1;
return connectSocket(this.conn, this.port, this.host);
}
close(){
if(!this.status) {
return
close() {
if (this.status === 0) {
return ;
}
this.conn.end()
this.conn.destroy()
this.status = 0
}
request(method, params){
if(!this.status) {
request(method, params) {
if (this.status === 0) {
return Promise.reject(new Error('ESOCKET'))
}
return new Promise((resolve, reject) => {
@ -44,54 +49,59 @@ class Client{
const content = util.makeRequest(method, params, id);
this.callback_message_queue[id] = util.createPromiseResult(resolve, reject);
this.conn.write(content + '\n');
})
});
}
response(msg){
response(msg) {
const callback = this.callback_message_queue[msg.id]
if(callback){
if (callback) {
delete this.callback_message_queue[msg.id]
if(msg.error){
if (msg.error) {
callback(msg.error)
}else{
} else {
callback(null, msg.result)
}
}else{
} else {
console.log("Can't get callback");
; // can't get callback
}
}
onMessage(body, n){
onMessage(body, n) {
const msg = JSON.parse(body)
if(msg instanceof Array){
if (msg instanceof Array) {
; // don't support batch request
} else {
if(msg.id !== void 0){
if (msg.id !== void 0) {
this.response(msg)
}else{
} else {
this.subscribe.emit(msg.method, msg.params)
}
}
}
onConnect(){
onConnect() {
}
onClose(){
onClose(e) {
this.status = 0;
console.log("OnClose:" + e);
Object.keys(this.callback_message_queue).forEach((key) => {
this.callback_message_queue[key](new Error('close connect'))
delete this.callback_message_queue[key]
})
}
onRecv(chunk){
onRecv(chunk) {
this.mp.run(chunk)
}
onEnd(){
onEnd(e) {
console.log("OnEnd:" + e);
}
onError(e){
onError(e) {
console.log("OnError:" + e);
}
}

View File

@ -4,10 +4,10 @@ const connectSocket = (conn, port, host) => {
return new Promise((resolve, reject) => {
const errorHandler = (e) => reject(e)
conn.connect(port, host, () => {
conn.removeListener('error', errorHandler)
resolve()
})
conn.on('error', errorHandler)
conn.removeListener('error', errorHandler);
resolve();
});
conn.on('error', errorHandler);
})
}

View File

@ -1,20 +1,79 @@
'use strict'
const Client = require("./client")
class ElectrumClient extends Client{
constructor(port, host, protocol, options){
super(port, host, protocol, options)
class ElectrumClient extends Client {
constructor(port, host, protocol, options) {
super(port, host, protocol, options);
this.timeLastCall = 0;
}
onClose(){
initElectrum(electrumConfig, persistencePolicy = { maxRetry: 1000, callback: null }) {
this.persistencePolicy = persistencePolicy;
this.electrumConfig = electrumConfig;
this.timeLastCall = 0;
return this
.connect()
.then(() => this.server_version(this.electrumConfig.client, this.electrumConfig.version))
;
}
// Override parent
request(method, params) {
this.timeLastCall = new Date().getTime();
const parentPromise = super.request(method, params);
return parentPromise
.then((response) => {
this.keepAlive();
return response;
})
;
}
onClose() {
super.onClose()
const list = [
'server.peers.subscribe',
'blockchain.numblocks.subscribe',
'blockchain.headers.subscribe',
'blockchain.address.subscribe'
]
list.forEach(event => this.subscribe.removeAllListeners(event))
];
list.forEach(event => this.subscribe.removeAllListeners(event));
setTimeout(() => {
if (this.persistencePolicy != null && this.persistencePolicy.maxRetry > 0) {
this.reconnect();
this.persistencePolicy.maxRetry -= 1;
} else if (this.persistencePolicy != null && this.persistencePolicy.callback != null) {
this.persistencePolicy.callback();
} else if (this.persistencePolicy == null) {
this.reconnect();
}
}, 10000);
}
server_version(client_name, protocol_version){
return this.request('server.version', [client_name, protocol_version])
// ElectrumX persistancy
keepAlive() {
if (this.timeout != null) {
clearTimeout(this.timeout);
}
this.timeout = setTimeout(() => {
if (this.timeLastCall !== 0 &&
new Date().getTime() > this.timeLastCall + 10000/2) {
this.server_ping();
}
}, 10000);
}
reconnect() {
this.initSocketConnection();
return this.initElectrum(this.electrumConfig);
}
// ElectrumX API
server_version(client_name, protocol_version) {
return this.request('server.version', [client_name, protocol_version]);
}
server_banner(){
return this.request('server.banner', [])

View File

@ -4,7 +4,7 @@ const net = require('net');
const TIMEOUT = 10000
const getSocket = (protocol, options) => {
switch(protocol){
switch (protocol) {
case 'tcp':
return new net.Socket();
case 'tls':