ILE RPG でマルチスレッド・アプリケーション (3) - 状態変数によるスレッド間の連携 -

スレッド同士で連携したい、というケースがあります。

たとえば、スレッドで処理した結果がある条件を満たしたので、別のスレッドにそれを知らせたい、といった処理が考えられますね。
何かの下限を下回ったので追加の処理を行いたい、とか、逆に必要な分が蓄積されたので次の処理に行きたい、とかいったようなことです。

状態変数

そういったことを実現するために、pthreads ライブラリの中に"状態変数"という仕組みがあります。

Pthreads プログラミング」の例 3-7 に載っている状態変数のシンプルなプログラミング例をほぼそのまま RPG でコーディングしてみました。

共用変数を mutex でロックしてカウントアップを行い、カウントがある値になったところで、そのイベントを待っている別のスレッドに知らせる、といった処理になっています。

     H THREAD(*CONCURRENT)                                                                          
     H BNDDIR('QC2LE')                                                                              
     H DFTACTGRP(*NO)                                                                               
     H OPTION(*SRCSTMT : *NOUNREF)                                                                  
      /COPY PTHLIB/QRPGLESRC,PTHREAD                                                                
      *                                                                                             
     D NUM_THREADS     C                   3                                                        
     D TCOUNT          C                   10                                                       
     D COUNT_THRES     C                   12                                                       
      *                                                                                             
     D inc_count       PR                  extproc('inc_count')                                     
     D    tid                          *   value                                                    
      *                                                                                             
     D watch_count     PR                  extproc('watch_count')                                   
     D    tid                          *   value                                                    
      *                                                                                             
     D tid             S             10i 0                                                          
     D                                     dim(NUM_THREADS)                                         
      *                                                                                             
     D checkResults    PR                  extproc('checkResults')                                  
     D    string                   1000a   varying const                                            
     D    val                        10i 0 value                                                    
      *                                                                                             
     D print           PR                  extproc('print')                                         
     D    msg                      1000a   varying const                                            
      *                                                                                             
     D fmtThreadId     PR            17a   varying                                                  
      *                                                                                             
     D CEETREC         PR                                                                           
     D    cel_rc_mod                 10i 0 OPTIONS(*OMIT)                                           
     D    user_rc                    10i 0 OPTIONS(*OMIT)                                           
      *                                                                                             
     D sleep           PR                  extproc(*CWIDEN : 'sleep')                               
     D    secs                       10i 0 value                                                    
      *                                                                                             
     D thread          DS                  likeds(pthread_t)                                        
     D                                     dim(NUM_THREADS)                                         
      *                                                                                             
     D rc              S             10i 0 inz(0)                                                   
      *                                                                                             
     D i               S             10i 0 inz(0)                                                   
      *                                                                                             
     D mutex           DS                  likeds(pthread_mutex_t)                                  
     D                                     STATIC(*allthread)                                       
      *                                                                                             
     D cond            DS                  likeds(pthread_cond_t)                                   
     D                                     STATIC(*allthread)                                       
      *                                                                                             
     D count           S             10i 0 inz(0)                                                   
     D                                     STATIC(*allthread)                                       
      *                                                                                             
      /free                                                                                         
                                                                                                    
          tid(1) = 1 ;                                                                              
          tid(2) = 2 ;                                                                              
          tid(3) = 3 ;                                                                              
                                                                                                    
          print ('Initialize mutex') ;                                                              
          mutex = PTHREAD_MUTEX_INITIALIZER ;                                                       
                                                                                                    
          print ('Initialize condition') ;                                                          
          cond  = PTHREAD_COND_INITIALIZER ;                                                        
                                                                                                    
          print ('Create/start a thread') ;                                                         
          rc = pthread_create(thread(1) : *OMIT                                                     
                             :%paddr(inc_count) : %addr(tid(1))) ;                                  
          checkResults('pthread_create() ' : rc) ;                                                  
                                                                                                    
          print ('Create/start another thread') ;                                                   
          rc = pthread_create(thread(2) : *OMIT                                                     
                             :%paddr(inc_count) : %addr(tid(2))) ;                                  
          checkResults('pthread_create() ' : rc) ;                                                  
                                                                                                    
          print ('Create/start waiter thread') ;                                                    
          rc = pthread_create(thread(3) : *OMIT                                                     
                             :%paddr(watch_count) : %addr(tid(3))) ;                                
          checkResults('pthread_create() ' : rc) ;                                                  
                                                                                                    
          sleep(10) ;                                                                               
                                                                                                    
          for i = 1 to NUM_THREADS ;                                                                
             print ('Sync threads') ;                                                               
             rc = pthread_join(thread(i) : *OMIT) ;                                                 
             checkResults('pthread_join() ' : rc) ;                                                 
          endfor ;                                                                                  
                                                                                                    
          return ;                                                                                  
                                                                                                    
      /end-free                                                                                     
      *                                                                                             
     P inc_count       B                                                                            
     D inc_count       PI                                                                           
     D    parm                         *   value                                                    
     D tid             S             10i 0 based(parm)                                              
     D i               S             10i 0                                                          
      /free                                                                                         
          print ('Enter inc_count. tid is ' + %char(tid)) ;                                         
          for i = 1 to TCOUNT ;                                                                     
                                                                                                    
             rc = pthread_mutex_lock (mutex) ;                                                      
             checkResults ('pthread_mutex_lock() ' : rc ) ;                                         
             count  += 1 ;                                                                          
             print ('inc_count of tid(' + %char(tid)                                                
                                        + ') ' + %char(count )) ;                                   
                                                                                                    
             if count = COUNT_THRES ;                                                               
                print ('inc_count signal by tid(' + %char(tid)                                      
                                        + ') ' + %char(count )) ;                                   
                rc = pthread_cond_signal (cond ) ;                                                  
                checkResults ('pthread_cond_signal() ' : rc ) ;                                     
             endif ;                                                                                
                                                                                                    
             rc = pthread_mutex_unlock (mutex) ;                                                    
             checkResults ('pthread_mutex_unlock() ' : rc ) ;                                       
                                                                                                    
          endfor ;                                                                                  
                                                                                                    
          return ;                                                                                  
      /end-free                                                                                     
     P inc_count       E                                                                            
      *                                                                                             
     P watch_count     B                                                                            
     D watch_count     PI                                                                           
     D    parm                         *   value                                                    
     D tid             S             10i 0 based(parm)                                              
     D i               S             10i 0                                                          
      /free                                                                                         
          print ('Enter watch_count. tid is ' + %char(tid)) ;                                       
                                                                                                    
          rc = pthread_mutex_lock (mutex) ;                                                         
          checkResults ('pthread_mutex_lock() ' : rc ) ;                                            
          print ('watch_count mutex locked : ' + %char(count)) ;                                    
                                                                                                    
          dow (count < COUNT_THRES ) ;                                                              
             rc = pthread_cond_wait (cond : mutex ) ;                                               
             checkResults ('pthread_cond_wait() ' : rc ) ;                                          
             print ('watch_count : ' + %char(count )) ;                                             
          enddo ;                                                                                   
                                                                                                    
          rc = pthread_mutex_unlock (mutex) ;                                                       
          checkResults ('pthread_mutex_unlock() ' : rc ) ;                                          
                                                                                                    
          return ;                                                                                  
      /end-free                                                                                     
     P watch_count     E                                                                            
      *                                                                                             
      * Utility Procedures                                                                          
      *                                                                                             
     P checkResults    B                   EXPORT                                                   
     D checkResults    PI                                                                           
     D    string                   1000a   varying const                                            
     D    val                        10i 0 value                                                    
      /free                                                                                         
          if val <> 0 ;                                                                             
             print ('Thread(' + fmtThreadId() + ') : ' + string) ;                                  
             CEETREC (*OMIT : *OMIT) ;                                                              
          else ;                                                                                    
             /if defined(LOG_ALL_RESULTS)                                                           
             print (string + ' completed normally with ' + %char(val)) ;                            
             /endif                                                                                 
          endif ;                                                                                   
      /end-free                                                                                     
     P checkResults    E                                                                            
      *                                                                                             
     P print           B                   EXPORT                                                   
     D print           PI                                                                           
     D    msg                      1000a   varying const                                            
     D printf          PR              *   extproc('printf')                                        
     D    template                     *   value options(*string)                                   
     D    string                       *   value options(*string)                                   
     D    dummy                        *   value options(*nopass)                                   
     D NEWLINE         C                   x'15'                                                    
      /free                                                                                         
          printf ('%s' + NEWLINE : msg) ;                                                           
      /end-free                                                                                     
     P print           E                                                                            
      *                                                                                             
     P fmtThreadId     B                   EXPORT                                                   
     D fmtThreadId     PI            17a   varying                                                  
     D pthreadId       DS                  likeds(pthread_id_np_t)                                  
     D buf             S           1000a                                                            
     D sprintf         PR              *   extproc('sprintf')                                       
     D    buf                          *   value                                                    
     D    template                     *   value options(*string)                                   
     D    num1                       10u 0 value                                                    
     D    num2                       10u 0 value                                                    
     D    dummy                        *   value options(*nopass)                                   
      /free                                                                                         
          pthreadId = pthread_getthreadid_np() ;                                                    
          sprintf (%addr(buf)                                                                       
                  : '%.8x % 8x'                                                                     
                  : pthreadId.intId.hi                                                              
                  : pthreadId.intId.lo ) ;                                                          
          return  %str(%addr(buf)) ;                                                                
      /end-free                                                                                     
     P fmtThreadId     E                                                                            

コピー元が QSYSINC ライブラリーではなくなっていますが、ソースメンバ PTHREAD の pthread_cond_wait 部分に書き間違いがあるため(そのせいで CPD5D02 が出て CRTBNDRPG が完了できない)自ソースファイルにコピーして修正して使用しているという理由によるものです。

基本的な流れ

pthread_cond_t という型のデータ・ストラクチャを定義し、↓のように初期化し、その後でスレッドを生成します。

     D cond            DS                  likeds(pthread_cond_t)                                   
     D                                     STATIC(*allthread)                                       

          cond  = PTHREAD_COND_INITIALIZER ;                                                        
          rc = pthread_create(thread(1) : *OMIT                                                     
                             :%paddr(inc_count) : %addr(tid(1))) ;                                  
                                                                                                    
          rc = pthread_create(thread(3) : *OMIT                                                     
                             :%paddr(watch_count) : %addr(tid(3))) ;                                

カウントアップを行うスレッド(inc_count)の方では、共用変数をロックして更新し、閾値に達しているかどうかをチェックし、達していれば状態変数を持って待っているスレッドに知らせます。
閾値に達してなければ、いったんロックをはずして(別のスレッドが更新するかもしれません)再度共用変数を更新するループに入ります。

             rc = pthread_mutex_lock (mutex) ;                                                      
             count  += 1 ;                                                                          
                                                                                                    
             if count = COUNT_THRES ;                                                               
                rc = pthread_cond_signal (cond ) ;                                                  
             endif ;                                                                                
                                                                                                    
             rc = pthread_mutex_unlock (mutex) ;                                                    

知らせを待つスレッド側では、状態変数と mutex に対して待機状態にあります。
複数個同様のスレッドがあった場合は FIFO になります。その複数個すべてにシグナルを送るには pthread_cond_signal ではなく pthread_cond_broadcast を使用する必要があります。

          rc = pthread_mutex_lock (mutex) ;                                                         
                                                                                                    
          dow (count < COUNT_THRES ) ;                                                              
             rc = pthread_cond_wait (cond : mutex ) ;                                               
          enddo ;                                                                                   
                                                                                                    
          rc = pthread_mutex_unlock (mutex) ;                                                       

わざわざダブルチェックのように count の値をチェックしているのは、呼びだされた時に値が同期しているとは限らないからですね。
今まで見てきたように、この count という値を読み出すときに別のスレッドが何をしているかはわかりません。更新されてもうすでに別の値になっている可能性があり、それが条件に現在もあっているかどうかはわからないわけです。

実行例

watch_count スレッドは count が 12 になったところで pthread_cond_signal によって起こされています。これは予定通りなのですが、起こされてから mutex でロックして取得した count はもう 13 になってしまっているのがわかります。

それぞれのスレッドは独立しているので、いわば勝手にプログラムに書かれたとおりのことをします。
ロック (mutex) を取得して count をインクリメントする、というプログラムに書かれているとおりのことを忠実に行っているわけですね。
こういうことは当然起こり得る、ということです。こういうところがマルチスレッド・プログラミングの難しいところですね。

もう一回やってみました。

今回は watch_count スレッドが起こされたところで取得した count がまさしく 12 になっています。

こんなふうにタイミングによって得られる結果が異なるので、より注意深い/防御的なコーディングが必要になるんですね。

[Top Pageに戻る]

Ads by TOK2