load help; @[name] dao.concurrent @[name] @[title] 并行计算编程 @[title] @[text] 道语言对多核计算机上的并行编程有原生的支持。 道里面的基本并行计算单元是轻任务, 它由一个虚拟机进程(环境)和一个未来值(句柄)构成。 道语言的轻任务比系统的原生线程更廉价占更好的计算机资源, 因为只有活跃的轻任务才会占用原生线程进行执行。 道提供了多个的特性来支持并行编程,诸如: 带并行化代码块方法的多线程模块@[code]mt@[code], 异步函数调用,异步类实例和通讯管道等。 传统的多线程编程也被作为外部模块支持。 @[text] ################################################################################ ################################################################################ #### Multithreading ################################################################################ ################################################################################ @[name] dao.concurrent.threading @[name] @[title] 多线程模块 @[title] @[text] 多线程模块@[code]mt@[code]提供了创建轻任务的方法 和自动并行化的方法来自动并行一些常见的使用标准容器类型的计算。 @[subsection] 创建轻任务 @[subsection] 创建轻任务的最简单的方法是使用函数式方法@[code]mt.start@[code] 来运行一个表达式或代码块: @[code] var a = 123 var fut = mt.start { a*a } var fut2 = mt.start { for(var i = 1 : 1000 ) b = a*a } @[code] 这个函数式方法是以代码块方法(类似Ruby里的Code Block)的形式实现, 它能使用一个调用时附加的代码块作为隐式参数。 @[code]mt.start@[code]将创建的轻任务以未来值的形式返回。 这个轻任务可能会从线程池里重用一个已创建的系统线程, 或创建一个新的系统线程来运行该任务。 @[subsection] 未来值 @[subsection] 未来值是一个表示未确定值的类型,它通常用来表示一个轻任务的结果。 在该轻任务正常结束之前,未来值的值始终是未确定的。 这种未来值可用来对轻任务做简单的调度。 它可以用来无限时或限时阻塞当前轻任务以等待它所表示的轻任务的结束。 例如: @[code] fut.wait() fut2.wait( 0.1 ) @[code] 未来值所表示的轻任务的的结果可由下面的方法获得: @[code] var value = fut.value() @[code] 这个方法将无限时阻塞当前轻任务以等待它所表示的轻任务的结束. @[subsection] 并行代码块方法 @[subsection] 最简单的并行编程方式是,使用多线程模块@[code]mt@[code]的并行代码块方法。 下面是@[code]mt@[code]所支持的并行代码块方法: @[list] -- @[code]iterate()@[code]: 并行迭代预先定义的次数以执行它的调用所带的代码块预先定义的次数; 或并行历遍列表,关联表或数组并对每个元素执行它所带的代码块; -- @[code]map()@[code] 并行映射一列表,关联表或数组到一由它所带代码块的执行结果组成的新的列表,关联表或数组; -- @[code]apply()@[code] 并行地历遍列表,关联表或数组的每个元素,并使用代码块的执行结果替换该元素的原有值; -- @[code]find()@[code] 在列表,关联表或数组上并行查找满足代码块所表示的条件的元素, 并返回第一个满足条件的元素。 @[list] Examples, @[code] mt.iterate( 10 ) { io.writeln( X ) } mt.iterate( {1,2,3,4,5,6}, 3 ) { io.writeln( X ) } @[code] @[subsection] 其它方法 @[subsection] @[code]mt@[code]还支持少数其他方法: @[list] --@[code] select( group: list<@T>, timeout = -1.0 ) => tuple> @[code] 从一组管道或未来值中选择来获得最新可用的管道或未来值。它支持一个等待超时时间。 它可返回一个带以下可能值的元组: -- @[code](Channel|Future, data, $selected)@[code]; -- @[code](none, none, $timeout)@[code]; -- @[code](none, none, $finished)@[code]; @[list(sub)] @[list] @[text] ################################################################################ ################################################################################ #### Asynchronous Call ################################################################################ ################################################################################ @[name] dao.concurrent.async-call @[name] @[title] 异步函数调用 @[title] @[text] 异步函数调用(Asynchronous Function Call, AFC)将使一个函数调用作为轻任务执行。 它将立即返回一个未来值以操作该轻任务(比如阻塞当前任务以等待该任务)。 任何一个函数调用后加@[code]!!@[code]将使该调用变成异步调用。 @[code] routine MyFunction( n = 10 ) { for(var i = 1 : n ) io.writeln( i ) } var f = MyFunction( 20 ) !! io.writeln( f.value() ) @[code] 当一个类实例的多个方法被以异步函数调用的方式执行时,它们的执行将是互斥的, 也就是说,在任何时刻都最多只有一个方法的异步调用(轻任务)被执行。 这样类实例的行为跟 @[link]monitor@@http://en.wikipedia.org/wiki/Monitor_(synchronization)@[link] 一样。 @[text] ################################################################################ ################################################################################ #### Asynchronous Class ################################################################################ ################################################################################ @[name] dao.concurrent.async-class @[name] @[title] 异步类 @[title] @[text] 异步类是一种特殊的类,当这种异步类的对象调用一个方法时, 这个调用将被自动地当作异步调用处理,并创建轻任务返回未来值。 这种轻任务将被存在一个队列里,并按它们存入队列的顺序执行。 因此,这种异步对象在行为上很类似 @[link]actor model@@http://en.wikipedia.org/wiki/Actor_model@[link] 里的actor。因为这里调用这种对象的方法就跟给actor发消息一样, 而调用参数即是消息内容。 对于每个异步对象,轻任务将按存入队列的顺序执行, 这基本等同actor按消息将收顺序处理消息。 异步类的定义由在普通类的定义里的类名后加异步模式标志@[code]!!@[code]组成。 这里是个简单的例子: @[code] class Account !! { private var balance = 0 public routine Account( init = 0 ){ balance = init } routine Withdraw( amount: int ) => enum { if ( balance < amount ) return $false balance -= amount return $true } routine Deposit( amount: int ) => int { balance += amount return balance } routine Balance() => int { return balance } } var acount1 = Account( 100 ) var acount2 = Account( 100 ) var future1 = acount1.Withdraw( 10 ) if( future1.value() == $true ){ var future2 = acount2.Deposit( 10 ) } var future3 = acount1.Deposit( 20 ) io.writeln( 'Balance in account1:', acount1.Balance().value() ) io.writeln( 'Balance in account2:', acount2.Balance().value() ) @[code] @[text] ################################################################################ ################################################################################ #### Channels ################################################################################ ################################################################################ @[name] dao.concurrent.channel @[name] @[title] 通讯管道 @[title] @[text] 轻任务(由未来值表示)可由@[code]mt::start@[code]方法, 异步函数调用和异步对象的方法调用产生。 它们将被分配到系统线程上并行执行。 轻任务可通过通讯管道进行通讯,同步与调度处理。 管道类型@[code]Channel@[code]被实现为一个用户定制的类型, 它可以带类似模版类型的类型参数。因此,要创建一个可发送和接收 整数的管道,可用下面的方式: @[code] var chan = mt::Channel() @[code] 这里@[code]Channel@[code]带了前缀@[code]mt::@[code], 因为@[code]Channel@[code]是多线程模块@[code]mt@[code]里定义的一个类型。 这个管道类型的类型参数为@[code]int@[code]表示这个管道只能用来传输整数。 目前仅基本类型如@[code]int@[code],@[code]float@[code],@[code]double@[code], @[code]complex@[code],@[code]string@[code],@[code]enum@[code], 数值数组类型以及这类型通过@[code]list@[code],@[code]map@[code]和@[code]tuple@[code] 构成的复合类型才可用作管道类型的类型参数。 每个管道对象都有个数据传输容量,它决定了管道里能缓存的消息个数。 当它缓存的消息个数达到它的容量后,再向它发送消息将导致发送者轻任务阻塞, 直到它等足一个预先指定的时间,或有其它轻任务从该管道里读出了消息使得 该管道的容量又变成未满的状态。 管道容量可在管道对象创建时作为参数传递给它的初始化方法。 这方法定义的缺省管道容量为一。 管道类型有以下方法: @[list] --@[code]buffer( self: Channel<@V> ) => int@[code] 它返回管道里未被读取的消息数目。 --@[code] cap( self: Channel<@V> ) => int @[code] 返回管道的容量。 --@[code] cap( self: Channel<@V>, cap: int ) => int @[code] 设置管道的容量,并返回原先的容量。将容量设为零将阻止它接收新的消息, 这实际上就将该管道关闭了。 --@[code] send( self: Channel<@V>, data: @V, timeout: float = -1 ) => int @[code] 向管道发送消息,并指定一个等待超时时间。负值超时时间表示无限时间等待。 超时后返回一,否者返回零。 --@[code] receive( self: Channel<@V>, timeout: float = -1 ) => tuple> @[code] 从管道里读取消息,并指定一个等待超时时间。负值超时时间表示无限时间等待。 它可返回一个带以下可能值的元组: @[list(sub)] -- @[code](data, $received)@[code]; -- @[code](none, $timeout)@[code]; -- @[code](none, $finished)@[code]; @[list(sub)] @[list] 这里是个简单的例子: @[code] load sys var chans = { mt::Channel(1), mt::Channel(1) } mt.start { var index = 0; while( ++index <= 10 ){ if( rand(2) ){ io.writeln( "sending integer:", index ); chans[0].send( index ) }else{ io.writeln( "sending string: S" + (string) index ); chans[1].send( 'S' + (string) index ) } sys.sleep(0.5) } # set channel buffer sizes to zero to close them: chans[0].cap(0) chans[1].cap(0) } while( 1 ){ var data = mt::select( chans, 0.2 ) io.writeln( "received:", data ); if( data.status == $finished ) break } @[code] @[text]