2005-02-24 jrandom

* Cache temporary memory allocation in the DSA's SHA1 impl, and the packet
      data in the streaming lib.
    * Fixed a streaming lib bug where the connection initiator would fail the
      stream if the ACK to their SYN was lost.
This commit is contained in:
jrandom
2005-02-24 18:05:25 +00:00
committed by zzz
parent f61618e4a4
commit 00f27d4400
18 changed files with 428 additions and 237 deletions
@@ -13,6 +13,7 @@ import java.util.Map;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
/**
@@ -52,6 +53,7 @@ public class MessageInputStream extends InputStream {
private int _readTimeout;
private IOException _streamError;
private long _readTotal;
private ByteCache _cache;
private byte[] _oneByte = new byte[1];
@@ -70,6 +72,7 @@ public class MessageInputStream extends InputStream {
_dataLock = new Object();
_closeReceived = false;
_locallyClosed = false;
_cache = ByteCache.getInstance(128, Packet.MAX_PAYLOAD_SIZE);
}
/** What is the highest block ID we've completely received through? */
@@ -166,7 +169,7 @@ public class MessageInputStream extends InputStream {
buf.append("Close received, ready bytes: ");
long available = 0;
for (int i = 0; i < _readyDataBlocks.size(); i++)
available += ((ByteArray)_readyDataBlocks.get(i)).getData().length;
available += ((ByteArray)_readyDataBlocks.get(i)).getValid();
available -= _readyDataBlockIndex;
buf.append(available);
buf.append(" blocks: ").append(_readyDataBlocks.size());
@@ -178,8 +181,8 @@ public class MessageInputStream extends InputStream {
ByteArray ba = (ByteArray)_notYetReadyBlocks.get(id);
buf.append(id).append(" ");
if (ba.getData() != null)
notAvailable += ba.getData().length;
if (ba != null)
notAvailable += ba.getValid();
}
buf.append("not ready bytes: ").append(notAvailable);
@@ -198,10 +201,10 @@ public class MessageInputStream extends InputStream {
*
* @return true if this is a new packet, false if it is a dup
*/
public boolean messageReceived(long messageId, byte payload[]) {
public boolean messageReceived(long messageId, ByteArray payload) {
synchronized (_dataLock) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("received " + messageId + " with " + payload.length);
_log.debug("received " + messageId + " with " + payload.getValid());
if (messageId <= _highestReadyBlockId) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("ignoring dup message " + messageId);
@@ -212,17 +215,17 @@ public class MessageInputStream extends InputStream {
_highestBlockId = messageId;
if (_highestReadyBlockId + 1 == messageId) {
if (!_locallyClosed && payload.length > 0) {
if (!_locallyClosed && payload.getValid() > 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("accepting bytes as ready: " + payload.length);
_readyDataBlocks.add(new ByteArray(payload));
_log.debug("accepting bytes as ready: " + payload.getValid());
_readyDataBlocks.add(payload);
}
_highestReadyBlockId = messageId;
long cur = _highestReadyBlockId + 1;
// now pull in any previously pending blocks
while (_notYetReadyBlocks.containsKey(new Long(cur))) {
ByteArray ba = (ByteArray)_notYetReadyBlocks.remove(new Long(cur));
if ( (ba != null) && (ba.getData() != null) && (ba.getData().length > 0) ) {
if ( (ba != null) && (ba.getData() != null) && (ba.getValid() > 0) ) {
_readyDataBlocks.add(ba);
}
@@ -238,7 +241,7 @@ public class MessageInputStream extends InputStream {
if (_locallyClosed) // dont need the payload, just the msgId in order
_notYetReadyBlocks.put(new Long(messageId), new ByteArray(null));
else
_notYetReadyBlocks.put(new Long(messageId), new ByteArray(payload));
_notYetReadyBlocks.put(new Long(messageId), payload);
_dataLock.notifyAll();
}
}
@@ -324,21 +327,25 @@ public class MessageInputStream extends InputStream {
} else {
// either was already ready, or we wait()ed and it arrived
ByteArray cur = (ByteArray)_readyDataBlocks.get(0);
byte rv = cur.getData()[_readyDataBlockIndex];
byte rv = cur.getData()[cur.getOffset()+_readyDataBlockIndex];
_readyDataBlockIndex++;
if (cur.getData().length <= _readyDataBlockIndex) {
boolean removed = false;
if (cur.getValid() <= _readyDataBlockIndex) {
_readyDataBlockIndex = 0;
_readyDataBlocks.remove(0);
removed = true;
}
_readTotal++;
target[offset + i] = rv; // rv < 0 ? rv + 256 : rv
if ( (_readyDataBlockIndex <= 3) || (_readyDataBlockIndex >= cur.getData().length - 5) ) {
if ( (_readyDataBlockIndex <= 3) || (_readyDataBlockIndex >= cur.getValid() - 5) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ "] after ready data: readyDataBlockIndex=" + _readyDataBlockIndex
+ " readyBlocks=" + _readyDataBlocks.size()
+ " readTotal=" + _readTotal);
}
if (removed)
_cache.release(cur);
}
} // for (int i = 0; i < length; i++) {
} // synchronized (_dataLock)
@@ -357,9 +364,9 @@ public class MessageInputStream extends InputStream {
for (int i = 0; i < _readyDataBlocks.size(); i++) {
ByteArray cur = (ByteArray)_readyDataBlocks.get(i);
if (i == 0)
numBytes += cur.getData().length - _readyDataBlockIndex;
numBytes += cur.getValid() - _readyDataBlockIndex;
else
numBytes += cur.getData().length;
numBytes += cur.getValid();
}
}
if (_log.shouldLog(Log.DEBUG))
@@ -380,13 +387,13 @@ public class MessageInputStream extends InputStream {
for (int i = 0; i < _readyDataBlocks.size(); i++) {
ByteArray cur = (ByteArray)_readyDataBlocks.get(i);
if (i == 0)
numBytes += cur.getData().length - _readyDataBlockIndex;
numBytes += cur.getValid() - _readyDataBlockIndex;
else
numBytes += cur.getData().length;
numBytes += cur.getValid();
}
for (Iterator iter = _notYetReadyBlocks.values().iterator(); iter.hasNext(); ) {
ByteArray cur = (ByteArray)iter.next();
numBytes += cur.getData().length;
numBytes += cur.getValid();
}
return numBytes;
}
@@ -399,9 +406,9 @@ public class MessageInputStream extends InputStream {
for (int i = 0; i < _readyDataBlocks.size(); i++) {
ByteArray cur = (ByteArray)_readyDataBlocks.get(i);
if (i == 0)
numBytes += cur.getData().length - _readyDataBlockIndex;
numBytes += cur.getValid() - _readyDataBlockIndex;
else
numBytes += cur.getData().length;
numBytes += cur.getValid();
}
return numBytes;
}
@@ -409,13 +416,15 @@ public class MessageInputStream extends InputStream {
public void close() {
synchronized (_dataLock) {
_readyDataBlocks.clear();
while (_readyDataBlocks.size() > 0)
_cache.release((ByteArray)_readyDataBlocks.remove(0));
// we don't need the data, but we do need to keep track of the messageIds
// received, so we can ACK accordingly
for (Iterator iter = _notYetReadyBlocks.values().iterator(); iter.hasNext(); ) {
ByteArray ba = (ByteArray)iter.next();
ba.setData(null);
//ba.setData(null);
_cache.release(ba);
}
_locallyClosed = true;
_dataLock.notifyAll();