歡迎您光臨本站 註冊首頁

了解 Scala 如何簡化併發編程並繞過陷阱

對於許多(如果不是大多數)Java? 程序員來說,Scala 的吸引力在於處理併發性以及編寫線程安全的代碼時非常輕鬆.在本期文章中,Ted Neward 將開始深入研究 Scala 語言及環境所提供的各種併發特性和庫.

2003 年,Herb Sutter 在他的文章 「The Free Lunch Is Over」 中揭露了行業中最不可告人的一個小秘密,他明確論證了處理器在速度上的發展已經走到了盡頭,並且將由全新的單晶元上的并行 「內核」(虛擬 CPU)所取代.這一發現對編程社區造成了不小的衝擊,因為正確創建線程安全的代碼,在理論而非實踐中,始終會提高高性能開發人員的身價,而讓各公司難以聘用他們.看上去,僅有少數人充分理解了 Java 的線程模型、併發 API 以及 「同步」 的含義,以便能夠編寫同時提供安全性和吞吐量的代碼 —— 並且大多數人已經明白了它的困難所在.

據推測,行業的其餘部分將自力更生,這顯然不是一個理想的結局,至少不是 IT 部門努力開發軟體所應得的回報.

與 Scala 在 .NET 領域中的姐妹語言 F# 相似,Scala 是針對 「併發性問題」 的解決方案之一.在本期文章中,我討論了 Scala 的一些屬性,這些屬性使它更加勝任於編寫線程安全的代碼,比如默認不可修改的對象,並討論了一種返回對象副本而不是修改它們內容的首選設計方案.Scala 對併發性的支持遠比此深遠;現在,我們有必要來了解一下 Scala 的各種庫.

關於本系列

關於本系列 Ted Neward 潛心研究 Scala 編程語言,並帶您跟他一起徜徉.在這個新的 developerWorks 系列 中,您將深入了解 Scala 並看到 Scala 的語言功能的實際效果.在進行相關比較時,Scala 代碼和 Java 代碼將放在一起展示,但(您將發現)Scala 中的許多內容與您在 Java 編程中發現的任何內容都沒有直接關聯,而這正是 Scala 的魅力所在!畢竟,如果 Java 代碼可以做到的話,又何必學習 Scala 呢?

併發性基礎

在深入研究 Scala 的併發性支持之前,有必要確保您具備了對 Java 基本併發性模型的良好理解,因為 Scala 的併發性支持,從某種程度上說,建立在 JVM 和支持庫所提供的特性和功能的基礎之上.為此,清單 1 中的代碼包含了一個已知的 Producer/Consumer 併發性問題(詳見 Sun Java Tutorial 的 「Guarded Blocks」 小節).注意,Java Tutorial 版本並未在其解決方案中使用 java.util.concurrent 類,而是擇優使用了 java.lang.Object 中的較舊的 wait()/notifyAll() 方法:

清單 1. Producer/Consumer(Java5 之前)

package com.tedneward.scalaexamples.notj5;

class Producer implements Runnable
{
private Drop drop;
private String importantInfo[] = {
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
};

public Producer(Drop drop) { this.drop = drop; }

public void run()
{
for (int i = 0; i < importantInfo.length; i )
{
drop.put(importantInfo[i]);
}
drop.put("DONE");
}
}

class Consumer implements Runnable
{
private Drop drop;

public Consumer(Drop drop) { this.drop = drop; }

public void run()
{
for (String message = drop.take(); !message.equals("DONE");
message = drop.take())
{
System.out.format("MESSAGE RECEIVED: %s%n", message);
}
}
}

class Drop
{
//Message sent from producer to consumer.
private String message;

//True if consumer should wait for producer to send message,
//false if producer should wait for consumer to retrieve message.
private boolean empty = true;

//Object to use to synchronize against so as to not "leak" the
//"this" monitor
private Object lock = new Object();

public String take()
{
synchronized(lock)
{
//Wait until message is available.
while (empty)
{
try
{
lock.wait();
}
catch (InterruptedException e) {}
}
//Toggle status.
empty = true;
//Notify producer that status has changed.
lock.notifyAll();
return message;
}
}

public void put(String message)
{
synchronized(lock)
{
//Wait until message has been retrieved.
while (!empty)
{
try
{
lock.wait();
} catch (InterruptedException e) {}
}
//Toggle status.
empty = false;
//Store message.
this.message = message;
//Notify consumer that status has changed.
lock.notifyAll();
}
}
}

public class ProdConSample
{
public static void main(String[] args)
{
Drop drop = new Drop();
(new Thread(new Producer(drop))).start();
(new Thread(new Consumer(drop))).start();
}
}

注意: 我在此處展示的代碼對 Sun 教程解決方案做了少許修改;它們提供的代碼存在一個很小的設計缺陷(參見 Java 教程 「缺陷」).

Java 教程 「缺陷」
好奇的讀者可能會將此處的代碼與 Java Tutorial 中的代碼進行比較,尋找它們之間有哪些不同;他們會發現我並未 「同步」 puttake 方法,而是使用了存儲在 Drop 中的 lock 對象.其原因非常簡單:對象的監測程序永遠都不會封裝在類的內部,因此 Java Tutorial 版本允許此代碼打破此規則(顯然很瘋狂):

public class ProdConSample   {     public static void main(String[] args)     {       Drop drop = new Drop();       (new Thread(new Producer(drop))).start();       (new Thread(new Consumer(drop))).start();   	synchronized(drop)   	{   	  Thread.sleep(60 * 60 * 24 * 365 * 10); // sleep for 10 years?!?   	}     }   }   



通過使用私有對象作為鎖定所依託的監測程序,此代碼將不會有任何效果.從本質上說,現在已經封裝了線程安全的實現;然後,它才能依賴客戶機的優勢正常運行.

Producer/Consumer 問題的核心非常容易理解:一個(或多個)生產者實體希望將數據提供給一個(或多個)使用者實體供它們使用和操作(在本例中,它包括將數據列印到控制台).Producer 和 Consumer 類是相應直觀的 Runnable-實現類:Producer 從數組中獲取 String,並通過 put 將它們放置到 Consumer 的緩衝區中,並根據需要執行 take.

問題的難點在於,如果 Producer 運行過快,則數據在覆蓋時可能會丟失;如果 Consumer 運行過快,則當 Consumer 讀取相同的數據兩次時,數據可能會得到重複處理.緩衝區(在 Java Tutorial 代碼中稱作 Drop)將確保不會出現這兩種情況.數據破壞的可能性就更不用提了(在 String 引用的例子中很困難,但仍然值得注意),因為數據會由 put 放入緩衝區,並由 take 取出.

關於此主題的全面討論請閱讀 Brian Goetz 的 Java Concurrency in Practice 或 Doug Lea 的 Concurrent Programming in Java(參見 參考資料),但是,在應用 Scala 之前有必要快速了解一下此代碼的運行原理.

當 Java 編譯器看到 synchronized 關鍵字時,它會在同步塊的位置生成一個 try/finally 塊,其頂部包括一個 monitorenter 操作碼,並且 finally 塊中包括一個 monitorexit 操作碼,以確保監控程序(Java 的原子性基礎)已經發布,而與代碼退出的方式無關.因此,Drop 中的 put 代碼將被重寫,如清單 2 所示:

清單 2. 編譯器失效后的 Drop.put

// This is pseudocode
public void put(String message)
{
try
{
monitorenter(lock)

//Wait until message has been retrieved.
while (!empty)
{
try
{
lock.wait();
} catch (InterruptedException e) {}
}
//Toggle status.
empty = false;
//Store message.
this.message = message;
//Notify consumer that status has changed.
lock.notifyAll();
}
finally
{
monitorexit(lock)
}
}

wait() 方法將通知當前線程進入非活動狀態,並等待另一個線對該對象調用 notifyAll().然後,通知的線程必須在能夠繼續執行的時候嘗試再次獲取監控程序.從本質上說,wait() 和 notify()/notifyAll() 允許一種簡單的信令機制,它允許 Drop 在 Producer 和 Consumer 線程之間進行協調,每個 put 都有相應的 take.

本文的 代碼下載 部分使用 Java5 併發性增強(Lock 和 Condition 介面以及 ReentrantLock 鎖定實現)提供 清單 2 的基於超時的版本,但基本代碼模式仍然相同.這就是問題所在:編寫清單 2 這樣的代碼的開發人員需要過度專註於線程和鎖定的細節以及低級實現代碼,以便讓它們能夠正確運行.此外,開發人員需要對每一行代碼刨根知底,以確定是否需要保護它們,因為過度同步與過少同步同樣有害.

現在,我們來看到 Scala 替代方案.

良好的 Scala 併發性 (v1)

開始應用 Scala 併發性的一種方法是將 Java 代碼直接轉換為 Scala,以便利用 Scala 的語法優勢來簡化代碼(至少能簡化一點):

清單 3. ProdConSample (Scala)

object ProdConSample
{
class Producer(drop : Drop)
extends Runnable
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
);

override def run() : Unit =
{
importantInfo.foreach((msg) => drop.put(msg))
drop.put("DONE")
}
}


class Consumer(drop : Drop)
extends Runnable
{
override def run() : Unit =
{
var message = drop.take()
while (message != "DONE")
{
System.out.format("MESSAGE RECEIVED: %s%n", message)
message = drop.take()
}
}
}

class Drop
{
var message : String = ""
var empty : Boolean = true
var lock : AnyRef = new Object()

def put(x: String) : Unit =
lock.synchronized
{
// Wait until message has been retrieved
await (empty == true)
// Toggle status
empty = false
// Store message
message = x
// Notify consumer that status has changed
lock.notifyAll()
}

def take() : String =
lock.synchronized
{
// Wait until message is available.
await (empty == false)
// Toggle status
empty=true
// Notify producer that staus has changed
lock.notifyAll()
// Return the message
message
}

private def await(cond: => Boolean) =
while (!cond) { lock.wait() }
}

def main(args : Array[String]) : Unit =
{
// Create Drop
val drop = new Drop();

// Spawn Producer
new Thread(new Producer(drop)).start();

// Spawn Consumer
new Thread(new Consumer(drop)).start();
}
}

Producer 和 Consumer 類幾乎與它們的 Java 同類相同,再一次擴展(實現)了 Runnable 介面並覆蓋了 run() 方法,並且 — 對於 Producer 的情況 — 分別使用了內置迭代方法來遍歷 importantInfo 數組的內容.(實際上,為了讓它更像 Scala,importantInfo 可能應該是一個 List 而不是 Array,但在第一次嘗試時,我希望儘可能保證它們與原始 Java 代碼一致.)

Drop 類同樣類似於它的 Java 版本.但 Scala 中有一些例外,「synchronized」 並不是關鍵字,它是針對 AnyRef 類定義的一個方法,即 Scala 「所有引用類型的根」.這意味著,要同步某個特定的對象,您只需要對該對象調用同步方法;在本例中,對 Drop 上的 lock 欄位中所保存的對象調用同步方法.

注意,我們在 await() 方法定義的 Drop 類中還利用了一種 Scala 機制:cond 參數是等待計算的代碼塊,而不是在傳遞給該方法之前進行計算.在 Scala 中,這被稱作 「call-by-name」;此處,它是一種實用的方法,可以捕獲需要在 Java 版本中表示兩次的條件等待邏輯(分別用於 put 和 take).

,在 main() 中,創建 Drop 實例,實例化兩個線程,使用 start() 啟動它們,然後在 main() 的結束部分退出,相信 JVM 會在 main() 結束之前啟動這兩個線程.(在生產代碼中,可能無法保證這種情況,但對於這樣的簡單的例子,99.99% 沒有問題.)

但是,已經說過,仍然存在相同的基本問題:程序員仍然需要過分擔心兩個線程之間的通信和協調問題.雖然一些 Scala 機制可以簡化語法,但這目前為止並沒有相當大的吸引力.

Scala 併發性 v2

Scala Library Reference 中有一個有趣的包:scala.concurrency.這個包包含許多不同的併發性結構,包括我們即將利用的 MailBox 類.

顧名思義,MailBox 從本質上說就是 Drop,用於在檢測之前保存數據塊的單槽緩衝區.但是,MailBox 最大的優勢在於它將發送和接收數據的細節完全封裝到模式匹配和 case 類中,這使它比簡單的 Drop(或 Drop 的多槽數據保存類 java.util.concurrent.BoundedBuffer)更加靈活.

清單 4. ProdConSample, v2 (Scala)

package com.tedneward.scalaexamples.scala.V2
{
import concurrent.{MailBox, ops}

object ProdConSample
{
class Producer(drop : Drop)
extends Runnable
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
);

override def run() : Unit =
{
importantInfo.foreach((msg) => drop.put(msg))
drop.put("DONE")
}
}

class Consumer(drop : Drop)
extends Runnable
{
override def run() : Unit =
{
var message = drop.take()
while (message != "DONE")
{
System.out.format("MESSAGE RECEIVED: %s%n", message)
message = drop.take()
}
}
}

class Drop
{
private val m = new MailBox()

private case class Empty()
private case class Full(x : String)

m send Empty() // initialization

def put(msg : String) : Unit =
{
m receive
{
case Empty() =>
m send Full(msg)
}
}

def take() : String =
{
m receive
{
case Full(msg) =>
m send Empty(); msg
}
}
}

def main(args : Array[String]) : Unit =
{
// Create Drop
val drop = new Drop()

// Spawn Producer
new Thread(new Producer(drop)).start();

// Spawn Consumer
new Thread(new Consumer(drop)).start();
}
}
}

此處,v2 和 v1 之間的惟一區別在於 Drop 的實現,它現在利用 MailBox 類處理傳入以及從 Drop 中刪除的消息的阻塞和信號事務.(我們可以重寫 Producer 和 Consumer,讓它們直接使用 MailBox,但考慮到簡單性,我們假定希望保持所有示例中的 Drop API 相一致.)使用 MailBox 與使用典型的 BoundedBuffer(Drop)稍有不同,因此我們來仔細看看其代碼.

MailBox 有兩個基本操作:send 和 receive.receiveWithin 方法僅僅是基於超時的 receive.MailBox 接收任何類型的消息.send() 方法將消息放置到郵箱中,並立即通知任何關心該類型消息的等待接收者,並將它附加到一個消息鏈表中以便稍後檢索.receive() 方法將阻塞,直到接收到對於功能塊合適的消息.

因此,在這種情況下,我們將創建兩個 case 類,一個不包含任何內容(Empty),這表示 MailBox 為空,另一個包含消息數據(Full.

put 方法,由於它會將數據放置在 Drop 中,對 MailBox 調用 receive() 以查找 Empty 實例,因此會阻塞直到發送 Empty.此時,它發送一個 Full 實例給包含新數據的 MailBox.

take 方法,由於它會從 Drop 中刪除數據,對 MailBox 調用 receive() 以查找 Full 實例,提取消息(再次得益於模式匹配從 case 類內部提取值並將它們綁到本地變數的能力)併發送一個 Empty 實例給 MailBox.不需要明確的鎖定,並且不需要考慮監控程序.

Scala 併發性 v3

事實上,我們可以顯著縮短代碼,只要 Producer 和 Consumer 不需要功能全面的類(此處便是如此) — 兩者從本質上說都是 Runnable.run() 方法的瘦包裝器,Scala 可以使用 scala.concurrent.ops 對象的 spawn 方法來實現,如清單 5 所示:

清單 5. ProdConSample, v3 (Scala)

package com.tedneward.scalaexamples.scala.V3
{
import concurrent.MailBox
import concurrent.ops._

object ProdConSample
{
class Drop
{
private val m = new MailBox()

private case class Empty()
private case class Full(x : String)

m send Empty() // initialization

def put(msg : String) : Unit =
{
m receive
{
case Empty() =>
m send Full(msg)
}
}

def take() : String =
{
m receive
{
case Full(msg) =>
m send Empty(); msg
}
}
}


def main(args : Array[String]) : Unit =
{
// Create Drop
val drop = new Drop()

// Spawn Producer
spawn
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
);

importantInfo.foreach((msg) => drop.put(msg))
drop.put("DONE")
}

// Spawn Consumer
spawn
{
var message = drop.take()
while (message != "DONE")
{
System.out.format("MESSAGE RECEIVED: %s%n", message)
message = drop.take()
}
}
}
}
}

spawn 方法(通過包塊頂部的 ops 對象導入)接收一個代碼塊(另一個 by-name 參數示例)並將它包裝在匿名構造的線程對象的 run() 方法內部.事實上,並不難理解 spawn 的定義在 ops 類的內部是什麼樣的:

清單 6. scala.concurrent.ops.spawn()

def spawn(p: => Unit) = {
val t = new Thread() { override def run() = p }
t.start()
}

……這再一次強調了 by-name 參數的強大之處.

ops.spawn 方法的一個缺點在於,它是在 2003 年 Java 5 concurrency 類還不可用的時候編寫的.特別是,java.util.concurrent.Executor 及其同類的作用是讓開發人員更加輕鬆地生成線程,而不需要實際處理直接創建線程對象的細節.幸運的是,在您自己的自定義庫中重新創建 spawn 的定義是相當簡單的,這需要利用 Executor(或 ExecutorService 或 ScheduledExecutorService)來執行線程的實際啟動任務.

事實上,Scala 的併發性支持超越了 MailBox 和 ops 類;Scala 還支持一個類似的 「Actors」 概念,它使用了與 MailBox 所採用的方法相類似的消息傳遞方法,但應用更加全面並且靈活性也更好.但是,這部分內容將在下期討論.

結束語

Scala 為併發性提供了兩種級別的支持,這與其他與 Java 相關的主題極為類似:

,對底層庫的完全訪問(比如說 java.util.concurrent)以及對 「傳統」 Java 併發性語義的支持(比如說監控程序和 wait()/notifyAll()).

其次,這些基本機制上面有一個抽象層,詳見本文所討論的 MailBox 類以及將在本系列下一篇文章中討論的 Actors 庫.

兩個例子中的目標是相同的:讓開發人員能夠更加輕鬆地專註於問題的實質,而不用考慮併發編程的低級細節(顯然,第二種方法更好地實現了這一目標,至少對於沒有過多考慮低級細節的人來說是這樣的.)

但是,當前 Scala 庫的一個明顯的缺陷就是缺乏 Java 5 支持;scala.concurrent.ops 類應該具有 spawn 這樣的利用新的 Executor 介面的方法.它還應該支持利用新的 Lock 介面的各種版本的 synchronized.幸運的是,這些都是可以在 Scala 生命周期中實現的庫增強,而不會破壞已有代碼;它們甚至可以由 Scala 開發人員自己完成,而不需要等待 Scala 的核心開發團隊提供給他們(只需要花費少量時間).

描述 名字 大小 下載方法
本文的示例 Scala 代碼 j-scala02049.zip 10KB HTTP


[火星人 ] 面向Java開發人員的Scala指南: 深入了解Scala併發性 了解 Scala 如何簡化併發編程並繞過陷阱已經有894次圍觀

http://coctec.com/docs/java/show-post-61105.html