Alice library manual

Alice Project

The Channel structure


________ Synopsis ____________________________________________________

    signature CHANNEL
    structure Channel : CHANNEL
  

A fully concurrent imperative channel abstraction.

A consumer takes elements available at the beginning of the channel.
A producer inserts elements into the channel, either at the beginning (LIFO) or at the end (FIFO).
Channels are thread-safe: many consumers and producers can operate concurrently on the same channel.

Note: Channels contain implicit locks. Stopping a thread while it is manipulating a channel may cause all further accesses to that channel to block until the thread is restarted.


________ Import ______________________________________________________

    import signature CHANNEL from "x-alice:/lib/data/CHANNEL-sig"
    import structure Channel from "x-alice:/lib/data/Channel"

________ Interface ___________________________________________________

    signature CHANNEL =
    sig
        type 'a channel
        type 'a t = 'a channel

        exception Empty

        val channel :    unit -> 'a channel
        val put :        'a channel * 'a -> unit
        val push :       'a channel * 'a -> unit
        val get :        'a channel -> 'a
        val pop :        'a channel -> 'a
        val peek :       'a channel -> 'a
        val clone :      'a channel -> 'a channel
        val isEmpty :    'a channel -> bool
        val close :      'a channel -> unit
        val isClosed :   'a channel -> bool
        val waitClosed : 'a channel -> unit
        val purge :      'a channel -> unit
        val app :        ('a -> unit) -> 'a channel -> unit
        val appNB :      ('a -> unit) -> 'a channel -> unit
        val toList :     'a channel -> 'a list
        val toListNB :   'a channel -> 'a list
    end
  

________ Description _________________________________________________

type 'a channel
type 'a t = 'a channel

The type of polymorphic channels.

exception Empty

Used to indicate invalid accesses to an empty channel. Equal to List.Empty.

channel ()

Creates a new channel that is initially empty.

put (ch, x)

Producer: Adds the value x at the end of the channel ch.

push (ch, x)

Producer: Inserts the value x at the beginning of the channel ch. That is, the next read (peek, get or pop) will return x.
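
The difference between put and push can be illustrated with a short sketch. It uses only the functions described on this page; the channel name ch and the integer values are chosen for illustration.

```sml
import structure Channel from "x-alice:/lib/data/Channel"

(* Illustrative channel of integers; the annotation fixes the element type. *)
val ch : int Channel.channel = Channel.channel ()

val _ = Channel.put (ch, 1)     (* contents: 1 *)
val _ = Channel.put (ch, 2)     (* contents: 1, 2 *)
val _ = Channel.push (ch, 0)    (* contents: 0, 1, 2 *)

(* Reads take elements from the beginning of the channel. *)
val a = Channel.get ch          (* 0, the pushed element *)
val b = Channel.get ch          (* 1 *)
val c = Channel.get ch          (* 2 *)
```

With put alone the channel behaves as a FIFO queue; push lets a producer place an element ahead of everything already waiting.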

get ch
pop ch

Consumer: Removes and returns the first element of channel ch if one is available. Raises Empty if the channel is closed and empty. Otherwise, blocks until an element is inserted (with put or push).

peek ch

Returns the first element of channel ch if available, but does not remove it from the channel (it is not a consumer). Like get, it may block or raise Empty.
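
A minimal sketch of how peek and get differ, and of the Empty case, assuming that elements inserted before close remain readable afterwards (as the description of get suggests). The channel is closed first so that the final read raises Empty instead of blocking; all names are illustrative.

```sml
import structure Channel from "x-alice:/lib/data/Channel"

val ch : string Channel.channel = Channel.channel ()

val _ = Channel.put (ch, "a")
val _ = Channel.close ch

(* peek returns the first element but leaves it in the channel. *)
val first = Channel.peek ch
(* get returns the same element and removes it. *)
val same = Channel.get ch

(* The channel is now closed and empty, so a further read raises Empty. *)
val raisedEmpty =
    (ignore (Channel.get ch); false) handle Channel.Empty => true
```

On a channel that is empty but still open, the last get would block until a producer inserts an element.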

clone ch

Returns a new channel initialized with the elements of channel ch.

isEmpty ch

Returns true iff the channel is empty, that is, if reading an element would block (or raise Empty, if the channel is closed).

close ch

Closes channel ch: all subsequent writes (push and put) will have no effect.

isClosed ch

Returns true iff the channel has been closed with close.

waitClosed ch

Returns unit if the channel is closed, or blocks until it is closed.
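
The closing functions can be sketched together as follows. The example stays in a single thread, so waitClosed is only called once the channel is already closed; in a concurrent program it would typically be called from a thread that needs to wait for the producers to finish. Names are illustrative.

```sml
import structure Channel from "x-alice:/lib/data/Channel"

val ch : int Channel.channel = Channel.channel ()

val _ = Channel.put (ch, 1)
val openBefore = not (Channel.isClosed ch)

val _ = Channel.close ch
val _ = Channel.put (ch, 2)        (* no effect: the channel is closed *)
val _ = Channel.waitClosed ch      (* returns at once: already closed *)

val closedAfter = Channel.isClosed ch
val x = Channel.get ch             (* 1, inserted before close *)
val empty = Channel.isEmpty ch     (* the second put was ignored *)
```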

purge ch

Consumer: Removes all elements of the channel ch.

app f ch

Consumer: Takes elements out of the channel and applies function f to each of them. Does not return until the channel is closed.

appNB f ch

Consumer: Takes elements out of the channel and applies function f to each of them. Returns as soon as the channel is empty.
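
A small sketch of the non-blocking variant, using an illustrative accumulator sum. Because appNB returns as soon as the channel is empty, it can be called from the main thread; app, which waits for the channel to be closed, is usually run in its own thread.

```sml
import structure Channel from "x-alice:/lib/data/Channel"

val ch : int Channel.channel = Channel.channel ()
val _ = List.app (fn x => Channel.put (ch, x)) [1, 2, 3]

(* Consume everything currently in the channel, adding it to sum. *)
val sum = ref 0
val _ = Channel.appNB (fn x => sum := !sum + x) ch

val total = !sum                   (* 1 + 2 + 3 *)
val drained = Channel.isEmpty ch   (* the elements were taken out *)
```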

toList ch

Consumer: Returns a lazy list with the elements of the channel. Elements of the list are removed from the channel when they are evaluated. Reading the list blocks if the channel is empty.

toListNB ch

Consumer: Returns a lazy list with the elements of the channel. Elements of the list are removed from the channel when they are evaluated. Reading the list returns an empty list if the channel is empty.
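
As a sketch, the non-blocking list view can be forced completely with an ordinary list function such as List.length; the names are illustrative. The same code with toList would block at the end of the list, waiting for further elements, since the channel is empty but not closed.

```sml
import structure Channel from "x-alice:/lib/data/Channel"

val ch : int Channel.channel = Channel.channel ()
val _ = List.app (fn x => Channel.put (ch, x)) [10, 20, 30]

(* Lazy list over the channel contents. *)
val xs = Channel.toListNB ch

(* Traversing the whole list evaluates every element, which removes
   each of them from the channel. *)
val n = List.length xs
val drained = Channel.isEmpty ch
```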


________ Example _____________________________________________________

The following functor creates a module which evaluates tasks sequentially.

    functor NewSequence () =
    struct
        (* A new channel for jobs that must be run sequentially. *)
        val jobs = Channel.channel ()

        (* f () will be evaluated sequentially *)
        fun add f = Channel.put (jobs, f)
        fun app f arg = add (fn () => f arg)

        (* Run jobs. *)
        val _ = spawn (Channel.app (fn job => job ()) jobs)

        (* Nicely stop all jobs. *)
        fun stop () =
            (Channel.close jobs;
             Channel.purge jobs)
    end
    

Usage:

    structure Log = NewSequence ()
    val printInt = print o Int.toString

    val _ = Log.app print "First Message"
    val _ = Log.app printInt 32
    
    


last modified 2007/Mar/30 17:10