歡迎您光臨本站 註冊首頁

了解 actor 如何提供新的應用程序代碼建模方法

主要晶元廠商已經開始提供同時運行兩個或更多個核的晶元(雖然不一定更快),在這種情況下,併發性很快成為每個軟體開發人員都關心的熱門主題.本文延續 Ted Neward 的另一篇文章 深入了解 Scala 併發性.在本文中,Ted Neward 通過研究 actor 深入討論併發性這個熱門主題,actor 是通過傳遞消息相互協作的執行實體.

關於本系列

Ted Neward 潛心研究 Scala 編程語言,並帶您跟他一起徜徉.在這個新的 developerWorks 系列 中,您將深入了解 Scala 並看到 Scala 的語言功能的實際效果.在進行相關比較時,Scala 代碼和 Java™ 代碼將放在一起展示,但(您將發現)Scala 中的許多內容與您在 Java 編程中發現的任何內容都沒有直接關聯,而這正是 Scala 的魅力所在!畢竟,如果 Java 代碼可以做到的話,又何必學習 Scala 呢?

在 前一篇文章 中,我討論了構建併發代碼的重要性(無論是否是 Scala 代碼),還討論了在編寫併發代碼時開發人員面對的一些問題,包括不要鎖住太多東西、不要鎖住太少東西、避免死鎖、避免生成太多線程等等.

這些理論問題太沉悶了.為了避免讀者覺得失望,我與您一起研究了 Scala 的一些併發構造,首先是在 Scala 中直接使用 Java 語言的併發庫的基本方法,然後討論 Scala API 中的 MailBox 類型.儘管這兩種方法都是可行的,但是它們並不是 Scala 實現併發性的主要機制.

真正提供併發性的是 Scala 的 actor.

什麼是 「actor」?

「actor」 實現在稱為 actor 的執行實體之間使用消息傳遞進行協作(注意,這裡有意避免使用 「進程」、「線程」 或 「機器」 等辭彙).儘管它聽起來與 RPC 機制有點兒相似,但是它們是有區別的.RPC 調用(比如 Java RMI 調用)會在調用者端阻塞,直到伺服器端完成處理併發送回某種響應(返回值或異常),而消息傳遞方法不會阻塞調用者,因此可以巧妙地避免死鎖.

僅僅傳遞消息並不能避免錯誤的併發代碼的所有問題.另外,這種方法還有助於使用 「不共享任何東西」 編程風格,也就是說不同的 actor 並不訪問共享的數據結構(這有助於促進封裝 actor,無論 actor 是 JVM 本地的,還是位於其他地方) — 這樣就完全不需要同步了.畢竟,如果不共享任何東西,併發執行就不涉及任何需要同步的東西.

這不算是對 actor 模型的正規描述,毫無疑問,具有更正規的計算機科學背景的人會找到各種更嚴謹的描述方法,能夠描述 actor 的所有細節.但是對於本文來說,這個描述已經夠了.在網上可以找到更詳細更正規的描述,還有一些學術文章詳細討論了 actor 背後的概念(請您自己決定是否要深入學習這些概念).現在,我們來看看 Scala actors API.

Scala actor

使用 actor 根本不困難,只需使用 Actor 類的 actor 方法創建一個 actor,見清單 1:

清單 1. 開拍!

import scala.actors._, Actor._

package com.tedneward.scalaexamples.scala.V4
{
object Actor1
{
def main(args : Array[String]) =
{
val badActor =
actor
{
receive
{
case msg => System.out.println(msg)
}
}

badActor ! "Do ya feel lucky, punk?"
}
}
}

這裡同時做了兩件事.

首先,我們從 Scala Actors 庫的包中導入了這個庫,然後從庫中直接導入了 Actor 類的成員;第二步並不是完全必要的,在後面的代碼中可以使用 Actor.actor 替代 actor,但是這麼做能夠表明 actor 是語言的內置結構並(在一定程度上)提高代碼的可讀性.

下一步是使用 actor 方法創建 actor 本身,這個方法通過參數接收一個代碼塊.在這裡,代碼塊執行一個簡單的 receive(稍後討論).結果是一個 actor,它被存儲在一個值引用中,供以後使用.

請記住,除了消息之外,actor 不使用其他通信方法.使用 ! 的代碼行實際上是一個向 badActor 發送消息的方法,這可能不太直觀.Actor 內部還包含另一個 MailBox 元素(已討論);! 方法接收傳遞過來的參數(在這裡是一個字元串),把它發送給郵箱,然後立即返回.

消息交付給 actor 之後,actor 通過調用它的 receive 方法來處理消息;這個方法從郵箱中取出第一個可用的消息,把它交付給一個模式匹配塊.注意,這裡沒有指定模式匹配的類型,任何消息都是匹配的,消息被綁定到 msg 名稱(為了列印它).

一定要注意一點:對於可以發送的類型,沒有任何限制 — 不一定要像前面的示例那樣發送字元串.實際上,基於 actor 的設計常常使用 Scala case 類攜帶實際消息本身,這樣就可以根據 case 類的參數/成員的類型提供隱式的 「命令」 或 「動作」,或者向動作提供數據.

例如,假設希望 actor 用兩個不同的動作來響應發送的消息;新的實現可能與清單 2 相似:

清單 2. 嗨,我是導演!

object Actor2
{
case class Speak(line : String);
case class Gesture(bodyPart : String, action : String);
case class NegotiateNewContract;

def main(args : Array[String]) =
{
val badActor =
actor
{
receive
{
case NegotiateNewContract =>
System.out.println("I won't do it for less than $1 million!")
case Speak(line) =>
System.out.println(line)
case Gesture(bodyPart, action) =>
System.out.println("(" action "s " bodyPart ")")
case _ =>
System.out.println("Huh? I'll be in my trailer.")
}
}

badActor ! NegotiateNewContract
badActor ! Speak("Do ya feel lucky, punk?")
badActor ! Gesture("face", "grimaces")
badActor ! Speak("Well, do ya?")
}
}

到目前為止,看起來似乎沒問題,但是在運行時,只協商了新合同;在此之後,JVM 終止了.初看上去,似乎是生成的線程無法足夠快地響應消息,但是要記住在 actor 模型中並不處理線程,只處理消息傳遞.這裡的問題其實非常簡單:一次接收使用一個消息,無論隊列中有多少個消息正在等待處理都無所謂,只有一次接收,只交付一個消息.

糾正這個問題需要對代碼做以下修改,見清單 3:

把 receive 塊放在一個接近無限的循環中.

創建一個新的 case 類來表示什麼時候處理全部完成了.

清單 3. 現在我是一個更好的導演!

object Actor2
{
case class Speak(line : String);
case class Gesture(bodyPart : String, action : String);
case class NegotiateNewContract;
case class ThatsAWrap;

def main(args : Array[String]) =
{
val badActor =
actor
{
var done = false
while (! done)
{
receive
{
case NegotiateNewContract =>
System.out.println("I won't do it for less than $1 million!")
case Speak(line) =>
System.out.println(line)
case Gesture(bodyPart, action) =>
System.out.println("(" action "s " bodyPart ")")
case ThatsAWrap =>
System.out.println("Great cast party, everybody! See ya!")
done = true
case _ =>
System.out.println("Huh? I'll be in my trailer.")
}
}
}

badActor ! NegotiateNewContract
badActor ! Speak("Do ya feel lucky, punk?")
badActor ! Gesture("face", "grimaces")
badActor ! Speak("Well, do ya?")
badActor ! ThatsAWrap
}
}

這下行了!使用 Scala actor 就這麼容易.

併發地執行動作

上面的代碼沒有反映出併發性 — 到目前為止給出的代碼更像是另一種非同步的方法調用形式,您看不出區別.(從技術上說,在第二個示例中引入接近無限循環之前的代碼中,可以猜出有一定的併發性存在,但這只是偶然的證據,不是明確的證明).

為了證明在幕後確實有多個線程存在,我們深入研究一下前一個示例:

清單 4. 我要拍特寫了

object Actor3
{
case class Speak(line : String);
case class Gesture(bodyPart : String, action : String);
case class NegotiateNewContract;
case class ThatsAWrap;

def main(args : Array[String]) =
{
def ct =
"Thread " Thread.currentThread().getName() ": "
val badActor =
actor
{
var done = false
while (! done)
{
receive
{
case NegotiateNewContract =>
System.out.println(ct "I won't do it for less than $1 million!")
case Speak(line) =>
System.out.println(ct line)
case Gesture(bodyPart, action) =>
System.out.println(ct "(" action "s " bodyPart ")")
case ThatsAWrap =>
System.out.println(ct "Great cast party, everybody! See ya!")
done = true
case _ =>
System.out.println(ct "Huh? I'll be in my trailer.")
}
}
}

System.out.println(ct "Negotiating...")
badActor ! NegotiateNewContract
System.out.println(ct "Speaking...")
badActor ! Speak("Do ya feel lucky, punk?")
System.out.println(ct "Gesturing...")
badActor ! Gesture("face", "grimaces")
System.out.println(ct "Speaking again...")
badActor ! Speak("Well, do ya?")
System.out.println(ct "Wrapping up")
badActor ! ThatsAWrap
}
}

運行這個新示例,就會非常明確地發現確實有兩個不同的線程:

main 線程(所有 Java 程序都以它開始)

Thread-2 線程,它是 Scala Actors 庫在幕後生成的

因此,在啟動第一個 actor 時,本質上已經開始了多線程執行.

但是,習慣這種新的執行模型可能有點兒困難,這是一種全新的併發性考慮方式.例如,請考慮 前一篇文章 中的 Producer/Consumer 模型.那裡有大量代碼,尤其是在 Drop 類中,我們可以清楚地看到線程之間,以及線程與保證所有東西同步的監視器之間有哪些交互活動.為了便於參考,我在這裡給出前一篇文章中的 V3 代碼:

清單 5. ProdConSample,v3 (Scala)

package com.tedneward.scalaexamples.scala.V3
{
import concurrent.MailBox
import concurrent.ops._

object ProdConSample
{
class Drop
{
private val m = new MailBox()

private case class Empty()
private case class Full(x : String)

m send Empty() // initialization

def put(msg : String) : Unit =
{
m receive
{
case Empty() =>
m send Full(msg)
}
}

def take() : String =
{
m receive
{
case Full(msg) =>
m send Empty(); msg
}
}
}

def main(args : Array[String]) : Unit =
{
// Create Drop
val drop = new Drop()

// Spawn Producer
spawn
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
);

importantInfo.foreach((msg) => drop.put(msg))
drop.put("DONE")
}

// Spawn Consumer
spawn
{
var message = drop.take()
while (message != "DONE")
{
System.out.format("MESSAGE RECEIVED: %s%n", message)
message = drop.take()
}
}
}
}
}

儘管看到 Scala 如何簡化這些代碼很有意思,但是它實際上與原來的 Java 版本沒有概念性差異.現在,看看如果把 Producer/Consumer 示例的基於 actor 的版本縮減到最基本的形式,它會是什麼樣子:

清單 6. Take 1,開拍!生產!消費!

object ProdConSample1
{
case class Message(msg : String)

def main(args : Array[String]) : Unit =
{
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
System.out.println("Received message! -> " msg)
done = (msg == "DONE")
}
}
}

consumer ! "Mares eat oats"
consumer ! "Does eat oats"
consumer ! "Little lambs eat ivy"
consumer ! "Kids eat ivy too"
consumer ! "DONE"
}
}

第一個版本確實簡短多了,在某些情況下可能能夠完成所需的所有工作;但是,如果運行這段代碼並與以前的版本做比較,就會發現一個重要的差異 — 基於 actor 的版本是一個多位置緩衝區,而不是我們以前使用的單位置緩衝.這看起來是一項改進,而不是缺陷,但是我們要通過對比確認這一點.我們來創建 Drop 的基於 actor 的版本,在這個版本中所有對 put() 的調用必須由對 take() 的調用進行平衡.

幸運的是,Scala Actors 庫很容易模擬這種功能.希望讓 Producer 一直阻塞,直到 Consumer 接收了消息;實現的方法很簡單:讓 Producer 一直阻塞,直到它從 Consumer 收到已經接收消息的確認.從某種意義上說,這就是以前的基於監視器的代碼所做的,那個版本通過對鎖對象使用監視器發送這種信號.

在 Scala Actors 庫中,最容易的實現方法是使用 !? 方法而不是 ! 方法(這樣就會一直阻塞到收到確認時).(在 Scala Actors 實現中,每個 Java 線程都是一個 actor,回復會發送到與 main 線程隱式關聯的郵箱).這意味著 Consumer 需要發送某種確認;這要使用隱式繼承的 reply(它還繼承 receive 方法),見清單 7:

清單 7. Take 2,開拍!


object ProdConSample2
{
case class Message(msg : String)

def main(args : Array[String]) : Unit =
{
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
System.out.println("Received message! -> " msg)
done = (msg == "DONE")
reply("RECEIVED")
}
}
}

System.out.println("Sending....")
consumer !? "Mares eat oats"
System.out.println("Sending....")
consumer !? "Does eat oats"
System.out.println("Sending....")
consumer !? "Little lambs eat ivy"
System.out.println("Sending....")
consumer !? "Kids eat ivy too"
System.out.println("Sending....")
consumer !? "DONE"
}
}

如果喜歡使用 spawn 把 Producer 放在 main() 之外的另一個線程中(這非常接近最初的代碼),那麼代碼可能像清單 8 這樣:

清單 8. Take 4,開拍!

object ProdConSampleUsingSpawn
{
import concurrent.ops._

def main(args : Array[String]) : Unit =
{
// Spawn Consumer
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
System.out.println("MESSAGE RECEIVED: " msg)
done = (msg == "DONE")
reply("RECEIVED")
}
}
}

// Spawn Producer
spawn
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too",
"DONE"
);

importantInfo.foreach((msg) => consumer !? msg)
}
}
}

無論從哪個角度來看,基於 actor 的版本都比原來的版本簡單多了.讀者只要讓 actor 和隱含的郵箱自己發揮作用即可.

但是,這並不簡單.actor 模型完全顛覆了考慮併發性和線程安全的整個過程;在以前的模型中,我們主要關注共享的數據結構(數據併發性),而現在主要關注操作數據的代碼本身的結構(任務併發性),儘可能少共享數據.請注意 Producer/Consumer 示例的不同版本的差異.在以前的示例中,併發功能是圍繞 Drop 類(有界限的緩衝區)顯式編寫的.在本文中的版本中,Drop 甚至沒有出現,重點在於兩個 actor(線程)以及它們之間的交互(通過不共享任何東西的消息).

當然,仍然可以用 actor 構建以數據為中心的併發構造;只是必須採用稍有差異的方式.請考慮一個簡單的 「計數器」 對象,它使用 actor 消息傳達 「increment」 和 「get」 操作,見清單 9:

清單 9. Take 5,計數!

object CountingSample
{
case class Incr
case class Value(sender : Actor)
case class Lock(sender : Actor)
case class UnLock(value : Int)

class Counter extends Actor
{
override def act(): Unit = loop(0)


def loop(value: int): Unit = {
receive {
case Incr() => loop(value 1)
case Value(a) => a ! value; loop(value)
case Lock(a) => a ! value
receive { case UnLock(v) => loop(v) }
case _ => loop(value)
}
}
}

def main(args : Array[String]) : Unit =
{
val counter = new Counter
counter.start()
counter ! Incr()
counter ! Incr()
counter ! Incr()
counter ! Value(self)
receive { case cvalue => Console.println(cvalue) }
counter ! Incr()
counter ! Incr()
counter ! Value(self)
receive { case cvalue => Console.println(cvalue) }
}
}

為了進一步擴展 Producer/Consumer 示例,清單 10 給出一個在內部使用 actor 的 Drop 版本(這樣,其他 Java 類就可以使用這個 Drop,而不需要直接調用 actor 的方法):

清單 10. 在內部使用 actor 的 Drop

object ActorDropSample
{
class Drop
{
private case class Put(x: String)
private case object Take
private case object Stop

private val buffer =
actor
{
var data = ""
loop
{
react
{
case Put(x) if data == "" =>
data = x; reply()
case Take if data != "" =>
val r = data; data = ""; reply(r)
case Stop =>
reply(); exit("stopped")
}
}
}

def put(x: String) { buffer !? Put(x) }
def take() : String = (buffer !? Take).asInstanceOf[String]
def stop() { buffer !? Stop }
}

def main(args : Array[String]) : Unit =
{
import concurrent.ops._

// Create Drop
val drop = new Drop()

// Spawn Producer
spawn
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
);

importantInfo.foreach((msg) => { drop.put(msg) })
drop.put("DONE")
}

// Spawn Consumer
spawn
{
var message = drop.take()
while (message != "DONE")
{
System.out.format("MESSAGE RECEIVED: %s%n", message)
message = drop.take()
}
drop.stop()
}
}
}

可以看到,這需要更多代碼(和更多的線程,每個 actor 都在一個線程池內部起作用),但是這個版本的 API 與以前的版本相同,它把所有與併發性相關的代碼都放在 Drop 內部,這正是 Java 開發人員所期望的.

actor 還有更多特性.

在規模很大的系統中,讓每個 actor 都由一個 Java 線程支持是非常浪費資源的,尤其是在 actor 的等待時間比處理時間長的情況下.在這些情況下,基於事件的 actor 可能更合適;這種 actor 實際上放在一個閉包中,閉包捕捉 actor 的其他動作.也就是說,現在並不通過線程狀態和寄存器表示代碼塊(函數).當一個消息到達 actor 時(這時顯然需要活動的線程),觸發閉包,閉包在它的活動期間借用一個活動的線程,然後通過回調本身終止或進入 「等待」 狀態,這樣就會釋放線程.(請參見 參考資料 中 Haller/Odersky 的文章).

在 Scala Actors 庫中,這要使用 react 方法而不是前面使用的 receive.使用 react 的關鍵是在形式上 react 不能返回, react 中的實現必須重複調用包含 react 塊的代碼塊.簡便方法是使用 loop 結構創建一個接近無限的循環.這意味著 清單 10 中的 Drop 實現實際上只通過借用調用者的線程執行操作,這會減少執行所有操作所需的線程數.(在實踐中,我還沒有見過在簡單的示例中出現這種效果,我想我們只能暫且相信 Scala 設計者的說法).

在某些情況下,可能選擇通過派生基本的 Actor 類(在這種情況下,必須定義 act 方法,否則類仍然是抽象的)創建一個新類,它隱式地作為 actor 執行.儘管這是可行的,但是這種思想在 Scala 社區中不受歡迎;在一般情況下,我在這裡描述的方法(使用 Actor 對象中的 actor 方法)是創建 actor 的首選方法.

結束語

actor 編程需要與 「傳統」 對象編程不同的風格,在使用 actor 時要記住幾點.

首先,actor 的主要能力來源於消息傳遞風格,而不採用阻塞-調用風格,這是它的主要特點.(有意思的是,也有使用消息傳遞作為核心機制的面向對象語言.最知名的兩個例子是 Objective-C 和 Smalltalk,還有 ThoughtWorker 的 Ola Bini 新創建的 Ioke).如果創建直接或間接擴展 Actor 的類,那麼要確保對對象的所有調用都通過消息傳遞進行.

第二,可以在任何時候交付消息,更重要的是,在發送和接收之間可能有相當長的延遲,一定要確保消息攜帶正確地處理它們所需的所有狀態.這種方式會:

讓代碼更容易理解(消息攜帶處理所需的所有狀態).

減少 actor 訪問某些地方的共享狀態的可能性,從而減少發生死鎖或其他併發性問題的機會.

第三,actor 應該不會阻塞,您從前面的內容應該能夠看出這一點.從本質上說,阻塞是導致死鎖的原因;代碼可能產生的阻塞越少,發生死鎖的可能性就越低.

很有意思的是,如果您熟悉 Java Message Service (JMS) API,就會發現我給出的這些建議在很大程度上也適用於 JMS — 畢竟,actor 消息傳遞風格只是在實體之間傳遞消息,JMS 消息傳遞也是在實體之間傳遞消息.它們的差異在於,JMS 消息往往比較大,在層和進程級別上操作;而 actor 消息往往比較小,在對象和線程級別上操作.如果您掌握了 JMS,actor 也不難掌握.

actor 並不是解決所有併發性問題的萬靈藥,但是它們為應用程序或庫代碼的建模提供了一種新的方式,所用的構造相當簡單明了.儘管它們的工作方式有時與您預期的不一樣,但是一些行為正是我們所熟悉的 — 畢竟,我們在最初使用對象時也有點不習慣,只要經過努力,您也會掌握並喜歡上 actor.


[火星人 ] 面向Java開發人員的Scala指南: 深入了解Scala併發性 了解 actor 如何提供新的應用程序代碼建模方法已經有331次圍觀

http://coctec.com/docs/java/show-post-61103.html