日期:2014-05-16 浏览次数:20495 次
// 读取操作 Response call( OutMessage msg , DBCollection coll ) throws IOException { return go( msg , coll ); } // 写入操作 void say( OutMessage msg ) throws IOException { go( msg , null ); } // 执行操作 private synchronized Response go( OutMessage msg , DBCollection coll ) throws IOException { return go( msg , coll , false ); } // 执行操作 private synchronized Response go( OutMessage msg , DBCollection coll , boolean forceReponse ) throws IOException { // 正在处理请求 if ( _processingResponse ){ if ( coll == null ){ // this could be a pipeline and should be safe } else { // this could cause issues since we're reading data off the wire throw new IllegalStateException( "DBPort.go called and expecting a response while processing another response" ); } } // 增加调用次数计数 _calls++; // _sorket 为空,打开连接 if ( _socket == null ) _open(); if ( _out == null ) throw new IllegalStateException( "_out shouldn't be null" ); try { // 准备消息 msg.prepare(); // 输出 msg.pipe( _out ); if ( _pool != null ) _pool._everWorked = true; if ( coll == null && ! forceReponse ) return null; _processingResponse = true; // 返回结果 return new Response( _sa , coll , _in , _decoder); } catch ( IOException ioe ){ close(); throw ioe; } finally { _processingResponse = false; } }
// 打开连接 boolean _open() throws IOException { long sleepTime = 100; final long start = System.currentTimeMillis(); while ( true ){ IOException lastError = null; try { // 创建 socket 并连接 _socket = new Socket(); _socket.connect( _addr , _options.connectTimeout ); // 设置 socket 参数 _socket.setTcpNoDelay( ! USE_NAGLE ); _socket.setKeepAlive( _options.socketKeepAlive ); _socket.setSoTimeout( _options.socketTimeout ); // 获取输入输出流 _in = new BufferedInputStream( _socket.getInputStream() ); _out = _socket.getOutputStream(); return true; } catch ( IOException ioe ){ // ... } if ( ! _options.autoConnectRetry || ( _pool != null && ! _pool._everWorked ) ) throw lastError; // 超时处理 long sleptSoFar = System.currentTimeMillis() - start; if ( sleptSoFar >= CONN_RETRY_TIME_MS ) throw lastError; if ( sleepTime + sleptSoFar > CONN_RETRY_TIME_MS ) sleepTime = CONN_RETRY_TIME_MS - sleptSoFar; // 等待重试 _logger.severe( "going to sleep and retry. total sleep time after = " + ( sleptSoFar + sleptSoFar ) + "ms this time:" + sleepTime + "ms" ); ThreadUtil.sleep( sleepTime ); sleepTime *= 2; } }