load help;

@[name]
dao.concurrent
@[name]

@[title]
Concurrent Programming
@[title]

@[text]

Dao has native support for concurrent programming on multicore computers.
The basic concurrent computational unit in Dao is a tasklet, which consists
of a virtual machine process (context) and a future value (handle).
A tasklet is much cheaper than a native thread, because only active tasklets
are attached to native threads for execution.

Dao provides several features for concurrent programming:
the multi-threading module @[code]mt@[code] with parallelized code section methods,
asynchronous function calls, asynchronous classes and communication channels.
Conventional multi-threaded programming is supported through an external module.

@[text]


################################################################################
################################################################################
#### Multithreading
################################################################################
################################################################################

@[name]
dao.concurrent.threading
@[name]

@[title]
Multithreading Module
@[title]

@[text]

The multithreading module @[code]mt@[code] offers methods for creating tasklets
and for parallelizing some typical tasks over the standard container types.

@[subsection]
Starting Tasklet
@[subsection]

The simplest way to start a tasklet is to use the functional method
@[code]mt.start@[code] to run an expression or a block of code:
@[code]
var a = 123

# Run a single expression as a tasklet:
var fut = mt.start { a*a }

# Run a block of code as a tasklet:
var fut2 = mt.start {
    var sum2 = 0;
    for(var i = 1 : 1000 ){
        sum2 += a*a
    }
    io.writeln( sum2 )
}
@[code]
This functional method is implemented as a code section (code block as in Ruby)
method, and can take a block of code as an additional implicit parameter.
@[code]mt.start@[code] returns the created tasklet in the form of a future value.
The created tasklet may be executed by a reused thread from the internal thread pool
or by a newly created thread.

@[subsection]
Future Value
@[subsection]

A future value is a type representing a value that becomes available only after
a tasklet has completed.
The future value of a tasklet can be used to perform simple scheduling,
such as blocking the current thread indefinitely, or for a finite amount of time,
to wait for the tasklet to finish. For example,
@[code]
fut.wait()
fut2.wait( 0.1 )
@[code]
Its value can be obtained by,
@[code]
var value = fut.value()
@[code]
which will block the current tasklet and wait for the completion of the tasklet
represented by @[code]fut@[code].

@[subsection]
Parallelized Code Block Methods
@[subsection]

However, the simplest way to do parallel programming is to use the
parallelized functional methods of @[code]mt@[code]:
@[list]
-- @[code]iterate()@[code]:
   Iterate a predefined number of times, or iterate over a list, map or array,
   and execute the code block on each of the items;
-- @[code]map()@[code]:
   Map a list, map or array to another list, map or array,
   using the values returned by the code block;
-- @[code]apply()@[code]:
   Apply the values returned by the code block to a list, map or array;
-- @[code]find()@[code]:
   Find the first item in a list or map that satisfies the condition
   tested by the code block.
@[list]
Examples,
@[code]
mt.iterate( 10 ) { io.writeln( X ) }
mt.iterate( {1,2,3,4,5,6}, 3 ) { io.writeln( X ) }
@[code]

@[subsection]
Other Methods
@[subsection]

The @[code]mt@[code] module also supports a few other methods:
@[list]
--@[code]
select( group: list<@T>, timeout = -1.0 ) => tuple>
@[code]
Select on a group of channels or future values, waiting with a timeout for data
to become available from any of the channels.
It returns a tuple with the following possible values:
@[list(sub)]
-- @[code](Channel|Future, data, $selected)@[code];
-- @[code](none, none, $timeout)@[code];
-- @[code](none, none, $finished)@[code];
@[list(sub)]
@[list]

@[text]


################################################################################
################################################################################
#### Asynchronous Call
################################################################################
################################################################################

@[name]
dao.concurrent.async-call
@[name]

@[title]
Asynchronous Call
@[title]

@[text]

Asynchronous Function Call (AFC) allows a function call to be executed as a tasklet,
and immediately returns a future value that can be used to block on the tasklet
and wait for its completion.

Any standard function call followed by @[code]!!@[code] will start an AFC.
@[code]
routine MyFunction( n = 10 )
{
    for(var i = 1 : n ) io.writeln( i )
}

# The trailing !! runs the call as a tasklet and returns a future value:
var f = MyFunction( 20 ) !!
io.writeln( f.value() )
@[code]

When multiple methods of a class instance are called in asynchronous mode,
their execution is mutually exclusive: at any given time only one call (tasklet)
can be active. In this way, the class instance acts just like a
@[link]monitor@@http://en.wikipedia.org/wiki/Monitor_(synchronization)@[link].

@[text]


################################################################################
################################################################################
#### Asynchronous Class
################################################################################
################################################################################

@[name]
dao.concurrent.async-class
@[name]

@[title]
Asynchronous Class
@[title]

@[text]

An asynchronous class is a class whose instances automatically become
asynchronous objects.
When a method is invoked on an asynchronous object, it is automatically called
in asynchronous mode: it starts a tasklet and returns a future value.
Such tasklets are queued and executed in the order in which they are queued.

An asynchronous object therefore acts like an actor in the
@[link]actor model@@http://en.wikipedia.org/wiki/Actor_model@[link],
since calling a method of such an object is just like sending a message to the object,
with the function parameters being the message contents.
For each instance, these messages are processed one by one in the order they are received.

An asynchronous class is defined by specifying the asynchronous mode with
@[code]!!@[code] after the class name. Here is a simple example,
@[code]
class Account !!
{
    private

    var balance = 0

    public

    routine Account( init = 0 ){
        balance = init
    }
    # Fails with $false if the balance is insufficient:
    routine Withdraw( amount: int ) => enum<false,true>
    {
        if ( balance < amount ) return $false
        balance -= amount
        return $true
    }
    routine Deposit( amount: int ) => int
    {
        balance += amount
        return balance
    }
    routine Balance() => int
    {
        return balance
    }
}

var account1 = Account( 100 )
var account2 = Account( 100 )

# Each method call returns a future value:
var future1 = account1.Withdraw( 10 )
if( future1.value() == $true ){
    var future2 = account2.Deposit( 10 )
}

var future3 = account1.Deposit( 20 )

io.writeln( 'Balance in account1:', account1.Balance().value() )
io.writeln( 'Balance in account2:', account2.Balance().value() )
@[code]

Like calling @[code]mt.start()@[code], calling a method on an asynchronous object
returns a future value, which can be used to check the status of the asynchronous call.
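
As a minimal sketch of working with such a future value, the following reuses
the @[code]Account@[code] class from the example above. The variable names
@[code]savings@[code] and @[code]pending@[code] are hypothetical, and the sketch
assumes only the @[code]wait()@[code] and @[code]value()@[code] methods described
for future values:
@[code]
var savings = Account( 50 )

# The call returns immediately with a future value:
var pending = savings.Deposit( 25 )

# Block until the queued Deposit() tasklet has completed:
pending.wait()

# value() gives the result returned by Deposit():
io.writeln( 'Balance after deposit:', pending.value() )
@[code]
Calling @[code]value()@[code] directly would also wait for completion;
the explicit @[code]wait()@[code] only makes the blocking step visible.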
@[text]


################################################################################
################################################################################
#### Channels
################################################################################
################################################################################

@[name]
dao.concurrent.channel
@[name]

@[title]
Communication Channel
@[title]

@[text]

Tasklets (represented by future values) created from @[code]mt::start@[code],
asynchronous function calls and the methods of asynchronous objects
can be assigned to native threads to run concurrently.
Tasklets can communicate and synchronize with each other using
@[code]communication channels@[code].

The @[code]Channel@[code] type is implemented as a customized C data type
that supports template-like type arguments.
So to construct a channel that can send and receive integers, one can use,
@[code]
var chan = mt::Channel<int>()
@[code]
Here @[code]Channel@[code] is prefixed with @[code]mt::@[code],
because @[code]Channel@[code] is defined in the built-in @[code]mt@[code]
multi-threading module.

The type argument @[code]int@[code] indicates that this channel can only be used
to transmit integers. Currently, only primitive types
(@[code]int@[code], @[code]float@[code], @[code]double@[code],
@[code]complex@[code], @[code]string@[code], @[code]enum@[code])
plus @[code]array@[code] types, and their compositions through
@[code]list@[code], @[code]map@[code] and @[code]tuple@[code] types,
are supported for channels.

Each channel has a capacity for transmitting data, which are buffered in the channel.
When the channel's buffer reaches its capacity, any attempt to send data to it
blocks the sender, which is resumed either after a timeout,
or after some data have been read out of the buffer,
making room for the data being sent by the blocked sender.
Channel capacity can be passed to the constructor of the channel as an optional parameter,
which is one by default.

The channel type has a number of methods, which include:
@[list]
--@[code]
buffer( self: Channel<@V> ) => int
@[code]
Return the number of unprocessed data items in the channel buffer.
--@[code]
cap( self: Channel<@V> ) => int
@[code]
Return the channel capacity.
--@[code]
cap( self: Channel<@V>, cap: int ) => int
@[code]
Set the channel capacity and return the previous one.
Setting the capacity to zero prevents the channel from receiving further data,
and effectively closes it when its buffer becomes empty.
--@[code]
send( self: Channel<@V>, data: @V, timeout: float = -1 ) => int
@[code]
Send a data item with a timeout. A negative timeout parameter means infinite waiting.
It returns 1 upon timeout, 0 otherwise.
--@[code]
receive( self: Channel<@V>, timeout: float = -1 ) => tuple>
@[code]
Receive a data item with a timeout. A negative timeout parameter means infinite waiting.
It returns a tuple with the following possible values:
@[list(sub)]
-- @[code](data, $received)@[code];
-- @[code](none, $timeout)@[code];
-- @[code](none, $finished)@[code];
@[list(sub)]
@[list]

Here is a simple example, where integers are sent through one channel
and strings through another, and these integers and strings are received
using the select method:
@[code]
load sys

# One channel for integers and one for strings, each with capacity one:
var chans = { mt::Channel<int>(1), mt::Channel<string>(1) }

mt.start {
    var index = 0;
    while( ++index <= 10 ){
        if( rand(2) ){
            io.writeln( "sending integer:", index );
            chans[0].send( index )
        }else{
            io.writeln( "sending string: S" + (string) index );
            chans[1].send( 'S' + (string) index )
        }
        sys.sleep(0.5)
    }
    # set channel capacities to zero to close them:
    chans[0].cap(0)
    chans[1].cap(0)
}

# Keep selecting until both channels are closed and empty:
while( 1 ){
    var data = mt::select( chans, 0.2 )
    io.writeln( "received:", data );
    if( data.status == $finished ) break
}
@[code]

@[text]