1: doctype "../grammars/AsyncEJ.g.etl";
     2: /// This sample is based on the async-objects framework
     3: 
     4: /// Sample asynchronous interface.
     5: /// The interface demonstrates a simple input stream.
     6: interface ByteInput {
     7: 	on int read(DataBuffer buffer); // read bytes into buffer; BufferedByteInputImpl treats a result of -1 as end of stream
     8: 	on void close(); // close the stream
     9: };
    10: 
    11: /// Sample asynchronous class.
    12: /// The class demonstrates simple buffered input layered over another ByteInput.
    13: class BufferedByteInputImpl {
    14: 	supports ByteInput;
    15: 	var private DataBuffer data; // internal buffer, refilled from proxiedStream when empty
    16: 	var private final ByteInput proxiedStream; // the stream that supplies the data
    17: 	var private final RequestQueue requests = new RequestQueue(); // brackets every read/close request
    18: 	var private Throwable problem; // failure caught while refilling; rethrown by later reads
    19: 	var private boolean isClosed = false; // set by performClose
    20: 	var private boolean eofReached = false; // set once proxiedStream reports -1
    21: 
    22: 	/// create buffered input over the proxied stream
    23: 	/// @param proxiedStream a stream that supplies the data
    24: 	/// @param size a size passed to the DataBuffer constructor for the internal buffer
    25: 	maker public BufferedByteInputImpl(ByteInput proxiedStream, int size) {
    26: 		this.data = new DataBuffer(size);
    27: 		this.proxiedStream = proxiedStream;
    28: 	};
    29: 
    30: 	/// read some bytes into the buffer; the actual work is done by performRead
    31: 	/// @param buffer a buffer that receives the bytes
    32: 	/// @returns Promise that resolves to amount of bytes read, or -1 at end of stream
    33: 	on public int read(final DataBuffer buffer) {
    34: 		if(requests.tryStartRequest()) { // request started at once: read now
    35: 			return when(var int rc = performRead(buffer)) {
    36: 				return rc;
    37: 			} finally {
    38: 				requests.endRequest();
    39: 			};
    40: 		} else { // otherwise read only after startRequest() resolves
    41: 			return when(requests<-startRequest()) {
    42: 				return performRead(buffer);
    43: 			} finally {
    44: 				requests.endRequest();
    45: 			};
    46: 		};
    47: 	};
    48: 
    49: 	/// read some bytes into the buffer, refilling the internal buffer when it is empty
    50: 	/// @param buffer an input buffer, it must have room for at least one byte
    51: 	/// @returns Promise that resolves to int that indicates amount of bytes read, or -1 at end of stream
    52: 	on private int performRead(final DataBuffer buffer) {
    53: 		if(buffer.availableForPut() < 1) {
    54: 			throw new IllegalArgumentException("cannot put data into buffer with capacity "+buffer.availableForPut());
    55: 		};
    56: 		if(isClosed) { // checked before eof, so a read after close always fails
    57: 			throw new IllegalStateException("the stream is already closed");
    58: 		};
    59: 		if(eofReached) {
    60: 			return -1;
    61: 		};
    62: 		if(problem != null) {
    63: 			throw problem;
    64: 		};
    65: 		if(data.isEmpty()) {
    66: 			return when(var int readBytes = proxiedStream<-read(data)) {
    67: 				if(readBytes == -1) {
    68: 					eofReached = true;
    69: 				};
    70: 				return performRead(buffer); // something is read or eof reached
    71: 			} catch (Throwable t) {
    72: 				problem = t;
    73: 				throw t;
    74: 			};
    75: 		} else {
    76: 			return buffer.transferFrom(data);
    77: 		};
    78: 	};
    79: 
    80: 	/// close stream; the actual work is done by performClose
    81: 	/// @returns a promise that resolves to null or breaks with exception.
    82: 	on public void close() {
    83: 		if(requests.tryStartRequest()) { // request started at once: close now
    84: 			return when(performClose()) {
    85: 				return;
    86: 			} finally {
    87: 				requests.endRequest();
    88: 			};
    89: 		} else { // otherwise close only after startRequest() resolves
    90: 			return when(requests<-startRequest()) {
    91: 				return performClose();
    92: 			} finally {
    93: 				requests.endRequest();
    94: 			};
    95: 		};
    96: 	};
    97: 
    98: 	/// mark the stream as closed and close the proxied stream
    99: 	/// @returns a promise that resolves to null or breaks with exception (IllegalStateException if already closed).
   100: 	on private void performClose() {
   101: 		if(isClosed) {
   102: 			throw new IllegalStateException("the stream is already closed");
   103: 		};
   104: 		isClosed = true;
   105: 		return proxiedStream<-close();
   106: 	};
   107: };