13.3 他のシステムとHARKを接続したい

Problem

HARK にて,音源定位や音源分離ができるのがわかったが, その情報を他のシステムに使いたいがどうすればいいか.

この質問に対しては,

を別々に説明する.

Solution 1

: HARK で作った異なるシステム同士の接続

HARK において一度作ったシステムを一つのサブモジュールとして, 複数のサブモジュールの集合を一つのネットワークとして処理したい場合が多くある. 例えば,同じアルゴリズムを並列処理させる場合,全ての並列モジュール群を一つの subnet シートに記述するのは膨大な数のノードになってわかりにくいし,手間がかかる.

本稿では,subnet シートに書かれている一つのシステムを一つのノードとして扱う方法を紹介する.

この大きなシステムのサブモジュール化により, 同じブロック構成を作る手間が省けるし, 大規模なネットワークファイルが見やすくなる.

この機能の使用例として以下のような場合が考えられる.

 
 
(注1) エクスポート・インポートはわざわざしなくても,エクスポートすべき subnet シートの ノードをインポートする側にコピー・ペーストするだけでもいい. しかし,大きな subnet シートのノードを全てコピー・ペーストするのは手間がかかるため, エクスポートを行うよう説明した.  
 
(注2) 以下にサブモジュールの改変例について示す.

Solution 2

: ソケット通信を用いた他のシステムとの接続

本章では,ソケット通信を用いた HARK 以外のシステムとの接続について説明する. 具体的に,現在の HARK は,ソケット通信を用いて,音声認識エンジンである Julius との接続を実現している. 通信を担うノードとして,HARK には以下のものが標準で入っている.

いずれのクライアントも,ソケットを介して, 音声の特徴量ベクトルなどの情報を メッセージ送信しており, あとは Julius 側でその処理を行うようになっている.

詳しくは,上記2つのソースを参照されたい. ただし,上記のノードは,一つの音声区間をまとめて送信するなど, 多少複雑なことをやっているので,次節では, 簡単にクライアントとしての HARK ノードの作り方と, サーバとしての HARK ノードの作り方を見ていく.

Solution 2-1 : ソケット通信を用いた他のシステムとの接続(クライアント編)

Julius との通信の場合,Julius はサーバプログラムとして, 常に HARK からのメッセージを listen している. この例を簡単なサーバプログラムと HARK ノードを作ることで構築してみる.

まずはサーバプログラム listener.c を作る. (これが Julius 側の他のシステムに対応する.)

#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

main(int argc, char *argv[])
{
    int    i;
    int    fd1, fd2;
    struct sockaddr_in    saddr;
    struct sockaddr_in    caddr;
    int    len;
    int    ret;
    char   buf[1024];

    if (argc != 2){
	printf("Usage: listener PORT_NUMBER\n");
	exit(1);
    }

    if ((fd1 = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
	perror("socket");
	exit(1);
    }

    bzero((char *)&saddr, sizeof(saddr));
    saddr.sin_family = AF_INET;
    saddr.sin_addr.s_addr = INADDR_ANY;
    saddr.sin_port = htons(atoi(argv[1]));

    if (bind(fd1, (struct sockaddr *)&saddr, sizeof(saddr)) < 0){
	perror("bind");
	exit(1);
    }
    
    if (listen(fd1, 1) < 0) {
	perror("listen");
	exit(1);
    }

    len = sizeof(caddr);

    if ((fd2 = accept(fd1, (struct sockaddr *)&caddr, &len)) < 0) {
	perror("accept");
	exit(1);
    }
    close(fd1);

    ret = read(fd2, buf, 1024);

    while (strcmp(buf, "quit\n") != 0) {
      printf("Received Message : %s", buf);
      ret = read(fd2, buf, 1024);
      write(fd2, buf, 1024); // This returns a responce.
    }

    close(fd2);
}

中身は普通のソケット通信のプログラムである. このプログラムによって,fd2 に書き込まれたメッセージを printf 表示する. ソースがカット&ペーストできたら,コンパイルをしておく.

> gcc -o listener listener.c

次に,HARK 側のクライアントノードを作成する. 以下のソースをカット&ペーストし, TalkerTutorial.cc を作成する.

#include <iostream>
#include <BufferedNode.h>
#include <Buffer.h>
#include <string.h>
#include <sstream>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <csignal>

using namespace std;
using namespace FD;

class TalkerTutorial;

DECLARE_NODE(TalkerTutorial);
/*Node
 *
 * @name TalkerTutorial
 * @category HARKD:Tutorial
 * @description This block outputs the same integer as PARAM1 and sends it through socket.
 *
 * @output_name OUTPUT1
 * @output_type int
 * @output_description This output the same integer as PARAM1.
 * 
 * @parameter_name PARAM1
 * @parameter_type int
 * @parameter_value 123
 * @parameter_description Setting for OUTPUT1
 * 
 * @parameter_name PORT
 * @parameter_type int
 * @parameter_value 8765
 * @parameter_description Port number for socket connection
 * 
 * @parameter_name IP_ADDR
 * @parameter_type string
 * @parameter_value 127.0.0.1
 * @parameter_description IP address for socket connection
 *
END*/

bool exit_flag2 = false;

class TalkerTutorial : public BufferedNode {
  int output1ID;
  int param1;
  int port;  
  string ip_addr;
  struct sockaddr_in    addr;
  struct hostent *hp;
  int    fd;
  int    ret;

public:
  TalkerTutorial(string nodeName, ParameterSet params)
    : BufferedNode(nodeName, params)
  {
    output1ID    = addOutput("OUTPUT1");
    param1  = dereference_cast<int>(parameters.get("PARAM1"));
    port    = dereference_cast<int>(parameters.get("PORT"));
    ip_addr = object_cast<String>(parameters.get("IP_ADDR"));

    signal(SIGINT,  signal_handler);
    signal(SIGHUP,  signal_handler);
    signal(SIGPIPE, signal_handler);

    if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket");
        exit(1);
    }

    bzero((char *)&addr, sizeof(addr));
    if ((hp = gethostbyname(ip_addr.c_str())) == NULL) {
	perror("No such host");
	exit(1);
    }
    bcopy(hp->h_addr, &addr.sin_addr, hp->h_length);
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);

    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0){
        perror("connect");
        exit(1);
    }

    inOrder = true;
  }

  void calculate(int output_id, int count, Buffer &out)
  {
    
    // Main loop routine starts here.

    ostringstream message;
    message << "[" << count << " , " << param1 << "]" << endl;
    string buf = message.str();  

    write(fd, buf.c_str(), 1024);
    cout << "Sent Message : [" << count << " , " << param1 << "]" << endl;

    (*(outputs[output1ID].buffer))[count] = ObjectRef(Int::alloc(param1));

    if(exit_flag2){
      cerr << "Operation closed..." << endl;
      close(fd);
      exit(1);
    }

    // Main loop routine ends here.

  }

  static void signal_handler(int s)
  {
    exit_flag2 = true;
  }

};

こちらも中身は単純なソケット通信のクライアントノードになっているのみである. count ループ毎に message に格納された文字列をソケット通信している. カット&ペーストが終わったら,ソースコンパイルからインストールをする.

クライアントとサーバが揃ったところで,Flowdesigner のノードを構築してみる. Sleep で特定の時間周期毎にソケット通信を行うような下記のような簡単なノードである.

\includegraphics[width=60mm]{fig/recipes/Advanced_MAIN}
(a) MAIN(subnet) シート
\includegraphics[width=60mm]{fig/recipes/Advanced_TalkerPart}
(b) LOOP0(iterator) シート
Figure 13.9: ネットワークファイル : TalkerTutorial

実行するためには, まずはサーバプログラムから起動する. 自分で適当なポート番号を決め(ここでは 8765 とする) コマンドライン上で次のようにする.

> ./listener 8765

次に,Flowdesigner のネットワークファイルの設定に移る. 新しいコンソールを起動し,Flowdesigner で先ほど作ったネットワークファイルを起動.

Sleep ノード左クリック > SECONDS を float 型で適当な周期に(ここでは 10000)
TalkerTutorial ノードを左クリック > PARAM1 を int 型で適当な値に
TalkerTutorial ノードを左クリック > PORT を int 型で決めたポート番号に(ここでは 8765)
TalkerTutorial ノードを左クリック > IP_ADDR を string 型で IP アドレスまたはホスト名に(ここでは 127.0.0.1)

IP アドレスの設定は同じマシンでサーバとクライアントが動いていることを仮定した. 離れたマシンで通信を行うことももちろん可能である.

「実行」ボタンクリック

  すると,動いているサーバに HARK からのメッセージが届き, それぞれのコンソールに以下のように表示される.

サーバ側( listener )

>eceived Message : [0 , 123]
Received Message : [1 , 123]
Received Message : [2 , 123]
...

クライアント側( Flowdesigner : TalkerTutorial )

>ent Message : [0 , 123]
Sent Message : [1 , 123]
Sent Message : [2 , 123]
...

このように,ソケット通信を用いて, 他のシステムに応答を与えることが可能である.

Solution 2-1 : ソケット通信を用いた他のシステムとの接続(サーバ編)

次に,あるクライアントプログラムから,ソケット通信で データを受け取れるようなサーバノードを作成することを考える.

先ほどと同様に,簡単なクライアントプログラムを用意した後, サーバノードを作ってみる.

まずはサーバプログラム Talker.c を作る. 簡単なソケット通信のクライアントプログラムである.

#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>

main(int argc, char *argv[])
{
    struct sockaddr_in    addr;
    struct hostent *hp;
    int    fd;
    int    len;
    int    port;
    char   buf[1024];
    // int    ret;

    if (argc != 3){
	printf("Usage: talker SERVER_NAME PORT_NUMBER\n");
	exit(1);
    }

    if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket");
        exit(1);
    }

    bzero((char *)&addr, sizeof(addr));

    if ((hp = gethostbyname(argv[1])) == NULL) {
	perror("No such host");
	exit(1);
    }
    bcopy(hp->h_addr, &addr.sin_addr, hp->h_length);
    addr.sin_family = AF_INET;
    addr.sin_port = htons(atoi(argv[2]));

    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0){
        perror("connect");
        exit(1);
    }

    while (fgets(buf, 1024, stdin)) {
        write(fd, buf, 1024);
        // ret = read(fd, buf, 1024); // This listens a responce. 
	// buf[ret] = '\0';
        printf("Sent Message : %s",buf);
    }
    close(fd);
    exit(0);
}

プログラムから見ても明らかなように, コンソール上で,改行までの文字列を読み込み, その文字列を fd というディスクリプタで表されるソケットに流している.

ファイルができたら,以下でコンパイル.

> gcc -o talker talker.c

ここから HARK でサーバノードを構築する. 重要なことは,クライアントからメッセージが送られてくるタイミングは未知なので, 一回メッセージを受信するごとに一連の処理をするようにサーバノードを作成しなければならないことである. そこで,今までは Sleep を条件としたループ演算を行ってきたが, サーバー自身がループ処理のトリガとなるようにノードを作る. 以下がその例である.

#include <iostream>
#include <BufferedNode.h>
#include <Buffer.h>
#include <string.h>
#include <sstream>
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <csignal>

using namespace std;
using namespace FD;

class ListenerTutorial;

DECLARE_NODE(ListenerTutorial);
/*Node
 *
 * @name ListenerTutorial
 * @category HARKD:Tutorial
 * @description This block listens to messages from socket and outputs it.
 *
 * @output_name OUTPUT1
 * @output_type string
 * @output_description Same as the message received from socket.
 *
 * @output_name CONDITION
 * @output_type bool
 * @output_description True if we haven't reach the end of file yet.
 * 
 * @parameter_name PORT
 * @parameter_type int
 * @parameter_value 8765
 * @parameter_description Port number for socket connection
 *
END*/

bool exit_flag = false;

class ListenerTutorial : public BufferedNode {
  int output1ID;
  int conditionID;
  int port;  
  
  int    fd1, fd2;
  struct sockaddr_in    saddr;
  struct sockaddr_in    caddr;
  int    len;
  int    ret;
  char   buf[1024];
  
public:
  ListenerTutorial(string nodeName, ParameterSet params)
    : BufferedNode(nodeName, params)
  {
    output1ID    = addOutput("OUTPUT1");
    conditionID  = addOutput("CONDITION");
    port    = dereference_cast<int>(parameters.get("PORT"));

    signal(SIGINT, signal_handler);
    signal(SIGHUP, signal_handler);
    signal(SIGPIPE, signal_handler);

    if ((fd1 = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
	perror("socket");
	exit(1);
    }

    bzero((char *)&saddr, sizeof(saddr));

    saddr.sin_family = AF_INET;
    saddr.sin_addr.s_addr = INADDR_ANY;
    saddr.sin_port = htons(port);

    if (bind(fd1, (struct sockaddr *)&saddr, sizeof(saddr)) < 0){
	perror("bind");
	exit(1);
    }

    if (listen(fd1, 1) < 0) {
	perror("listen");
	exit(1);
    }

    len = sizeof(caddr);

    if ((fd2 = accept(fd1, (struct sockaddr *)&caddr, (socklen_t *)&len)) < 0) {
	perror("accept");
	exit(1);
    }
    close(fd1);

    inOrder = true;
  }
  void calculate(int output_id, int count, Buffer &out)
  {
    
    // Main loop routine starts here.

    Buffer &conditionBuffer = *(outputs[conditionID].buffer);
    conditionBuffer[count] = (exit_flag ? FalseObject : TrueObject);

    ret = read(fd2, buf, 1024);
    cout << "Count : " << count << " , Received Message : " << buf << endl;

    ostringstream message;
    message << buf << endl;
    string output1 = message.str();  

    (*(outputs[output1ID].buffer))[count] = ObjectRef(new String(output1));

    write(fd2, buf, 1024);

    if(exit_flag){
      cerr << "Operation closed..." << endl;
      close(fd2);
      exit(1);
    }

    // Main loop routine ends here.

  }

  static void signal_handler(int s)
  {
    exit_flag = true;
  }

};

ここで,CONDITION という新たな bool 変数を出力するポートが加わったことが, 通常のノードとは異なる. この CONDITION ポートは,強制終了があったり,ソケット通信が中断されたりする時以外は 常に true を返す出力となっている. このポートを Flowdesigner のネットワークファイルのループのトリガにする.

実際にネットワークファイルを構築してみる. 上記のソースをカット&ペーストしたら, コンパイル&インストール. Flowdesigner を起動. 以下のノードを作成する.

\includegraphics[width=60mm]{fig/recipes/Advanced_MAIN}
(a) MAIN(subnet) シート
\includegraphics[width=60mm]{fig/recipes/Advanced_ListenerPart}
(b) LOOP0(iterator) シート
Figure 13.10: ネットワークファイル : ListenerTutorial

この中で,CONDITION 出力ポートが ネットワークファイルのループの CONDITION になっていることに着目されたい. つまり,このネットワークファイルのループ周期は, ListenerTutorial の処理周期と同じになるということである.

ListenerTutorial ノードは,calculate 関数内の

ret = read(fd2, buf, 1024);

で,メッセージを受信するまで,処理をサスペンドしている. メッセージを受信すると,calculate 関数の全てが処理され,一つの count の処理が終了する.

ネットワークファイルの CONDITIONListenerTutorial ノードの CONDITION ポートに したことで,メッセージの一回の受信に合わせて一連の処理を行うことができるようになる. これで,サーバノードはメッセージ受信のイベントに合わせて対応することができ, 満たすべき仕様を実現することができた.

実際に動かしてみよう. まずはサーバプログラム(先ほど作った Flowdesigner のネットワークファイル)から起動する.

ListenerTutorial ノードを左クリック > PORT を int 型で決めたポート番号に(ここでは 8765)
「実行」ボタンをクリック

CONDITION の設定を忘れないように.

次にクライアントプログラムを起動する. 新しいコンソールを起動し,先ほどコンパイルした talker.c のあるディレクトリへ移動. コマンドライン上で次のようにする.

> ./talker 127.0.0.1 8765

ここで,IP アドレスの設定は同じマシンでサーバとクライアントが動いていることを仮定し, 127.0.0.1 とした.離れたマシンで通信を行うことももちろん可能である.

talker の方のコンソールに適当な文字を入力してみよう.

すると,動いている Flowdesigner のサーバノードにクライアントコンソールからの メッセージが届き,それぞれのコンソールに以下のように表示される. (例えば,“hoge1”,“hoge2”,“hoge3”と入力した場合)

サーバ側( talker.c )

>oge1
Sent Message : hoge1
hoge2
Sent Message : hoge2
hoge3
Sent Message : hoge3
...

クライアント側( Flowdesigner : ListenerTutorial )

>ount : 0 , Received Message : hoge1
Count : 1 , Received Message : hoge2
Count : 2 , Received Message : hoge3
...

このように,HARK のノード自身がサーバとなって, 他のシステムからのメッセージを受信することも可能である.

Solution 3

: ROS(Robot Operating System)との接続

Willow Garage 社が開発しているオープンソースの ロボット用プラットフォーム ROSを 使ってシステムを作成している場合, HARK  から得た音源定位や分離結果を本節に述べる方法で ROS に渡すことができる.

ROS は web 上のドキュメントが充実しているので, (1) ROS システムは既にインストール済み, (2) topic への publish,topic からの subscribe 等の基本的な概念は既知とする. 具体的には, ROS tutorial の Beginner Level がすべて終わっていると仮定する.

接続方法には 2 種類あるので,それぞれについて解説する. また,ROS ノードの実装は Python を用いる.

(1) 標準出力を使う

HARK のネットワークは単体で実行可能であることを利用して, ノード中でサブプロセスとして HARK を実行する方法である. その上で, HARK から定位結果などを標準出力として出力させる (定位結果がほしいなら, LocalizeMUSIC の DEBUG プロパティをオンにすればよい).

例えば以下のコードでネットワーク /home/hark/localize.n を python スクリプト中から実行できる.

import subprocess
p = subprocess.Popen( "/home/hark/localize.n",
                      cwd = "/home/hark/",
                      stderr = subprocess.STDOUT,
                      stdout = subprocess.PIPE)

続いて,以下のコードでネットワークの出力を python スクリプト中で処理できる.

while not rospy.is_shutdown():
    line = p.stdout.readline()
    line から情報を得る

こうして HARK から得た情報を使って 適切なトピックへ publish すればよい.

(2) ソケット通信を使う

もう一つの方法は,ROS と HARK をソケット通信で接続することである. ROS 側にソケット通信のコードを, HARK 側にソケット通信のノードを作成する必要があるので手間は大きいが, 全体の構成はクリアになる. HARK のノードを作成するのは hark-document のチュートリアルを参照されたい.

ここでは,情報を受信する python スクリプトの断片を示す.

import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind( ('localhost', 12345) )
sock.listen(1)
client, addr = sock.accept()

while 1:
    received = client.recv( 1024 )

(3) トピックを介して通信

最後は, ROS システムを活用する通信方法である. ROS のトピックに対してメッセージを publish するノードを作成すれば, 定位結果や分離音を直接 ROS に送信する事が可能である.

See Also

  1. ROS

  2. hark-document チュートリアル3 (ノード作成)