ILE RPG でマルチスレッド・アプリケーション (1) - 基本的な流れ -

V6R1 から ILE RPG でマルチスレッド・アプリケーションが作成できるということなのでちょっと試してみました。

インフォメーション・センターに載っている例を見てみると、要するに pthreads ライブラリを使ってマルチスレッドを実現するということがわかります。

まずは仕組みを理解するために、スレッドを 2つ走らせて、それぞれに計算をさせるという処理を書いてみました。

"処理"の中でいちばん単純なのは足し算かな、ということで、ただ足し算をループのたびに行うという処理を各スレッドに行わせています。
経過がわかるように、スレッドの生成/同期といったイベントや、それぞれの足し算の結果はそのたびに標準出力に出力しています。

     H THREAD(*CONCURRENT)                                                            
     H BNDDIR('QC2LE')                                                                              
     H DFTACTGRP(*NO)                                                                               
     H OPTION(*SRCSTMT : *NOUNREF)                                                                  
      /COPY QSYSINC/QRPGLESRC,PTHREAD                                                               
      *                                                                                             
     D NUMTHREADS      C                   10                                                       
      *                                                                                             
     D do_add1         PR            10i 0 extproc('do_add1')                                       
     D    rc1                          *   value                                                    
      *                                                                                             
     D do_add2         PR            10i 0 extproc('do_add2')                                       
     D    rc2                          *   value                                                    
      *                                                                                             
     D rc1             S             10i 0 inz(100)                                                 
     D rc2             S             10i 0 inz(10)                                                  
      *                                                                                             
     D checkResults    PR                  extproc('checkResults')                                  
     D    string                   1000a   varying const                                            
     D    val                        10i 0 value                                                    
      *                                                                                             
     D print           PR                  extproc('print')                                         
     D    msg                      1000a   varying const                                            
      *                                                                                             
     D fmtThreadId     PR            17a   varying                                                  
      *                                                                                             
     D CEETREC         PR                                                                           
     D    cel_rc_mod                 10i 0 OPTIONS(*OMIT)                                           
     D    user_rc                    10i 0 OPTIONS(*OMIT)                                           
      *                                                                                             
     D sleep           PR                  extproc(*CWIDEN : 'sleep')                               
     D    secs                       10i 0 value                                                    
      *                                                                                             
     D thread          DS                  likeds(pthread_t)                                        
     D                                     dim(NUMTHREADS)                                          
      *                                                                                             
     D rc              S             10i 0 inz(0)                                                   
      *                                                                                             
      /free                                                                                         
                                                                                                    
          print ('Create/start a thread') ;                                                         
          rc = pthread_create(thread(1) : *OMIT                                                     
                                  :%paddr(do_add1) : %addr(rc1)) ;                                  
          checkResults('pthread_create() ' : rc) ;                                                  
                                                                                                    
          print ('Create/start another thread') ;                                                   
          rc = pthread_create(thread(2) : *OMIT                                                     
                             :%paddr(do_add2) : %addr(rc2)) ;                                       
          checkResults('pthread_create() ' : rc) ;                                                  
                                                                                                    
          sleep(10) ;                                                                               
                                                                                                    
          print ('Sync threads') ;                                                                  
          rc = pthread_join  (thread(1) : *OMIT) ;                                                  
          checkResults('pthread_join() ' : rc) ;                                                    
                                                                                                    
          print ('Sync another thread') ;                                                           
          rc = pthread_join  (thread(2) : *OMIT) ;                                                  
          checkResults('pthread_join() ' : rc) ;                                                    
                                                                                                    
          print ('do_add1 returns ' + %char(rc1)) ;                                                 
          print ('do_add2 returns ' + %char(rc2)) ;                                                 
                                                                                                    
          return ;                                                                                  
                                                                                                    
      /end-free                                                                                     
      *                                                                                             
     P do_add1         B                                                                            
     D do_add1         PI            10i 0                                                          
     D    parm                         *   value                                                    
     D r1              S             10i 0 based(parm)                                              
     D i               S             10i 0                                                          
      /free                                                                                         
          print ('Enter do_add1 ') ;                                                                
          for i = 1 to 30 ;                                                                         
             sleep(1) ;                                                                             
             r1  += 1 ;                                                                             
             print ('do_add1 : ' + %char(r1 )) ;                                                    
          endfor ;                                                                                  
          rc1 = r1 ;                                                                                
          return 0 ;                                                                                
      /end-free                                                                                     
     P do_add1         E                                                                            
      *                                                                                             
     P do_add2         B                                                                            
     D do_add2         PI            10i 0                                                          
     D    parm                         *   value                                                    
     D r2              S             10i 0 based(parm)                                              
     D i               S             10i 0                                                          
      /free                                                                                         
          print ('Enter do_add2 ') ;                                                                
          for i = 1 to 50 ;                                                                         
             sleep(1) ;                                                                             
             r2   += 1 ;                                                                            
             print ('do_add2 : ' + %char(r2 )) ;                                                    
          endfor ;                                                                                  
          rc2 = r2 ;                                                                                
          return 0 ;                                                                                
      /end-free                                                                                     
     P do_add2         E                                                                            
      *                                                                                             
      * Utility Procedures                                                                          
      *                                                                                             
     P checkResults    B                   EXPORT                                                   
     D checkResults    PI                                                                           
     D    string                   1000a   varying const                                            
     D    val                        10i 0 value                                                    
      /free                                                                                         
          if val <> 0 ;                                                                             
             print ('Thread(' + fmtThreadId() + ') : ' + string) ;                                  
             CEETREC (*OMIT : *OMIT) ;                                                              
          else ;                                                                                    
             /if defined(LOG_ALL_RESULTS)                                                           
             print (string + ' completed normally with ' + %char(val)) ;                            
             /endif                                                                                 
          endif ;                                                                                   
      /end-free                                                                                     
     P checkResults    E                                                                            
      *                                                                                             
     P print           B                   EXPORT                                                   
     D print           PI                                                                           
     D    msg                      1000a   varying const                                            
     D printf          PR              *   extproc('printf')                                        
     D    template                     *   value options(*string)                                   
     D    string                       *   value options(*string)                                   
     D    dummy                        *   value options(*nopass)                                   
     D NEWLINE         C                   x'15'                                                    
      /free                                                                                         
          printf ('%s' + NEWLINE : msg) ;                                                           
      /end-free                                                                                     
     P print           E                                                                            
      *                                                                                             
     P fmtThreadId     B                   EXPORT                                                   
     D fmtThreadId     PI            17a   varying                                                  
     D pthreadId       DS                  likeds(pthread_id_np_t)                                  
     D buf             S           1000a                                                            
     D sprintf         PR              *   extproc('sprintf')                                       
     D    buf                          *   value                                                    
     D    template                     *   value options(*string)                                   
     D    num1                       10u 0 value                                                    
     D    num2                       10u 0 value                                                    
     D    dummy                        *   value options(*nopass)                                   
      /free                                                                                         
          pthreadId = pthread_getthreadid_np() ;                                                    
          sprintf (%addr(buf)                                                                       
                  : '%.8x % 8x'                                                                     
                  : pthreadId.intId.hi                                                              
                  : pthreadId.intId.lo ) ;                                                          
          return  %str(%addr(buf)) ;                                                                
      /end-free                                                                                     
     P fmtThreadId     E                                                                            

Pthreads プログラミング」に載っている一番最初のカンタンな例を参考にしています。
インフォメーション・センターの例の中のユーティリティ関数も便利そうなものはそのまま使っています。

マルチスレッド・プログラムの基本形

pthreads 規格のスレッドは pthread_create で生成します。
必要な主な引数は、スレッド(pthread_t のデータ・ストラクチャ)、呼び出すプロシージャの関数ポインタ、そのプロシージャへのパラメータのアドレスといったものになります。
関数(プロシージャ)をスレッドとして実体化したもの、と考えてもいいでしょう。

          rc = pthread_create(thread(1) : *OMIT                                                     
                                  :%paddr(do_add1) : %addr(rc1)) ;                                  

生成するスレッドに割り当てるプロシージャは、パラメータをひとつだけ取るものに限定されます。また、そのパラメータはポインタ型で値渡しである必要があります。

     D do_add1         PR            10i 0 extproc('do_add1')                                       
     D    rc1                          *   value                                                    

ひとつだけしかパラメータが指定できない、というのは一見ちょっと困った制限のように見えます。
でも、RPG にはデータ・ストラクチャというものがありますよね? そこにまとめて、データ・ストラクチャをパラメータとしてわたせば OK です。(実際 C の場合は構造体を使用します)

ポインタ型のパラメータといっても、そんなに恐れることはありません。ただ単に先頭のアドレスが渡されてくるだけです。

受けとった側の変数定義で、↓のようにパラメータとして引き渡されたポインタを基底にする変数を定義しておきましょう。
そうすれば、そのままその変数をパラメータの値として使用することができます。

     P do_add1         B                                                                            
     D do_add1         PI            10i 0                                                          
     D    parm                         *   value                                                    
     D r1              S             10i 0 based(parm)                                              

いくつかの処理をスレッドとして並行して複数実行させるわけですが、それぞれが終わったことを確認して結果をまとめたい場合がありますね。
そういう場合には pthread_join で同期を取ることができます。

          rc = pthread_join  (thread(1) : *OMIT) ;                                                  

プロシージャの処理は独立させておくこと

呼び出して処理させるプロシージャは同時に実行されてもいいようにしておくことが大事です。

i をずっとやってきた人であれば、処理の終了を早めたくて、バッチプログラムを分割してサブシステム内で同時に走らせるようにした経験があったりしませんか?
そういった設計と同じことです。i の場合、それぞれの処理を連携させるためにデータエリアとかデータ待ち行列とかを利用することがありますね。それも同様のものが pthreads ライブラリにはあります。

相互に干渉しないよう、また適切な時にだけ値のやりとりができるよう、きちんと設計を考える必要があります。

けっこう i の経験というのは活かせるものなんです。
マルチスレッドといっても、並行処理という考え方は昔からあるもので、夜バッチを分割して並行に走らせる、といったこととそれほど変わりません。
ここでの紹介も主に pthreads の"お作法"が中心になります。

スレッドを使用する基本の流れをあらためてまとめると

まとめると、スレッドとして独立/並行に行わせたい処理を、パラメータひとつをポインタ渡しする形のプロシージャとして定義し、

     D do_add1         PR            10i 0 extproc('do_add1')                                       
     D    rc1                          *   value                                                    

スレッドをそのプロシージャとパラメータを指定して生成し(この時点でプロシージャ内の処理が始まります)、

          rc = pthread_create(thread(1) : *OMIT                                                     
                                  :%paddr(do_add1) : %addr(rc1)) ;                                  
     P do_add1         B                                                                            
     D do_add1         PI            10i 0                                                          
     D    parm                         *   value                                                    
     D r1              S             10i 0 based(parm)                                              

最後にスレッドの同期を取る、といった流れですね。

          rc = pthread_join  (thread(1) : *OMIT) ;                                                  

コンパイル

コンパイルには特別なオプションは必要ありません。

元のインフォメーション・センターの例にもあった、より詳細なログを出すためのオプションを利用しています。

実行

SBMJOB コマンドで実行させますが、

マルチスレッド使用可能(ALWMLTTHD)に *YES を指定しての実行が必要です。

実行例

実行しているジョブの「スレッドの処理(WRKJOB 後の画面から 20 / WRKACTJOB で該当ジョブを選択してのオプション 12)」を見てみると、以下のように複数のスレッドが生成されていることを確認できます。

各スレッドのスタックを見てみましょう。
下の 2つのスレッドをそれぞれ見てみると、pthread_create() で生成されたスレッドであることがわかります。
ちなみに、こんなものがリアルタイムでシステムに何の設定もせず負荷もかけずに見れる、というのは IBM i の凄いところでもあります。

実行結果はスプール(SBMJOB で実行した際の標準出力)に出力されます。

スレッドが生成されるとすぐにプロシージャ内の処理が実行されていること、
また、各スレッドのメッセージが入り乱れている = それぞれ独立に実行されていることなどがわかりますね。(最初のうちは交互に実行されていますが、後の方ではもっとランダムになっています)

1つめのスレッドに pthread_join() で同期が要請されていますが、このスレッドはまだ実行中なのでこの要請は待たされたままになります。
最初にスレッドを生成したメインのプロシージャも勝手に進んでしまいますし、生成された方のスレッドも勝手に進んでしまいます。当然それぞれの進み方が全然違ってきてしまうわけです。人間は並行に動いているものを同時に考えるのはあまり得意ではないものなので、こんなところは気をつける必要がありそうです。

スレッドの実行が終了した(do_add1 の結果が 130 になっていることからわかります)ところで pthread_join() が完了していることがわかります。

最後に全スレッド(といっても 2つですが)を同期させて(というより同期されたところで)最終計算結果を出力して終了です。

[Top Pageに戻る]

Ads by TOK2