
Java -- learning from occam3



//{{{}}}

I think what follows is a completely secure (:-) solution to the infinite
overtaking problem for multiple readers and writers of a shared buffer (or
channel).

//{{{  thread management overheads

ComsTime gives a rough comparison between the overheads of using the various
communications glue.  The figures I get (on a SPARC-5) are:

     Version               Note                   Time per ComsTime cycle (ms)
     -------               ----                   ----------------------------

  BUFFER_OF_INT(n)  (secure for multiple                      8.8
                     readers and writers
                     -- no busy loops
                     -- explained below)

  BUFFER_OF_INT(n)  (earlier version for a                    5.5
                     single reader and writer
                     -- no busy loops)

  CHAN_OF_INT       (proper occam channel for                 5.5
                     a single reader and writer
                     -- no busy loops)

  Channel(n, FIFO)  (Gerald's for a single                    1.5
                     reader and writer -- busy
                     loops when blocked)

  Channel()         (Gerald's for a single                    0.6
                     reader and writer -- still
                     a blocking FIFO of size 1
                     -- busy loops when blocked)

You get what you pay for!  Gerald's busy-loops avoid the use of the wait()
and notify() methods, which Java provides for passive waiting within
synchronized methods.  So long as you know your block will be removed
within fewer busy-loop iterations than a wait()/notify() pair costs,
use Gerald's Channel ... this is the case for ComsTime.
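
To make the trade-off concrete, here is a minimal sketch of a busy-looping
one-place channel for a single reader and single writer (illustrative only --
this is not Gerald's actual code, and the class and method names are mine):

```java
// Illustrative busy-looping channel, safe only for ONE reader and ONE writer.
// The volatile fields make the hand-over visible between the two threads.
class SpinChannel {
  private volatile boolean full = false;
  private volatile int datum;

  public void write (int n) {
    while (full) Thread.yield ();   // busy loop until the slot is free
    datum = n;
    full = true;
  }

  public int read () {
    while (!full) Thread.yield ();  // busy loop until the slot is filled
    int n = datum;
    full = false;
    return n;
  }
}

public class SpinDemo {
  public static int run () throws InterruptedException {
    final SpinChannel c = new SpinChannel ();
    Thread writer = new Thread (() -> {
      for (int i = 0; i < 10; i++) c.write (i);
    });
    writer.start ();
    int sum = 0;
    for (int i = 0; i < 10; i++) sum += c.read ();  // 0 + 1 + ... + 9
    writer.join ();
    return sum;
  }

  public static void main (String[] args) throws InterruptedException {
    System.out.println ("sum = " + run ());
  }
}
```

The spinning burns processor time whenever a thread is blocked, which is why
it only wins when blocks are short-lived ... as they are in ComsTime.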

Just for interest, to demonstrate that passive blocking can be rather
efficient if implemented at the right level, here is ComsTime in occam
under KRoC running on exactly the same processor:

     Version               Note                   Time per ComsTime cycle (ms)
     -------               ----                   ----------------------------

  CHAN OF INT       (proper occam channel for                 0.005
                     a single reader and writer
                     -- no busy loops)

//}}}
//{{{  shared buffer with multiple readers and writers

//{{{  problem with the previous BUFFER_OF_INT

The earlier BUFFER_OF_INT can be used for multiple readers and writers
securely (in the sense that no inconsistent state can be reached).

However, there is a potential unfairness on individual readers and/or
writers.  This unfairness could lead to infinite starvation where a
reader (say) is continually beaten to a near-empty buffer by fellow
readers.  Same for a writer.

This problem also exists for the CubbyHole example given in Sun's Java
Tutorial ... but no hint of this problem is raised there ...
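
For reference, here is a reconstruction of that single-monitor style (from
memory -- not Sun's actual code; names are illustrative).  Note the while-loop
around the wait(): a notified reader must re-test, precisely because readers
that arrived later may have overtaken it and emptied the buffer again:

```java
// Single-monitor FIFO buffer in the CubbyHole style.  Safe for multiple
// readers and writers (no inconsistent state), but NOT fair: all parties
// queue on the same monitor, so a notified reader can be overtaken (in
// principle, forever) by readers that arrived later.
class SingleMonitorBuffer {
  private final int[] buffer;
  private int size = 0, hi = 0, lo = 0;

  SingleMonitorBuffer (int max) { buffer = new int[max]; }

  public synchronized int read () {
    while (size == 0) {             // must re-test: another reader may have
      try { wait (); }              // emptied the buffer since the notify
      catch (InterruptedException e) {}
    }
    int n = buffer[lo];
    lo = (lo + 1) % buffer.length;
    size--;
    notifyAll ();                   // wake any waiting writers (and readers)
    return n;
  }

  public synchronized void write (int n) {
    while (size == buffer.length) { // must re-test, for the symmetric reason
      try { wait (); }
      catch (InterruptedException e) {}
    }
    buffer[hi] = n;
    hi = (hi + 1) % buffer.length;
    size++;
    notifyAll ();
  }
}

public class CubbyDemo {
  public static boolean run () throws InterruptedException {
    final SingleMonitorBuffer buf = new SingleMonitorBuffer (4);
    Thread writer = new Thread (() -> {
      for (int i = 0; i < 100; i++) buf.write (i);
    });
    writer.start ();
    boolean ordered = true;
    for (int i = 0; i < 100; i++) ordered &= (buf.read () == i);
    writer.join ();
    return ordered;                 // FIFO order holds for one reader/writer
  }

  public static void main (String[] args) throws InterruptedException {
    System.out.println ("ordered = " + run ());
  }
}
```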

//}}}
//{{{  occam3 SHARED channels give the solution

To see how to solve this problem properly, all we need do is look at occam3.
It doesn't matter that the language is not implemented!  What matters is the
thinking that went into it ...

//{{{  MODULE TYPE BUFFER.OF.INT ()
MODULE TYPE BUFFER.OF.INT ()
  INTERFACE
    //{{{  service channels
    SHARED CALL read (INT n):
    SHARED CALL write (VAL INT n):
    //}}}
  TO
    //{{{  internal state
    [max]INT buffer:
    INITIAL INT size IS 0:
    INITIAL INT hi IS 0:
    INITIAL INT lo IS 0:
    //}}}
    //{{{  server thread
    SERVER
      //{{{  (size > 0) & ACCEPT read (INT n)
      (size > 0) & ACCEPT read (INT n)
        SEQ
          n := buffer[lo]
          lo, size := (lo + 1)\max, size - 1
      //}}}
        SKIP
      //{{{  (size < max) & ACCEPT write (VAL INT n)
      (size < max) & ACCEPT write (VAL INT n)
        SEQ
          buffer[hi] := n
          hi, size := (hi + 1)\max, size + 1
      //}}}
        SKIP
    //}}}
:
//}}}

The required insight is that multiple readers are queued up *separately* (on
the SHARED read channel) from multiple writers (who are queued up on the
SHARED write channel).  In my earlier version of BUFFER_OF_INT (and in Sun's
CubbyHole), multiple readers/writers queue up on the *same* monitor.  When a
reader (say) acquires this monitor but finds itself blocked, it executes
a wait ... when it subsequently gets notified (by a writer that's just put
something in), it gets back on the monitor queue ... behind readers who
came in later than itself and who may grab the data, empty the buffer and
leave the original reader no option but to wait again ...

There are some other lessons from the above occam3 code that I'll come back
to later ...

//}}}
//{{{  derived Java solution (a multiple-reader-writer-secure BUFFER_OF_INT)

What we need is to queue up readers and writers separately.  That means we
have to have three monitors (which have to be implemented by three objects):

  o read monitor : to resolve contention between multiple readers;
  o write monitor : to resolve contention between multiple writers;
  o buffer monitor : to resolve contention between a single reader and writer.

All readers queue up to multiplex through the read monitor.  They stay in
that queue, even when the reader that's acquired it can't progress because
the buffer is empty.

All writers queue up to multiplex through the write monitor.  They stay in
that queue, even when the writer that's acquired it can't progress because
the buffer is full.

//{{{  new bit of Java semantics

Java threads have no formal semantics to which we can appeal (unlike occam).
What is worse is that they don't seem to have an informal natural language
definition over which we can argue ... or, at least, I can't find one.

This leaves us doing experiments with the JDK to find out things ... and
hoping that future Java implementations will not invalidate our conclusions!

//{{{  question and answer

Question:

  A thread acquires monitor X and, then, acquires monitor Y.  It can do this
  by executing a synchronized X-method which executes a synchronized Y-method.
  The Y-method executes a wait().  Now, the books tell us that that wait()
  releases the Y monitor ... but does it also release the X monitor?

Answer (discovered by experiment ... try to find it in a book!):

  No.
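
The experiment can be reproduced with a sketch along these lines (modern Java
syntax; the class and names are mine, and the timings are generous rather than
guaranteed):

```java
// Does wait() on Y also release an outer monitor X?  One thread takes X then
// Y and waits on Y; two probing threads then try to take X and Y.  Only the
// Y probe succeeds: wait() releases the monitor it is called on, not X.
public class NestedMonitorTest {
  static final Object X = new Object ();
  static final Object Y = new Object ();
  static volatile boolean acquiredX = false;
  static volatile boolean acquiredY = false;

  // returns true iff X could be acquired while the waiter sat in Y.wait ()
  public static boolean probeWhileWaiting () throws InterruptedException {
    Thread waiter = new Thread (() -> {
      synchronized (X) {
        synchronized (Y) {
          try { Y.wait (); }                   // releases Y ... but X?
          catch (InterruptedException e) {}
        }
      }
    });
    waiter.start ();
    Thread.sleep (500);                        // let it reach Y.wait ()

    Thread probeY = new Thread (() -> {
      synchronized (Y) { acquiredY = true; }   // succeeds: Y was released
    });
    Thread probeX = new Thread (() -> {
      synchronized (X) { acquiredX = true; }   // blocks: X is still held
    });
    probeY.start ();  probeX.start ();
    probeY.join (500);  probeX.join (500);
    boolean result = acquiredX;

    waiter.interrupt ();                       // unblock the waiter, clean up
    waiter.join ();  probeX.join ();  probeY.join ();
    return result;
  }

  public static void main (String[] args) throws InterruptedException {
    boolean xReleased = probeWhileWaiting ();
    System.out.println ("Y released: " + acquiredY +
                        ", X released: " + xReleased);
  }
}
```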

//}}}

This is just the answer we want ... I hope it doesn't change!  The X monitor
will be our reader monitor (say) and the Y one will be the buffer itself.
When a reader gets the reader monitor and, then, the buffer monitor and,
then finds the buffer empty, it does a wait() ... which allows in a writer
on the buffer, but keeps its fellow readers at bay still queueing for the
reader monitor.  Excellent!

We can now implement the buffer.

//}}}
//{{{  BUFFER_OF_INT.java

//{{{  imports
import java.io.*;
import java.util.*;
import java.lang.*;
//}}}

//{{{  public class BUFFER_OF_INT {
public class BUFFER_OF_INT {

  //{{{  COMMENT specification
  //
  //BUFFER_OF_INT implements a blocking FIFO buffer of integers.  A fixed size
  //is defined upon initialisation.  There can be any number of concurrent
  //readers and writers.  Readers are blocked when the buffer is empty.  Writers
  //are blocked when the buffer is full.  A non-empty buffer will not refuse
  //a reader.  A non-full buffer will not refuse a writer.
  //
  //The buffer is also `fair' -- i.e. readers are dealt with in the order of
  //their arrival ... same for writers.  Contention between readers and writers
  //is dealt with in an arbitrary fashion ... but any unfairness is limited by
  //the size of the buffer (only a finite number of reads can take place without
  //a write and vice-versa).
  //
  //The buffer is `non-busy' -- i.e. there are no polling loops (e.g. by a reader
  //waiting for the buffer to become non-empty).  Blocking is entirely passive.
  //
  //}}}
  //{{{  COMMENT implementation
  //
  //Readers and writers are queued up separately (on independent read_ and
  //write_monitors).  Once a reader gets its read_monitor, it doesn't let go
  //until it has successfully read something from the buffer (waiting, if
  //necessary, until the buffer becomes non-empty).  Once a writer gets its
  //write_monitor, it doesn't let go until it has successfully put something
  //into the buffer (waiting, if necessary, until the buffer becomes non-full).
  //Other readers and writers stay in line during any waits.  Only a single
  //reader and a single writer (each holding their respective monitors) may
  //be in contention for modifying the buffer and this contention is resolved
  //through the synchronized methods sync_read/sync_write (which is arbitrated
  //by a third monitor belonging to the BUFFER_OF_INT object itself).
  //
  //It is important that the public read and write methods are not synchronized.
  //Otherwise, a reader waiting on an empty buffer would block all writers!
  //
  //Readers call the read method and, then, queue up for the read_monitor.  When
  //a reader acquires this, it queues up for the BUFFER_OF_INT monitor, where its
  //only competitor may be a single writer that has acquired its write_monitor.
  //When it acquires the BUFFER_OF_INT monitor, it may have to wait() because
  //the buffer turned out to be empty.  This wait() releases the BUFFER_OF_INT,
  //allowing a writer in, but does not release the read_monitor first acquired.
  //This forces the other readers to wait patiently in line and stops them
  //overtaking (perhaps infinitely often) the waiting reader.
  //
  //The writers' story is symmetric to the above.
  //
  //}}}

  //{{{  local state
  int[] buffer;
  int max;
  
  int size = 0;                    // INVARIANT: (0 <= size <= max)
  int hi = 0;                      // INVARIANT: (0 <= hi < max)
  int lo = 0;                      // INVARIANT: (0 <= lo < max)
  
  boolean waiting_reader = false;  // INVARIANT: waiting_reader ==> (size = 0)
  boolean waiting_writer = false;  // INVARIANT: waiting_writer ==> (size = max)
  
  READ_MONITOR read_monitor =      // all readers multiplex through this
    new READ_MONITOR (this);
  
  WRITE_MONITOR write_monitor =    // all writers multiplex through this
    new WRITE_MONITOR (this);
  //}}}
  //{{{  constructor
  BUFFER_OF_INT (int max) {
    this.max = max;
    buffer = new int[max];
  }
  //}}}

  //{{{  public int read () {
  public int read () {
    return read_monitor.read ();
  }
  //}}}
  //{{{  public void write (int n) {
  public void write (int n) {
    write_monitor.write (n);
  }
  //}}}

  //{{{  synchronized int sync_read () {
  synchronized int sync_read () {
    if (size == 0) {
      waiting_reader = true;
      //{{{  wait ();
      try {
        wait ();
      } catch (InterruptedException e) {
        System.out.println ("BUFFER_OF_INT: InterruptedException exception raised" +
          " whilst waiting to read from an empty buffer ...");
      }
      //}}}
    }
    int tmp = lo;                    // ASSERT: size > 0
    lo = (lo + 1) % max;
    size--;
    if (waiting_writer) {            // ASSERT: size == (max - 1)
      waiting_writer = false;
      notify ();
    }
    return buffer[tmp];
  }
  //}}}
  //{{{  synchronized void sync_write (int n) {
  synchronized void sync_write (int n) {
    if (size == max) {
      waiting_writer = true;
      //{{{  wait ();
      try {
        wait ();
      } catch (InterruptedException e) {
        System.out.println ("BUFFER_OF_INT: InterruptedException exception raised" +
          " whilst waiting to write to a full buffer ...");
      }
      //}}}
    }
    buffer[hi] = n;                  // ASSERT: size < max
    hi = (hi + 1) % max;
    size++;
    if (waiting_reader) {            // ASSERT: size == 1
      waiting_reader = false;
      notify ();
    }
  }
  //}}}

}
//}}}

//{{{  class READ_MONITOR {
class READ_MONITOR {

  BUFFER_OF_INT buffer_of_int;

  //{{{  constructor
  READ_MONITOR (BUFFER_OF_INT buffer_of_int) {
    this.buffer_of_int = buffer_of_int;
  }
  //}}}
  //{{{  public synchronized int read () {
  public synchronized int read () {
    return buffer_of_int.sync_read ();
  }
  //}}}
}
//}}}
//{{{  class WRITE_MONITOR {
class WRITE_MONITOR {

  BUFFER_OF_INT buffer_of_int;

  //{{{  constructor
  WRITE_MONITOR (BUFFER_OF_INT buffer_of_int) {
    this.buffer_of_int = buffer_of_int;
  }
  //}}}
  //{{{  public synchronized void write (int n) {
  public synchronized void write (int n) {
    buffer_of_int.sync_write(n);
  }
  //}}}
}
//}}}

//}}}
//{{{  note on InterruptedException

Note: the InterruptedException possibly raised by the wait() method is
handled locally to simplify the public interface.  We probably shouldn't
do that!

//}}}
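
As a sanity check, the scheme can be exercised with a harness like the
following (a condensed restatement of the three-monitor structure, so that
the example stands alone; class and method names here are illustrative).
One caveat from a later Java perspective: the language specification now
permits `spurious wakeups' from wait(), which would argue for a while-loop
even here.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Condensed three-monitor buffer: readers queue on readMonitor, writers on
// writeMonitor, and only one of each contends for the buffer's own monitor.
class Buf {
  private final int[] b;
  private int size = 0, hi = 0, lo = 0;
  private boolean waitingReader = false, waitingWriter = false;
  private final Object readMonitor = new Object ();   // queues the readers
  private final Object writeMonitor = new Object ();  // queues the writers

  Buf (int max) { b = new int[max]; }

  public int read () { synchronized (readMonitor) { return syncRead (); } }
  public void write (int n) { synchronized (writeMonitor) { syncWrite (n); } }

  private synchronized int syncRead () {
    if (size == 0) {                    // an if, not a while: no other reader
      waitingReader = true;             // can overtake us while we hold
      try { wait (); }                  // readMonitor
      catch (InterruptedException e) {}
    }
    int n = b[lo];  lo = (lo + 1) % b.length;  size--;
    if (waitingWriter) { waitingWriter = false; notify (); }
    return n;
  }

  private synchronized void syncWrite (int n) {
    if (size == b.length) {
      waitingWriter = true;
      try { wait (); }
      catch (InterruptedException e) {}
    }
    b[hi] = n;  hi = (hi + 1) % b.length;  size++;
    if (waitingReader) { waitingReader = false; notify (); }
  }
}

public class BufDemo {
  // two writers inject 0..99 between them; two readers drain and tally
  public static int run () throws InterruptedException {
    final Buf buf = new Buf (4);
    final AtomicInteger tally = new AtomicInteger (0);
    Thread[] ts = new Thread[4];
    for (int w = 0; w < 2; w++) {
      final int base = w * 50;
      ts[w] = new Thread (() -> {
        for (int i = 0; i < 50; i++) buf.write (base + i);
      });
    }
    for (int r = 2; r < 4; r++) {
      ts[r] = new Thread (() -> {
        for (int i = 0; i < 50; i++) tally.addAndGet (buf.read ());
      });
    }
    for (Thread t : ts) t.start ();
    for (Thread t : ts) t.join ();
    return tally.get ();               // 0 + 1 + ... + 99 = 4950
  }

  public static void main (String[] args) throws InterruptedException {
    System.out.println ("tally = " + run ());
  }
}
```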

//}}}
//{{{  comments on BUFFER_OF_INT.java (entirely loop-free!)

This is really just the earlier version, but with the synchronized read/write
methods no longer public and the public forced to queue up for their respective
read/write monitors first.  The while-loop which represented the potential
infinite overtaking problem earlier is replaced by an if-statement.  The
code now contains no looping constructs ... at last!

The public (un-synchronized) read/write methods just call their respective
monitors, which just call back the private (synchronized) read/write methods.
This code is trivial, necessary and, sadly, expensive.

We could eliminate one of these calls by making READ_MONITOR and WRITE_MONITOR
the public classes, hiding BUFFER_OF_INT and letting external threads call the
read/write monitors directly.  In that case, we should probably change these
class names to something like READ_BUFFER_OF_INT and WRITE_BUFFER_OF_INT.
But this does leave a more complicated buffer instantiation for the user, who
would have to declare two mutually-referring objects for each buffer.  That's
what we had to do (I think unnecessarily) for Sun's Piped I/O Streams that
Oyvind originally used.

//}}}
//{{{  critical observations on BUFFER_OF_INT.java

//{{{  the code is passive and oriented to its users

This is a comment on (informal) verification.  The Java code for this buffer,
although using insights obtained from the occam version, is not at all like
occam.  The problem is that this code is passive ... it has no life of its
own ... it relies on other threads to call its methods for things to happen.

The algorithms are, therefore, not oriented to this buffer but are oriented
to the callers of this buffer.  To me, this is not object-oriented whatever
the official line may be ... this code is `caller-oriented' ... i.e. it's
old-fashioned non-paradigm-shifted procedural code!

This buffer is representative of "per-activity threads", even though the
actors in this case are the nice "per-object threads" that are modelled
after the occam processes.  The trouble with old-fashioned
non-paradigm-shifted procedural code in a parallel (or multi-threaded)
context is that reasoning about it is even harder than normal ... see below.

//}}}
//{{{  the semantics of its methods are not self-contained

Consider:

  //{{{  synchronized int sync_read () {
  synchronized int sync_read () {
    if (size == 0) {
      waiting_reader = true;
      ...  wait ();
    }
    int tmp = lo;                    // ASSERT: size > 0
    lo = (lo + 1) % max;
    size--;
    if (waiting_writer) {            // ASSERT: size == (max - 1)
      waiting_writer = false;
      notify ();
    }
    return buffer[tmp];
  }
  //}}}

The assertion "// ASSERT: size > 0" is crucial to the correctness of this
algorithm, yet is impossible to deduce just by looking at this code.  We
have to understand a much wider context (e.g. all the other methods for
the object) in order to understand this one.  The variable "size" magically
gets changed behind our backs during the "...  wait ();"!  The semantics
of that `variable' do not correspond to the semantics of proper variables
(such as those in occam), which don't change their values behind our backs.
The semantics of these `variables' are very complex.

//}}}
//{{{  occam semantics are WYSIWYG and compositional under PAR

With the above kind of Java (passive objects supporting "per-activity"
external threads), the individual semantics of "sync_read" is completely
changed by its composition with the other methods.

By contrast, in occam we can reason about the full semantics of any fragment
of code on its own.  Two fragments can be combined (using SEQ, IF, PAR, ...)
and the resulting semantics is a straightforward composition of their
individual semantics.

For example, consider the occam3 fragment (which has the same responsibility
as the Java "sync_read" method):

  //{{{  (size > 0) & ACCEPT read (INT n)
  (size > 0) & ACCEPT read (INT n)
    SEQ
      n := buffer[lo]
      lo, size := (lo + 1)\max, size - 1
  //}}}

In the above, the variables "size", "n", "buffer" and "lo" really *are*
variables and the semantics of this guard does not depend on any outer
context ... what-you-see-is-what-you-get (which is definitely not what's
going on in the Java).

//}}}
//{{{  for simple semantics, we must bind objects and threads together

The reason why the occam3 semantics are simple and compositional under
parallelism (multi-threading) is that threads and objects are unbreakably
bound together.

The occam3 MODULE TYPE BUFFER.OF.INT is a class definition whose instances
are objects with their own thread of control ("per-object threads").  The
setting up of the "(size > 0)" pre-condition on the above "ACCEPT read"
guard is an action initiated by that thread of control ... it's not a response
triggered by some external thread making a call.  Readers are prevented from
blundering in when the buffer is empty, so we have no mess to resolve with
"wait/notify" and variables that aren't variables.  The buffer has to have
its own life to achieve that.

The Java BUFFER_OF_INT is a class definition whose instances are objects with
no life of their own.  Readers/writers blunder in when they can't be serviced
and sorting out the mess requires `variables' that get side-effected by other
threads, busy-loops and/or tricky wait/notify semantics.  The semantics of its
methods are not compositional under multi-threading.  The semantics of its
methods are just plain wrong when considered individually!

//}}}

//}}}

//}}}
//{{{  lessons

The lessons seem to be:

  o design only with objects-and-threads bound together ("per-object threads")
    ... let's call them "processes";
  o the glue to enable processes to be connected is sticky stuff ... these are
    passive objects (used by our processes) ... their semantics are hard;
  o build a "complete" infrastructure of these glue-objects ... do this once
    taking great care ... these glue-objects will be channels, buffers,
    multiplexors and de-multiplexors of various kinds;
  o armed with this infrastructure, never design passive externally-executed
    objects again;
  o when everyone has agreed what a splendid thing this all is, advance the
    language out of the mid-1970s by binding this infrastructure into the
    language, not forgetting to remove those independent concepts of threads
    and objects that are now redundant;
  o we have just moved from Tony Hoare's monitors to CSP and rediscovered
    occam ...

//}}}
//{{{  if only ...

Extract from Sun's Java Tutorial:

> Monitors
>
>      The Java language and runtime system support thread synchronization
>      through the use of monitors which were first outlined in C. A. R.
>      Hoare's article Communicating Sequential Processes (Communications of
>      the ACM, Vol. 21, No. 8, August 1978, pp. 666-677).

//}}}

Peter Welch
(20/5/96)