[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: A CSP model for Java threads



Dear Peter,

Here is an FDR analysis of your model for java threads which shows that
the JCSP channel is equivalent to something very simple in CSP. I found
a couple of very minor bugs in your initial CSP specification, but
otherwise I stayed true to your definition.

I'm copying this to the mailing lists for wider discussion.

Jeremy

--
-- FDR analysis of Peter Welch's CSP model of Java threads AND
-- the JCSP implementation of a channel.
--
-- Jeremy Martin  30th July 1999 
--

--
-- We shall model a system consisting of a set of Java objects and threads.
-- 

Objects = {0}
Threads = {0,1}

--
-- We define a collection of channels to model the synchronisation
-- events.
--

channel claim, release, waita, waitb, notify, notifyall: Objects.Threads

--
-- System processes: LOCK(o) controls locking of object o's monitor and
-- SWAIT controls its auxilliary pool of waiting threads ts. 
--

LOCK(o) = claim.o?t -> LOCKED(o,t)

LOCKED(o,t) = release.o.t -> LOCK(o)
              []
              notify.o.t -> LOCKED(o,t)
              []
              notifyall.o.t -> LOCKED(o,t)

alphaLOCK(o) = { claim.o.t,release.o.t,notify.o.t,notifyall.o.t 
                 | t <- Threads }

--
-- A notify event results in one thread being non-deterministically
-- selected from the waiting pool and reactivated; a notifyall
-- event results in all the threads being activated in some
-- non-deterministic order (changed from PHW's original which was 
-- deterministic).
--

SWAIT(o,ts) =  let
                 n = card(ts)         
               within
                 (
                   waita.o?t -> SWAIT(o,union(ts, {t}))
                 ) [] (
                   notify.o?t -> 
                   if (n > 0) then
                     |~| s:ts @ waitb.o!s -> SWAIT(o,diff(ts,{s}))
                   else                      
                     SWAIT(o,{})
                 ) [] (
                   notifyall.o?t -> RELEASE(o,ts)
                 )

RELEASE(o,ts) = let 
                  n = card(ts)
                within
                  if (n > 0) then
                    |~| t:ts @ waitb.o!t -> RELEASE(o,diff(ts,{t}))
                  else
                    SWAIT(o,{})       

alphaSWAIT(o) = { 
                  waita.o.t,waitb.o.t,notify.o.t,notifyall.o.t 
                  | t <- Threads
                }

--
-- The monitor is defined as the parallel composition of the two system 
-- processes. Initially the auxiliary wait queue is empty.
--

MONITOR(o) = SWAIT(o,{}) [ alphaSWAIT(o) || alphaLOCK(o) ] LOCK(o)

alphaMONITOR(o) = union(alphaSWAIT(o),alphaLOCK(o))

--
-- Now we define the standard programmer's interface to monitors. 
-- Note that the WAIT procedure has been modified from Peter's original
-- CSP definition. An object needs to join the wait queue, by issuing
-- a waita event,  prior to releasing the monitor otherwise it might miss 
-- being notified. This was revealed by FDR in an early run.
--
-- For the moment we'll prohibit multiple locks by a particular thread
-- on a particular object. This could easily be handled by using processes
-- to represent the relevant lock counts (as Peter has shown).
--

STARTSYNC(o,me) = claim.o!me -> SKIP

ENDSYNC(o,me) = release.o!me -> SKIP

WAIT(o,me) = waita.o.me -> release.o!me -> waitb.o!me -> claim.o!me -> SKIP

NOTIFY(o,me) = notify.o.me -> SKIP

--
-- JCSP Implementation of channel:
--
-- This is translated from "P.H.Welch, Java Threads in the light of
-- occam/CSP, Proceedings of WoTUG21 1998", page 274.
--
-- The JCSP channel object has three variables (channel_empty, channel_hold
-- and local), which we shall model as processes always ready both to 
-- have their value reset or to report it to any willing thread. We
-- assume, for simplicity, that each channel has the same `boolean' type.
--

datatype Variables = channel_empty | channel_hold | local
datatype Data = TRUE | FALSE | OTHER
channel getvar, setvar: Objects.Variables.Threads.Data

--
-- Variables are initialised as TRUE.
--

VARIABLE(o,v) = VARIABLE2(o,v,TRUE)

VARIABLE2(o,v,d) = (
                     [] t:Threads @ getvar.o.v.t!d -> VARIABLE2(o,v,d)
                   ) [] (
                     [] t:Threads @ setvar.o.v.t?x -> VARIABLE2(o,v,x)
                   )

alphaVARIABLE(o,v) = {
                       getvar.o.v.t.d, setvar.o.v.t.d 
                       | t <- Threads, d <- Data
                     }

VARIABLES(o) = 
    (
        VARIABLE(o,channel_empty) 
      [alphaVARIABLE(o,channel_empty) || alphaVARIABLE(o,channel_hold)] 
        VARIABLE(o,channel_hold)
    ) 
  [union(alphaVARIABLE(o,channel_empty),alphaVARIABLE(o,channel_hold)) 
  || alphaVARIABLE(o,local)] 
    VARIABLE(o,local)

alphaVARIABLES(o) = union(union(alphaVARIABLE(o,channel_empty),
                    alphaVARIABLE(o,channel_hold)),alphaVARIABLE(o,local)) 

--
-- One purpose of JCSP is to seal off the thread/monitor synchronisation
-- calls from the programmer. Instead a read/write interface is provided
-- by two simple methods.
--
-- We shall model this interface with the following events:
--   write.o.t.d   - thread t activates java method write(d) of object o,
--                   message d is supplied for transmission.
--   ack.o.t       - call to write(d) terminates,
--   ready.o.t'    - Thread t' activates method read() of object o.
--   read.o.t'.d   - call to read() terminates, message d is returned
--
-- The JCSP channel should behave like a synchronised channel. Each
-- successful communcation requires that at some point both threads
-- are simultaneously involved.
-- 

channel read, write: Objects.Threads.Data
channel ready, ack: Objects.Threads

--
-- We model the JCSP read method as a process which repeatly waits to be
-- activated by a ready event and then executes the monitor sychronisation 
-- code to receive a message.
-- 

READ(o,t) = ready.o.t -> 
            claim.o.t -> 
            getvar.o.channel_empty.t?c ->
            (
              if (c == TRUE) then 
                setvar.o.channel_empty.t!FALSE -> 
                WAIT(o,t);
                getvar.o.channel_hold.t?mess -> 
                setvar.o.local.t!mess -> 
                NOTIFY(o,t)
              else
                setvar.o.channel_empty.t!TRUE -> 
                getvar.o.channel_hold.t?mess -> 
                setvar.o.local.t!mess -> 
                NOTIFY(o,t)
            ) ;
            getvar.o.local.t?mess -> 
            release.o.t ->    
            read.o.t!mess -> 
            READ(o,t)

alphaREAD(o,t) = {
                   claim.o.t, getvar.o.v.t.d, notify.o.t, notifyall.o.t, 
                   setvar.o.v.t.d, read.o.t.d, release.o.t, waita.o.t, 
                   waitb.o.t, ready.o.t  
                   | v <- Variables, d <- Data
                 }

--
-- The JCSP write method is similarly modelled as a repeating process,
-- activated by the write event.
--

WRITE(o,t) = write.o.t?mess -> 
             claim.o.t ->  
             setvar.o.channel_hold.t!mess ->
             getvar.o.channel_empty.t?c ->
             (if (c == TRUE) then
                setvar.o.channel_empty.t!FALSE -> 
                WAIT(o,t)
              else
                setvar.o.channel_empty.t!TRUE -> 
                NOTIFY(o,t); 
                WAIT(o,t));
             release.o.t -> 
             ack.o.t -> 
             WRITE(o,t)

alphaWRITE(o,t) = { 
                    claim.o.t, getvar.o.v.t.d, notify.o.t, notifyall.o.t,
                    setvar.o.v.t.d, write.o.t.d, release.o.t, waita.o.t, 
                    waitb.o.t, ack.o.t 
                    | v <- Variables, d <- Data
                  }


--
-- The JCSP channel is then modelled as the parallel composition of
-- processes which model its monitor, three variables, and  read and write
-- methods.
--

JCSPCHANNEL(o,t1,t2) = 
   (
     READ(o,t1) 
       [alphaREAD(o,t1) || alphaWRITE(o,t2)] 
     WRITE(o,t2)
   )
 [union(alphaREAD(o,t1),alphaWRITE(o,t2)) || 
  union(alphaMONITOR(o),alphaVARIABLES(o))] 
   (
     MONITOR(o) 
       [alphaMONITOR(o) || alphaVARIABLES(o)]
     VARIABLES(o)
   ) 

alphaJCSPCHANNEL(o,t1,t2) = 
  union(union(alphaMONITOR(o),alphaVARIABLES(o)),
        union(alphaREAD(o,t1),alphaWRITE(o,t2)))

--
-- Now we shall define a simplified model of how the JCSP channel
-- should work, and then, hopefully, we shall use FDR to show that this is
-- equivalent to the JCSP implementation. 
--
-- The simple channel consists of two parallel processes, LEFT and RIGHT,
-- to handle input and output respectively. The processes are joined
-- by a hidden channel transmit.
--  

channel transmit:Objects.Data

LEFT(o,t) = write.o.t?mess -> transmit.o!mess -> ack.o.t -> LEFT(o,t)

RIGHT(o,t') = ready.o.t' -> transmit.o?mess -> read.o.t'!mess -> RIGHT(o,t')

alphaLEFT(o,t) = {write.o.t.m, transmit.o.m, ack.o.t | m <- Data}

alphaRIGHT(o,t') = {ready.o.t', transmit.o.m, read.o.t'.m | m <- Data}

CHANNEL(o,t,t') = 
  (
      LEFT(o,t') 
    [alphaLEFT(o,t') || alphaRIGHT(o,t)] 
      RIGHT(o,t)
  ) \ {transmit.o.m | m <- Data}

alphaCHANNEL(o,t,t') = 
  {write.o.t'.m, ack.o.t', ready.o.t, read.o.t.m | m <- Data} 

--
-- In order to compare this specification with the JCSP implementation
-- we need to conceal all the additional events in the alphabet
-- of JCSPCHANNEL
--

Private = diff(alphaJCSPCHANNEL(0,0,1),alphaCHANNEL(0,0,1))

--
-- Assert that JCSPCHANNEL(o,t1,t2) \ Private is equivalent to 
-- CHANNEL(o,t1,t2) in the Failures/Divergences model (by asserting
-- refinement in both directions).
--

assert CHANNEL(0,0,1) [FD= JCSPCHANNEL(0,0,1) \ Private

assert JCSPCHANNEL(0,0,1) \ Private [FD= CHANNEL(0,0,1)

--
-- That works fine with the current system where only two threads
-- are in existence, but when we increase the number of threads in the 
-- system beyond two, perhaps by redefining Threads = {0,1,2}, we find that 
-- the above pair of assertions no longer holds. FDR reveals that this is 
-- because the new threads may `fiddle' with the state of the channel 
-- implementation.
--
-- So how do we stop other threads from interfering with the channel object?
--
-- In CSP we can add a parallel process to the channel implementation which 
-- blocks access to any of the the relevant events! (Unfortunately this is
-- not supported by the actual Java implementation, and so must be
-- regarded as a usage rule for JCSP.)
--

PROTECTION(o,t,t') = STOP

alphaPROTECTION(o,t,t') = { claim.o.t'', setvar.o.v.t''.d, 
                            getvar.o.v.t''.d, waita.o.t''
                            | t'' <- diff(Threads,{t,t'}),
                              v <- Variables,
                              d <- Data
                          }

SAFEJCSPCHANNEL(o,t,t') = 
    JCSPCHANNEL(o,t,t')
  [ alphaJCSPCHANNEL(o,t,t') || alphaPROTECTION(o,t,t') ]
    PROTECTION(o,t,t')

assert CHANNEL(0,0,1) [FD= SAFEJCSPCHANNEL(0,0,1) \ Private

assert SAFEJCSPCHANNEL(0,0,1) \ Private [FD= CHANNEL(0,0,1)

--
-- This pair of assertions is indeed found to hold when the number of threads 
-- is increased beyond 2.
--

--
-- Summary:
--
-- We have shown how to reduce the CSP model of the JCSP channel 
-- implementation to a far simpler equivalent form. Further work
-- would be useful to analyse the JCSP implementation of alternation.
--

 -------------------------------------------------------------------------
                            Dr J. M. R. Martin                
 Oxford Supercomputing Centre                          Tel: (01865) 273849
 Wolfson Building                                      Fax: (01865) 273839
 Parks Road                           Email: jeremy.martin@xxxxxxxxxxxxxxx
 Oxford OX1 3QD                         WWW: http://users.ox.ac.uk/~jeremy