ILE RPG でマルチスレッド・アプリケーション (4) - 排他制御 (semaphore) -

ILE RPG でマルチスレッド・アプリケーション (2) - 排他制御 (mutex) -」で mutex を使用して排他制御を実現してみたプログラムを、そのまま semaphore を使用した排他制御に変えてみました
(特に計測してはいないのですが、どうも体感でこちらの方がかなり遅く、そのため出力するメッセージの数とループの回数を減らしています)

     H THREAD(*CONCURRENT)                                                                          
     H BNDDIR('QC2LE')                                                                              
     H DFTACTGRP(*NO)                                                                               
     H OPTION(*SRCSTMT : *NOUNREF)                                                                  
      /COPY QSYSINC/QRPGLESRC,PTHREAD                                                               
      /COPY QSYSINC/QRPGLESRC,SYSSEM                                                                
      /COPY QSYSINC/QRPGLESRC,SYSSTAT                                                               
      *                                                                                             
     D NUMTHREADS      C                   10                                                       
      *                                                                                             
     D do_add1         PR            10i 0 extproc('do_add1')                                       
     D    rc1                          *   value                                                    
      *                                                                                             
     D do_add2         PR            10i 0 extproc('do_add2')                                       
     D    rc2                          *   value                                                    
      *                                                                                             
     D rc1             S             10i 0 inz(100)                                                 
     D rc2             S             10i 0 inz(10)                                                  
      *                                                                                             
     D checkResults    PR                  extproc('checkResults')                                  
     D    string                   1000a   varying const                                            
     D    val                        10i 0 value                                                    
      *                                                                                             
     D print           PR                  extproc('print')                                         
     D    msg                      1000a   varying const                                            
      *                                                                                             
     D fmtThreadId     PR            17a   varying                                                  
      *                                                                                             
     D CEETREC         PR                                                                           
     D    cel_rc_mod                 10i 0 OPTIONS(*OMIT)                                           
     D    user_rc                    10i 0 OPTIONS(*OMIT)                                           
      *                                                                                             
     D sleep           PR                  extproc(*CWIDEN : 'sleep')                               
     D    secs                       10i 0 value                                                    
      *                                                                                             
     D thread          DS                  likeds(pthread_t)                                        
     D                                     dim(NUMTHREADS)                                          
      *                                                                                             
     D rc              S             10i 0 inz(0)                                                   
      *                                                                                             
     D sharedData      S             10i 0 inz(0)                                                   
     D                                     STATIC(*allthread)                                       
      *                                                                                             
     D checkResultsErrno...                                                                         
     D                 PR                  extproc('checkResultsErrno')                             
     D    string                   1000a   varying const                                            
     D    cond                         n   value                                                    
      *                                                                                             
     D semaphoreId     S             10i 0 static(*allthread)                                       
      *                                                                                             
     D lockOperation   DS                  likeds(struct_sembuf)                                    
     D                                     dim(1)                                                   
     D                                     static(*allthread)                                       
      *                                                                                             
     D unlockOperation...                                                                           
     D                 DS                  likeds(struct_sembuf)                                    
     D                                     dim(1)                                                   
     D                                     static(*allthread)                                       
      *                                                                                             
      /free                                                                                         
                                                                                                    
          print ('Initialize struct sembuf') ;                                                      
          lockOperation(1).sem_num =  0 ;                                                           
          lockOperation(1).sem_op  = -1 ;                                                           
          lockOperation(1).sem_flg =  0 ;                                                           
                                                                                                    
          unlockOperation(1).sem_num = 0 ;                                                          
          unlockOperation(1).sem_op  = 1 ;                                                          
          unlockOperation(1).sem_flg = 0 ;                                                          
                                                                                                    
          print ('Create/start a private semaphore') ;                                              
          semaphoreId =                                                                             
                 semget(IPC_PRIVATE : 1 : 0 + S_IRUSR + S_IWUSR) ;                                  
          checkResultsErrno('semget ' : semaphoreId >= 0) ;                                         
                                                                                                    
          print ('Set the semaphore count to 1') ;                                                  
          rc = semctl(semaphoreId : 0 : CMD_SETVAL : 1) ;                                           
          checkResults('semctl(SETVAL) ' : rc) ;                                                    
                                                                                                    
          rc = semop(semaphoreId : lockOperation(1) : 1) ;                                          
          checkResultsErrno('main semop(lock) ' : rc = 0 ) ;                                        
          print ('Wait on semaphore in main') ;                                                     
                                                                                                    
          print ('Create/start a thread') ;                                                         
          rc = pthread_create(thread(1) : *OMIT                                                     
                                  :%paddr(do_add1) : %addr(rc1)) ;                                  
          checkResults('pthread_create() ' : rc) ;                                                  
                                                                                                    
          print ('Create/start another thread') ;                                                   
          rc = pthread_create(thread(2) : *OMIT                                                     
                             :%paddr(do_add2) : %addr(rc2)) ;                                       
          checkResults('pthread_create() ' : rc) ;                                                  
                                                                                                    
          rc = semop(semaphoreId : unlockOperation(1) : 1) ;                                        
          checkResultsErrno('main semop(unlock) ' : rc = 0 ) ;                                      
          print ('Unlock in main') ;                                                                
                                                                                                    
          sleep(10) ;                                                                               
                                                                                                    
          print ('Sync threads') ;                                                                  
          rc = pthread_join  (thread(1) : *OMIT) ;                                                  
          checkResults('pthread_join() ' : rc) ;                                                    
                                                                                                    
          print ('Sync another thread') ;                                                           
          rc = pthread_join  (thread(2) : *OMIT) ;                                                  
          checkResults('pthread_join() ' : rc) ;                                                    
                                                                                                    
          print ('do_add1 returns ' + %char(rc1)) ;                                                 
          print ('do_add2 returns ' + %char(rc2)) ;                                                 
          print ('Shared Data is  ' + %char(SharedData)) ;                                          
                                                                                                    
          return ;                                                                                  
                                                                                                    
      /end-free                                                                                     
      *                                                                                             
     P do_add1         B                                                                            
     D do_add1         PI            10i 0                                                          
     D    parm                         *   value                                                    
     D r1              S             10i 0 based(parm)                                              
     D i               S             10i 0                                                          
      /free                                                                                         
          print ('Enter do_add1 ') ;                                                                
                                                                                                    
          for i = 1 to 10 ;                                                                         
             r1  += 1 ;                                                                             
             print ('do_add1 : ' + %char(r1 )) ;                                                    
                                                                                                    
             rc = semop(semaphoreId : lockOperation(1) : 1) ;                                       
             checkResultsErrno('thread semop(lock) ' : rc = 0 ) ;                                   
             print ('Critical Section Begin in do_add1') ;                                          
                                                                                                    
             sharedData  += 1 ;                                                                     
             print ('do_add1 Shared: ' + %char(sharedData )) ;                                      
                                                                                                    
             rc = semop(semaphoreId : unlockOperation(1) : 1) ;                                     
             checkResultsErrno('thread semop(unlock) ' : rc = 0 ) ;                                 
             print ('Critical Section End in do_add1') ;                                            
                                                                                                    
          endfor ;                                                                                  
          rc1 = r1 ;                                                                                
          return 0 ;                                                                                
      /end-free                                                                                     
     P do_add1         E                                                                            
      *                                                                                             
     P do_add2         B                                                                            
     D do_add2         PI            10i 0                                                          
     D    parm                         *   value                                                    
     D r2              S             10i 0 based(parm)                                              
     D i               S             10i 0                                                          
      /free                                                                                         
          print ('Enter do_add2 ') ;                                                                
          for i = 1 to 20 ;                                                                         
             r2   += 1 ;                                                                            
             print ('do_add2 : ' + %char(r2 )) ;                                                    
                                                                                                    
             rc = semop(semaphoreId : lockOperation(1) : 1) ;                                       
             checkResultsErrno('thread semop(lock) ' : rc = 0 ) ;                                   
             print ('Critical Section Begin in do_add2') ;                                          
                                                                                                    
             sharedData  += 1 ;                                                                     
             print ('do_add2 Shared: ' + %char(sharedData )) ;                                      
                                                                                                    
             rc = semop(semaphoreId : unlockOperation(1) : 1) ;                                     
             checkResultsErrno('thread semop(unlock) ' : rc = 0 ) ;                                 
             print ('Critical Section End in do_add2') ;                                            
                                                                                                    
          endfor ;                                                                                  
          rc2 = r2 ;                                                                                
          return 0 ;                                                                                
      /end-free                                                                                     
     P do_add2         E                                                                            
      *                                                                                             
      * Utility Procedures                                                                          
      *                                                                                             
     P checkResultsErrno...                                                                         
     P                 B                   EXPORT                                                   
     D checkResultsErrno...                                                                         
     D                 PI                                                                           
     D    string                   1000a   varying const                                            
     D    cond                         n   value                                                    
     D getErrnoPtr     PR              *   extproc('__errno')                                       
     D errnoVal        S             10i 0 based(threadErrnoPtr)                                    
      /free                                                                                         
          if not cond ;                                                                             
             threadErrnoPtr = getErrnoPtr() ;                                                       
             print (string + ' Errno(' + %char(errnoVal) + ')') ;                                   
             CEETREC (*OMIT : *OMIT) ;                                                              
          else ;                                                                                    
             /if defined(LOG_ALL_RESULTS)                                                           
             print (string + ' completed normally with ' + cond ) ;                                 
             /endif                                                                                 
          endif ;                                                                                   
      /end-free                                                                                     
     P checkResultsErrno...                                                                         
     P                 E                                                                            
      *                                                                                             
     P checkResults    B                   EXPORT                                                   
     D checkResults    PI                                                                           
     D    string                   1000a   varying const                                            
     D    val                        10i 0 value                                                    
      /free                                                                                         
          if val <> 0 ;                                                                             
             print ('Thread(' + fmtThreadId() + ') : ' + string) ;                                  
             CEETREC (*OMIT : *OMIT) ;                                                              
          else ;                                                                                    
             /if defined(LOG_ALL_RESULTS)                                                           
             print (string + ' completed normally with ' + %char(val)) ;                            
             /endif                                                                                 
          endif ;                                                                                   
      /end-free                                                                                     
     P checkResults    E                                                                            
      *                                                                                             
     P print           B                   EXPORT                                                   
     D print           PI                                                                           
     D    msg                      1000a   varying const                                            
     D printf          PR              *   extproc('printf')                                        
     D    template                     *   value options(*string)                                   
     D    string                       *   value options(*string)                                   
     D    dummy                        *   value options(*nopass)                                   
     D NEWLINE         C                   x'15'                                                    
      /free                                                                                         
          printf ('%s' + NEWLINE : msg) ;                                                           
      /end-free                                                                                     
     P print           E                                                                            
      *                                                                                             
     P fmtThreadId     B                   EXPORT                                                   
     D fmtThreadId     PI            17a   varying                                                  
     D pthreadId       DS                  likeds(pthread_id_np_t)                                  
     D buf             S           1000a                                                            
     D sprintf         PR              *   extproc('sprintf')                                       
     D    buf                          *   value                                                    
     D    template                     *   value options(*string)                                   
     D    num1                       10u 0 value                                                    
     D    num2                       10u 0 value                                                    
     D    dummy                        *   value options(*nopass)                                   
      /free                                                                                         
          pthreadId = pthread_getthreadid_np() ;                                                    
          sprintf (%addr(buf)                                                                       
                  : '%.8x % 8x'                                                                     
                  : pthreadId.intId.hi                                                              
                  : pthreadId.intId.lo ) ;                                                          
          return  %str(%addr(buf)) ;                                                                
      /end-free                                                                                     
     P fmtThreadId     E                                                                            

セマフォを使用する基本的な流れ

セマフォを使用する流れです。mutex の場合と流れ自体はそれほど変わりません。

まずメインルーチンで初期化し、

          semaphoreId =                                                                             
                 semget(IPC_PRIVATE : 1 : 0 + S_IRUSR + S_IWUSR) ;                                  

カウントを 1にセットし、

          rc = semctl(semaphoreId : 0 : CMD_SETVAL : 1) ;                                           

ロックします。

          rc = semop(semaphoreId : lockOperation(1) : 1) ;                                          

ロックした状態でスレッドを生成し、

          rc = pthread_create(thread(1) : *OMIT                                                     
                                  :%paddr(do_add1) : %addr(rc1)) ;                                  

スレッドの生成が終わったところでロックを解除します。

          rc = semop(semaphoreId : unlockOperation(1) : 1) ;                                        

メインルーチンでロックが解除されたところで各スレッドの処理が始まりますが、その中で共用変数へのアクセスのあいだだけセマフォのロック/アンロックを行います。

     P do_add1         B                                                                            
     D do_add1         PI            10i 0                                                          
     D    parm                         *   value                                                    
     D r1              S             10i 0 based(parm)                                              
      /free                                                                                         
                                                                                                    
             rc = semop(semaphoreId : lockOperation(1) : 1) ;                                       
                                                                                                    
             sharedData  += 1 ;                                                                     
                                                                                                    
             rc = semop(semaphoreId : unlockOperation(1) : 1) ;                                     

実行例

生成された各スレッドがセマフォのオペレーションをしています。

メインルーチンで pthread_join の同期待ちをしているところですね。

実行結果です。

メインでのアンロックが終わったところでスレッドの実行が始まっています。

各スレッド内でロックとアンロックにはさまれて共用の変数が更新されていっているのがわかりますね。

共用の変数もかなりランダムにそれぞれのスレッドによって更新されていることがわかります。

[Top Pageに戻る]

Ads by TOK2