1: doctype "../grammars/AsyncEJ.g.etl";
2: /// This sample is based on the async-objects framework.
3:
4: /// Sample asynchronous interface.
5: /// The interface demonstrates a simple input stream.
6: interface ByteInput {
7: on int read(DataBuffer buffer); // read into the given buffer; the buffered implementation in this file treats a result of -1 as end of stream
8: on void close(); // close the stream
9: };
10:
11: /// Sample asynchronous class.
12: /// The class demonstrates simple buffered input on top of another ByteInput.
13: class BufferedByteInputImpl {
14: supports ByteInput;
15: var private DataBuffer data; // internal buffer: filled from proxiedStream and drained into the caller's buffer
16: var private final ByteInput proxiedStream; // underlying stream that is read from and closed
17: var private final RequestQueue requests = new RequestQueue(); // read() and close() bracket their work with start/end request calls on this queue
18: var private Throwable problem; // failure caught while reading proxiedStream; rethrown by later reads
19: var private boolean isClosed = false; // set to true by performClose()
20: var private boolean eofReached = false; // set to true once proxiedStream returns -1
21:
22: maker public ABufferedInputStream(AInputStream proxiedStream, int size) { // NOTE(review): maker name and parameter type differ from the class name (BufferedByteInputImpl) and the field type (ByteInput) - confirm this is intended
23: this.data = new DataBuffer(size); // size is passed to the DataBuffer constructor
24: this.proxiedStream = proxiedStream;
25: };
26:
27: /// Read some bytes into the buffer, going through the request queue before the actual read.
28: /// @param buffer a buffer that receives the data
29: /// @returns Promise that resolves to the int produced by performRead(buffer)
30: on public int read(final DataBuffer buffer) {
31: if(requests.tryStartRequest()) { // tryStartRequest() returned true: perform the read without waiting
32: return when(var int rc = performRead(buffer)) {
33: return rc;
34: } finally {
35: requests.endRequest(); // end the request in the finally part of the when
36: };
37: } else { // tryStartRequest() returned false: wait on requests<-startRequest() before reading
38: return when(requests<-startRequest()) {
39: return performRead(buffer);
40: } finally {
41: requests.endRequest(); // end the request in the finally part of the when
42: };
43: };
44: };
45:
46: /// Perform the actual read, refilling the internal buffer from the proxied stream when it is empty.
47: /// @param buffer an input buffer; buffer.availableForPut() must be at least 1
48: /// @returns Promise that resolves to an int that indicates the amount of bytes read, or to -1 once end of stream was reached
49: on private int performRead(final DataBuffer buffer) {
50: if(buffer.availableForPut() < 1) {
51: throw new IllegalArgumentException("cannot put data into buffer with capacity "+buffer.availableForPut());
52: };
53: if(eofReached) { // end of stream was already seen; checked before problem and isClosed
54: return -1;
55: };
56: if(problem != null) { // an earlier read of proxiedStream failed: rethrow the stored failure
57: throw problem;
58: };
59: if(isClosed) {
60: throw new IllegalStateException("the stream is already closed");
61: };
62: if(data.isEmpty()) { // nothing buffered: read from proxiedStream into the internal buffer first
63: return when(var int readBytes = proxiedStream<-read(data)) {
64: if(readBytes == -1) { // proxied stream reported end of stream
65: eofReached = true;
66: };
67:
68: return performRead(buffer); // run the checks above again with the updated state
69: } catch (Throwable t) {
70: problem = t; // remember the failure so that later reads rethrow it
71: throw t;
72: };
73: } else {
74: return buffer.transferFrom(data); // buffered data is available: transfer it into the caller's buffer
75: };
76: };
77:
78:
79: /// Close the stream, going through the request queue before the actual close.
80: /// @returns a promise that resolves to null or breaks with exception.
81: on public void close() {
82: if(requests.tryStartRequest()) { // tryStartRequest() returned true: close without waiting
83: return when(performClose()) {
84: return;
85: } finally {
86: requests.endRequest(); // end the request in the finally part of the when
87: };
88: } else { // tryStartRequest() returned false: wait on requests<-startRequest() before closing
89: return when(requests<-startRequest()) {
90: return performClose();
91: } finally {
92: requests.endRequest(); // end the request in the finally part of the when
93: };
94: };
95: };
96:
97: /// Mark this stream as closed and close the proxied stream.
98: /// @returns a promise that resolves to null or breaks with exception; throws IllegalStateException if the stream was already closed.
99: on private void performClose() {
100: if(isClosed) {
101: throw new IllegalStateException("the stream is already closed");
102: } else {
103: isClosed = true; // set before the close of the proxied stream is requested
104: return proxiedStream<-close();
105: };
106: };
107: };