source: trunk/minix/lib/ack/libm2/CSP.mod@ 11

Last change on this file since 11 was 9, checked in by Mattia Monga, 14 years ago

Minix 3.1.2a

File size: 6.8 KB
Line 
1(*$R-*)
2IMPLEMENTATION MODULE CSP;
3(*
4 Module: Communicating Sequential Processes
5 From: "A Modula-2 Implementation of CSP",
6 M. Collado, R. Morales, J.J. Moreno,
7 SIGPlan Notices, Volume 22, Number 6, June 1987.
8 Some modifications by Ceriel J.H. Jacobs
9 Version: $Header: /cvsup/minix/src/lib/ack/libm2/CSP.mod,v 1.1 2005/10/10 15:27:46 beng Exp $
10
11 See this article for an explanation of the use of this module.
12*)
13
14 FROM random IMPORT Uniform;
15 FROM SYSTEM IMPORT BYTE, ADDRESS, NEWPROCESS, TRANSFER;
16 FROM Storage IMPORT Allocate, Deallocate;
17 FROM Traps IMPORT Message;
18
19 CONST WorkSpaceSize = 2000;
20
21 TYPE ByteAddress = POINTER TO BYTE;
22 Channel = POINTER TO ChannelDescriptor;
23 ProcessType = POINTER TO ProcessDescriptor;
24 ProcessDescriptor = RECORD
25 next: ProcessType;
26 father: ProcessType;
27 cor: ADDRESS;
28 wsp: ADDRESS;
29 guardindex: INTEGER;
30 guardno: CARDINAL;
31 guardcount: CARDINAL;
32 opened: Channel;
33 sons: CARDINAL;
34 msgadr: ADDRESS;
35 msglen: CARDINAL;
36 END;
37
38 Queue = RECORD
39 head, tail: ProcessType;
40 END;
41
42 ChannelDescriptor = RECORD
43 senders: Queue;
44 owner: ProcessType;
45 guardindex: INTEGER;
46 next: Channel;
47 END;
48
49 VAR cp: ProcessType;
50 free, ready: Queue;
51
52(* ------------ Private modules and procedures ------------- *)
53
54 MODULE ProcessQueue;
55
56 IMPORT ProcessType, Queue;
57 EXPORT Push, Pop, InitQueue, IsEmpty;
58
59 PROCEDURE InitQueue(VAR q: Queue);
60 BEGIN
61 WITH q DO
62 head := NIL;
63 tail := NIL
64 END
65 END InitQueue;
66
67 PROCEDURE Push(p: ProcessType; VAR q: Queue);
68 BEGIN
69 p^.next := NIL;
70 WITH q DO
71 IF head = NIL THEN
72 tail := p
73 ELSE
74 head^.next := p
75 END;
76 head := p
77 END
78 END Push;
79
80 PROCEDURE Pop(VAR q: Queue; VAR p: ProcessType);
81 BEGIN
82 WITH q DO
83 p := tail;
84 IF p # NIL THEN
85 tail := tail^.next;
86 IF head = p THEN
87 head := NIL
88 END
89 END
90 END
91 END Pop;
92
93 PROCEDURE IsEmpty(q: Queue): BOOLEAN;
94 BEGIN
95 RETURN q.head = NIL
96 END IsEmpty;
97
98 END ProcessQueue;
99
100
101 PROCEDURE DoTransfer;
102 VAR aux: ProcessType;
103 BEGIN
104 aux := cp;
105 Pop(ready, cp);
106 IF cp = NIL THEN
107 HALT
108 ELSE
109 TRANSFER(aux^.cor, cp^.cor)
110 END
111 END DoTransfer;
112
113 PROCEDURE OpenChannel(ch: Channel; n: INTEGER);
114 BEGIN
115 WITH ch^ DO
116 IF guardindex = 0 THEN
117 guardindex := n;
118 next := cp^.opened;
119 cp^.opened := ch
120 END
121 END
122 END OpenChannel;
123
124 PROCEDURE CloseChannels(p: ProcessType);
125 BEGIN
126 WITH p^ DO
127 WHILE opened # NIL DO
128 opened^.guardindex := 0;
129 opened := opened^.next
130 END
131 END
132 END CloseChannels;
133
134 PROCEDURE ThereAreOpenChannels(): BOOLEAN;
135 BEGIN
136 RETURN cp^.opened # NIL;
137 END ThereAreOpenChannels;
138
139 PROCEDURE Sending(ch: Channel): BOOLEAN;
140 BEGIN
141 RETURN NOT IsEmpty(ch^.senders)
142 END Sending;
143
144(* -------------- Public Procedures ----------------- *)
145
146 PROCEDURE COBEGIN;
147 (* Beginning of a COBEGIN .. COEND structure *)
148 BEGIN
149 END COBEGIN;
150
151 PROCEDURE COEND;
152 (* End of a COBEGIN .. COEND structure *)
153 (* VAR aux: ProcessType; *)
154 BEGIN
155 IF cp^.sons > 0 THEN
156 DoTransfer
157 END
158 END COEND;
159
160 PROCEDURE StartProcess(P: PROC);
161 (* Start an anonimous process that executes the procedure P *)
162 VAR newprocess: ProcessType;
163 BEGIN
164 Pop(free, newprocess);
165 IF newprocess = NIL THEN
166 Allocate(newprocess,SIZE(ProcessDescriptor));
167 Allocate(newprocess^.wsp, WorkSpaceSize)
168 END;
169 WITH newprocess^ DO
170 father := cp;
171 sons := 0;
172 msglen := 0;
173 NEWPROCESS(P, wsp, WorkSpaceSize, cor)
174 END;
175 cp^.sons := cp^.sons + 1;
176 Push(newprocess, ready)
177 END StartProcess;
178
179 PROCEDURE StopProcess;
180 (* Terminate a Process (itself) *)
181 VAR aux: ProcessType;
182 BEGIN
183 aux := cp^.father;
184 aux^.sons := aux^.sons - 1;
185 IF aux^.sons = 0 THEN
186 Push(aux, ready)
187 END;
188 aux := cp;
189 Push(aux, free);
190 Pop(ready, cp);
191 IF cp = NIL THEN
192 HALT
193 ELSE
194 TRANSFER(aux^.cor, cp^.cor)
195 END
196 END StopProcess;
197
198 PROCEDURE InitChannel(VAR ch: Channel);
199 (* Initialize the channel ch *)
200 BEGIN
201 Allocate(ch, SIZE(ChannelDescriptor));
202 WITH ch^ DO
203 InitQueue(senders);
204 owner := NIL;
205 next := NIL;
206 guardindex := 0
207 END
208 END InitChannel;
209
210 PROCEDURE GetChannel(ch: Channel);
211 (* Assign the channel ch to the process that gets it *)
212 BEGIN
213 WITH ch^ DO
214 IF owner # NIL THEN
215 Message("Channel already has an owner");
216 HALT
217 END;
218 owner := cp
219 END
220 END GetChannel;
221
222 PROCEDURE Send(data: ARRAY OF BYTE; VAR ch: Channel);
223 (* Send a message with the data to the cvhannel ch *)
224 VAR m: ByteAddress;
225 (* aux: ProcessType; *)
226 i: CARDINAL;
227 BEGIN
228 WITH ch^ DO
229 Push(cp, senders);
230 Allocate(cp^.msgadr, SIZE(data));
231 m := cp^.msgadr;
232 cp^.msglen := HIGH(data);
233 FOR i := 0 TO HIGH(data) DO
234 m^ := data[i];
235 m := ADDRESS(m) + 1
236 END;
237 IF guardindex # 0 THEN
238 owner^.guardindex := guardindex;
239 CloseChannels(owner);
240 Push(owner, ready)
241 END
242 END;
243 DoTransfer
244 END Send;
245
246 PROCEDURE Receive(VAR ch: Channel; VAR dest: ARRAY OF BYTE);
247 (* Receive a message from the channel ch into the dest variable *)
248 VAR aux: ProcessType;
249 m: ByteAddress;
250 i: CARDINAL;
251 BEGIN
252 WITH ch^ DO
253 IF cp # owner THEN
254 Message("Only owner of channel can receive from it");
255 HALT
256 END;
257 IF Sending(ch) THEN
258 Pop(senders, aux);
259 m := aux^.msgadr;
260 FOR i := 0 TO aux^.msglen DO
261 dest[i] := m^;
262 m := ADDRESS(m) + 1
263 END;
264 Push(aux, ready);
265 Push(cp, ready);
266 CloseChannels(cp)
267 ELSE
268 OpenChannel(ch, -1);
269 DoTransfer;
270 Pop(senders, aux);
271 m := aux^.msgadr;
272 FOR i := 0 TO aux^.msglen DO
273 dest[i] := m^;
274 m := ADDRESS(m) + 1
275 END;
276 Push(cp, ready);
277 Push(aux, ready)
278 END;
279 Deallocate(aux^.msgadr, aux^.msglen+1);
280 DoTransfer
281 END
282 END Receive;
283
284 PROCEDURE SELECT(n: CARDINAL);
285 (* Beginning of a SELECT structure with n guards *)
286 BEGIN
287 cp^.guardindex := Uniform(1,n);
288 cp^.guardno := n;
289 cp^.guardcount := n
290 END SELECT;
291
292 PROCEDURE NEXTGUARD(): CARDINAL;
293 (* Returns an index to the next guard to be evaluated in a SELECT *)
294 BEGIN
295 RETURN cp^.guardindex
296 END NEXTGUARD;
297
298 PROCEDURE GUARD(cond: BOOLEAN; ch: Channel;
299 VAR dest: ARRAY OF BYTE): BOOLEAN;
300 (* Evaluates a guard, including reception management *)
301 (* VAR aux: ProcessType; *)
302 BEGIN
303 IF NOT cond THEN
304 RETURN FALSE
305 ELSIF ch = NIL THEN
306 CloseChannels(cp);
307 cp^.guardindex := 0;
308 RETURN TRUE
309 ELSIF Sending(ch) THEN
310 Receive(ch, dest);
311 cp^.guardindex := 0;
312 RETURN TRUE
313 ELSE
314 OpenChannel(ch, cp^.guardindex);
315 RETURN FALSE
316 END
317 END GUARD;
318
319 PROCEDURE ENDSELECT(): BOOLEAN;
320 (* End of a SELECT structure *)
321 BEGIN
322 WITH cp^ DO
323 IF guardindex <= 0 THEN
324 RETURN TRUE
325 END;
326 guardcount := guardcount - 1;
327 IF guardcount # 0 THEN
328 guardindex := (guardindex MOD INTEGER(guardno)) + 1
329 ELSIF ThereAreOpenChannels() THEN
330 DoTransfer
331 ELSE
332 guardindex := 0
333 END
334 END;
335 RETURN FALSE
336 END ENDSELECT;
337
338BEGIN
339 InitQueue(free);
340 InitQueue(ready);
341 Allocate(cp,SIZE(ProcessDescriptor));
342 WITH cp^ DO
343 sons := 0;
344 father := NIL
345 END
346END CSP.
347
Note: See TracBrowser for help on using the repository browser.