mirror of
https://github.com/i2p/i2p.i2p.git
synced 2026-06-07 13:11:41 +00:00
very basic tests pass (ping, open then pause then close, open then echo back and forth a few times then close)
This commit is contained in:
@@ -11,7 +11,9 @@ import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.data.ByteArray;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
* Stream that can be given messages out of order
|
||||
@@ -19,6 +21,8 @@ import net.i2p.data.ByteArray;
|
||||
*
|
||||
*/
|
||||
public class MessageInputStream extends InputStream {
|
||||
private I2PAppContext _context;
|
||||
private Log _log;
|
||||
/**
|
||||
* List of ByteArray objects of data ready to be read,
|
||||
* with the first ByteArray at index 0, and the next
|
||||
@@ -30,6 +34,8 @@ public class MessageInputStream extends InputStream {
|
||||
private int _readyDataBlockIndex;
|
||||
/** highest message ID used in the readyDataBlocks */
|
||||
private long _highestReadyBlockId;
|
||||
/** highest overall message ID */
|
||||
private long _highestBlockId;
|
||||
/**
|
||||
* Message ID (Long) to ByteArray for blocks received
|
||||
* out of order when there are lower IDs not yet
|
||||
@@ -44,13 +50,17 @@ public class MessageInputStream extends InputStream {
|
||||
/** if we don't want any more data, ignore the data */
|
||||
private boolean _locallyClosed;
|
||||
private int _readTimeout;
|
||||
private IOException _streamError;
|
||||
|
||||
private Object _dataLock;
|
||||
|
||||
public MessageInputStream() {
|
||||
public MessageInputStream(I2PAppContext ctx) {
|
||||
_context = ctx;
|
||||
_log = ctx.logManager().getLog(MessageInputStream.class);
|
||||
_readyDataBlocks = new ArrayList(4);
|
||||
_readyDataBlockIndex = 0;
|
||||
_highestReadyBlockId = -1;
|
||||
_highestBlockId = -1;
|
||||
_readTimeout = -1;
|
||||
_notYetReadyBlocks = new HashMap(4);
|
||||
_dataLock = new Object();
|
||||
@@ -65,6 +75,42 @@ public class MessageInputStream extends InputStream {
|
||||
}
|
||||
}
|
||||
|
||||
public long getHighestBlockId() {
|
||||
synchronized (_dataLock) {
|
||||
return _highestBlockId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the message IDs that are holes in our sequence - ones
|
||||
* past the highest ready ID and below the highest received message
|
||||
* ID. This may return null if there are no such IDs.
|
||||
*
|
||||
*/
|
||||
public long[] getNacks() {
|
||||
List ids = null;
|
||||
synchronized (_dataLock) {
|
||||
for (long i = _highestReadyBlockId + 1; i < _highestBlockId; i++) {
|
||||
Long l = new Long(i);
|
||||
if (_notYetReadyBlocks.containsKey(l)) {
|
||||
// ACK
|
||||
} else {
|
||||
if (ids != null)
|
||||
ids = new ArrayList(4);
|
||||
ids.add(l);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (ids != null) {
|
||||
long rv[] = new long[ids.size()];
|
||||
for (int i = 0; i < rv.length; i++)
|
||||
rv[i] = ((Long)ids.get(i)).longValue();
|
||||
return rv;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ascending list of block IDs greater than the highest
|
||||
* ready block ID, or null if there aren't any.
|
||||
@@ -101,56 +147,110 @@ public class MessageInputStream extends InputStream {
|
||||
public int getReadTimeout() { return _readTimeout; }
|
||||
public void setReadTimeout(int timeout) { _readTimeout = timeout; }
|
||||
|
||||
public void closeReceived() {
|
||||
synchronized (_dataLock) {
|
||||
_closeReceived = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A new message has arrived - toss it on the appropriate queue (moving
|
||||
* previously pending messages to the ready queue if it fills the gap, etc)
|
||||
* previously pending messages to the ready queue if it fills the gap, etc).
|
||||
*
|
||||
* @return true if this is a new packet, false if it is a dup
|
||||
*/
|
||||
public void messageReceived(long messageId, byte payload[]) {
|
||||
public boolean messageReceived(long messageId, byte payload[]) {
|
||||
synchronized (_dataLock) {
|
||||
if (messageId <= _highestReadyBlockId) return; // already received
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("received " + messageId + " with " + payload.length);
|
||||
if (messageId <= _highestReadyBlockId) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("ignoring dup message " + messageId);
|
||||
return false; // already received
|
||||
}
|
||||
if (messageId > _highestBlockId)
|
||||
_highestBlockId = messageId;
|
||||
|
||||
if (_highestReadyBlockId + 1 == messageId) {
|
||||
if (!_locallyClosed)
|
||||
if (!_locallyClosed && payload.length > 0) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("accepting bytes as ready: " + payload.length);
|
||||
_readyDataBlocks.add(new ByteArray(payload));
|
||||
}
|
||||
_highestReadyBlockId = messageId;
|
||||
// now pull in any previously pending blocks
|
||||
while (_notYetReadyBlocks.containsKey(new Long(_highestReadyBlockId + 1))) {
|
||||
_readyDataBlocks.add(_notYetReadyBlocks.get(new Long(_highestReadyBlockId + 1)));
|
||||
ByteArray ba = (ByteArray)_notYetReadyBlocks.get(new Long(_highestReadyBlockId + 1));
|
||||
if ( (ba != null) && (ba.getData() != null) && (ba.getData().length > 0) ) {
|
||||
_readyDataBlocks.add(ba);
|
||||
}
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("making ready the block " + _highestReadyBlockId);
|
||||
_highestReadyBlockId++;
|
||||
}
|
||||
_dataLock.notifyAll();
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("message is out of order: " + messageId);
|
||||
if (_locallyClosed) // dont need the payload, just the msgId in order
|
||||
_notYetReadyBlocks.put(new Long(messageId), new ByteArray(null));
|
||||
else
|
||||
_notYetReadyBlocks.put(new Long(messageId), new ByteArray(payload));
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public int read() throws IOException {
|
||||
if (_locallyClosed) throw new IOException("Already locally closed");
|
||||
throwAnyError();
|
||||
long expiration = -1;
|
||||
if (_readTimeout > 0)
|
||||
expiration = _readTimeout + System.currentTimeMillis();
|
||||
synchronized (_dataLock) {
|
||||
if (_readyDataBlocks.size() <= 0) {
|
||||
while (_readyDataBlocks.size() <= 0) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("read() with readyBlocks.size = " + _readyDataBlocks.size() + " on " + toString());
|
||||
|
||||
if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("read() got EOF: " + toString());
|
||||
return -1;
|
||||
} else {
|
||||
if (_readTimeout < 0) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("read() with no timeout: " + toString());
|
||||
try { _dataLock.wait(); } catch (InterruptedException ie) { }
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("read() with no timeout complete: " + toString());
|
||||
throwAnyError();
|
||||
} else if (_readTimeout > 0) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("read() with timeout: " + _readTimeout + ": " + toString());
|
||||
try { _dataLock.wait(_readTimeout); } catch (InterruptedException ie) { }
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("read() with timeout complete: " + _readTimeout + ": " + toString());
|
||||
throwAnyError();
|
||||
} else { // readTimeout == 0
|
||||
// noop, don't block
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("read() with nonblocking setup: " + toString());
|
||||
}
|
||||
if (_readyDataBlocks.size() <= 0) {
|
||||
throw new InterruptedIOException("Timeout reading");
|
||||
if ( (_readTimeout > 0) && (expiration > System.currentTimeMillis()) )
|
||||
throw new InterruptedIOException("Timeout reading (timeout=" + _readTimeout + ")");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("read() readyBlocks = " + _readyDataBlocks.size() + ": " + toString());
|
||||
|
||||
// either was already ready, or we wait()ed and it arrived
|
||||
ByteArray cur = (ByteArray)_readyDataBlocks.get(0);
|
||||
byte rv = cur.getData()[_readyDataBlockIndex++];
|
||||
byte rv = cur.getData()[_readyDataBlockIndex];
|
||||
_readyDataBlockIndex++;
|
||||
if (cur.getData().length <= _readyDataBlockIndex) {
|
||||
_readyDataBlockIndex = 0;
|
||||
_readyDataBlocks.remove(0);
|
||||
@@ -161,6 +261,7 @@ public class MessageInputStream extends InputStream {
|
||||
|
||||
public int available() throws IOException {
|
||||
if (_locallyClosed) throw new IOException("Already closed, you wanker");
|
||||
throwAnyError();
|
||||
synchronized (_dataLock) {
|
||||
if (_readyDataBlocks.size() <= 0)
|
||||
return 0;
|
||||
@@ -213,4 +314,23 @@ public class MessageInputStream extends InputStream {
|
||||
_locallyClosed = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stream b0rked, die with the given error
|
||||
*
|
||||
*/
|
||||
void streamErrorOccurred(IOException ioe) {
|
||||
_streamError = ioe;
|
||||
synchronized (_dataLock) {
|
||||
_dataLock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
private void throwAnyError() throws IOException {
|
||||
if (_streamError != null) {
|
||||
IOException ioe = _streamError;
|
||||
_streamError = null;
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user