C++という言葉に惹かれてきた方へごめんなさい。今日はVisual C++ 2010附属のAsynchronous Agent Libraryです。これの紹介を書きたいなあと思いつつもいいサンプルが思い浮かばないと放置していたのですが、今は並列とか並行プログラミングとかがはやりなので、ウェブをあさればネタには困らないのでした。

というわけで、Go言語 (Go lang)の並列プログラミングは超かんたん。 – 医者を志す妻を応援する夫の日記の中から、Goなんて1ミリも知りませんが雰囲気でAsynchronous Agent Libraryを使って書き写してみました。

まずはHello, worldっぽいのです。

package main

var ch = make(chan string)

func g(str string) {
      println(str);
      ch <- "printed";
}

func main() {
      go g("hello, gorutine!");
      <- ch;
}

さっそくVC++で書き直します。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <agents.h>
#include <string>
#include <iostream>
#include <boost/thread.hpp>
 
Concurrency::unbounded_buffer<std::string> ch;
 
void g(std::string const& str)
{
    std::cout << str << std::endl;
    Concurrency::asend(ch, std::string("printed"));
}
 
int main()
{
    boost::thread t(g, "hello, Agent library!");
    Concurrency::receive(ch);
}

実行すると次のような出力が得られます。

hello, Agent library!

スレッド作成にBoostを使ってごめんなさい。std::threadを持っていないVC++ 2010がいけないのです。もっとも、Asynchronous Agent Libraryにはスレッド生成の仕組みとしてagentクラスがあります。ただ、これを使うには派生してクラスを作ることになり、行数が膨らむので使いませんでした。今回はスレッドを作る以上の機能は使いませんし。

チャネルというやつ(変数chのやつ)に相当するのはたぶんunbounded_bufferだろうと思いました。unbounded_bufferはキューです。値が来たら値を蓄え、取り出す要求が来たら値を放出します。asendは値を送り、receiveで値を受け取ります。もちろん、中はスレッド安全なので、ロックなどは考えなくて平気です。

次のコードへ行きましょう。素数をひたすら表示するというコードだそうです。

package main

// Send the sequence 2, 3, 4, ... to channel 'ch'.
func Generate(ch chan<- int) {
    for i := 2; ; i++ {
        ch <- i  // Send 'i' to channel 'ch'.
    }
}

// Copy the values from channel 'in' to channel 'out',
// removing those divisible by 'prime'.
func Filter(in <-chan int, out chan<- int, prime int) {
    for {
        i := <-in;  // Receive value of new variable 'i' from 'in'.
        if i % prime != 0 {
            out <- i  // Send 'i' to channel 'out'.
        }
    }
}

// The prime sieve: Daisy-chain Filter processes together.
func Sieve() {
    ch := make(chan int);  // Create a new channel.
    go Generate(ch);  // Start Generate() as a subprocess.
    for {
        prime := <-ch;
        print(prime, "\n");
        ch1 := make(chan int);
        go Filter(ch, ch1, prime);
        ch = ch1
    }
}

func main() {
    Sieve()
}

骨幹は最初の例と同じで、チャネルというものをunbounded_bufferに置き換えただけです。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <agents.h>
#include <string>
#include <iostream>
#include <memory>
#include <boost/thread.hpp>
 
using Concurrency::unbounded_buffer;
using Concurrency::ISource;
using Concurrency::ITarget;
using Concurrency::asend;
using Concurrency::receive;
using std::shared_ptr;
using std::make_shared;
using boost::thread;
 
// Send the sequence 2, 3, 4, ... to channel 'ch'.
void Generate(shared_ptr<ITarget<int>> ch)
{
    for (int i = 2; ; i++) {
        asend(*ch, i);  // Send 'i' to channel 'ch'.
    }
}
 
// Copy the values from channel 'in' to channel 'out',
// removing those divisible by 'prime'.
void Filter(shared_ptr<ISource<int>> in,
	shared_ptr<ITarget<int>> out, int prime)
{
    for (;;) {
        int i = receive(*in);  // Receive value of new variable 'i' from 'in'.
        if (i % prime != 0) {
            asend(*out, i);  // Send 'i' to channel 'out'.
        }
    }
}
 
// The prime sieve: Daisy-chain Filter processes together.
void Sieve()
{
    auto ch = make_shared<unbounded_buffer<int>>();  // Create a new channel.
    new thread(Generate, ch);  // Start Generate() as a subprocess.
    for (;;) {
        int prime = receive(*ch);
        std::cout << prime << std::endl;
        auto ch1 = make_shared<unbounded_buffer<int>>();
        new thread(Filter, ch, ch1, prime);
        ch = std::move(ch1);
    }
}
 
int main()
{
    Sieve();
}

永遠に終わらないということで、threadはnewしたまま放置しています、ごめんなさい。あと、unbounded_bufferの基底クラスにISource/ITargetというのがあるので、Generate/Filter関数ではそっちを引数の型としました。出力はこんな感じです。

2
3
5
7
11
13
17
⋮

以下補足2点です。1点目、今回登場しませんでしたが、asend/receiveにはそれぞれ兄弟がいます。

  • send関数: asendと違い、誰かが値を取り出すまで待機する。
  • try_receive関数: receiveと違い、待機せず即座に戻る。値そのものは参照型引数に格納し、取得したかを表すbool値を返す。

2点目、send/asendとreceive/try_receiveとでやりとりできる対象はunbounded_buffer以外にいくつか種類があります。非同期メッセージのブロックにおそらくすべて載っています。気が向いたらそのうち取り上げるかもしれません。

スポンサード リンク

この記事のカテゴリ

  • ⇒ Asynchronous Agent Libraryを使ってみる