分散 DB のコーディング (Java)

今回は分散 DB プログラミングの紹介です。


「分散DB」とは?

昔に比べれば、データベースがいろんなマシンに分散している、という状況を見ることが多くなってきた気がします。

Linux + MySQL というような組み合わせの場合、キャパシティや可用性の問題からいくつかのマシンに分散することはよくありますね。
そのためにリクエストを再ルートさせたり、リアルタイム性をある程度犠牲にしたレプリケーションをしたり、というアプリケーション/システムの設計にするわけです。

DB2 for IBM i では、上にあげたような Linux + MySQL のような必要性は(少なくともキャパシティや可用性の観点からは)ありません。
データベース・リクエストを振り分ける仕組みやタイミングを考えたり、レプリカを取るタイミングや差異が出てしまった時の対応を考えたり、といったことをしないですむわけです。

もともと、データベースにとって「分散データベース」というものは、「2つの同時に接続されているデータベースをどう扱うか」の問題でした。

トランザクションの考え方を伴う場合、用意されている仕組みは「二相コミット(2 Phase Commit)」と言われるもので、これが分散データベースを考える上での基本になります。

二相コミットの詳細な仕組みはいろんなサイトや本などを見ていただくとして、この仕組みが前提としているのは↓のような接続になっています。

つまり、プログラムの中に 2つ「接続」が存在している、というのが前提です。
その「接続」のあいだで整合を取るためのプロトコルが「二相コミット」なんですね。

ときどき勘違いされている方がいるのですが、たとえば↓のような形は「二相コミット」の対象にはなりません。

「↑のようなかたちにしてはいけない」んですね。

たとえばこの場合、いまどきの技術では「データベース B に Web サービスを提供してもらう」というのが一般的な解になるでしょうか。


分散 DB のコーディング

今回はインフォメーション・センターに載っていた例をちょっとだけ変更して実行してみました。V5R4 で実行可能です。

その「ちょっとだけ」の変更、というのは DB2 Type2 Driver から JTOpen Type4 Driver への変更です。
これもたまに「できることが違う」ために「JTOpen Type4 ではなく DB2 Type2 を使う必要がある」などという人がいて、特にこういう「DRDA 関連」についてもそういわれることがあるので、そうではないことを示すためにそうしてみました。

実際に実行例で示しますが、JTOpen のドライバを使っておけば以下のような 3つの環境で使用することがカンタンにできます。

JTOpen のドライバは TCP/IP が使えれば、jar ファイルを一つ持っていくだけで使用できます。

/******************************************************************************/               
/*   PROGRAM NAME: SampJava                                                   */
/*                                                                            */              
/*   DESCRIPTIVE NAME: Sample Java application using DRDA                     */
/*                                                                            */              
/*   FUNCTION: This module processes the PART_STOCK table and                 */
/*             for each part below the ROP (REORDER POINT)                    */
/*             creates a supply order.                                        */              
/*                                                                            */              
/*   LOCAL TABLES:  PART_STOCK                                                */
/*                                                                            */              
/*   REMOTE TABLES: PART_ORDER, PART_ORDLN, SHIPMENTLN                        */
/*                                                                            */              
/*   COMPILE OPTIONS:                                                         */
/*   javac SampJava.java                                                      */
/*                                                                            */              
/*   INVOKED BY:                                                              */
/*   java SampJava lcldbname rmtdbname                                        */
/******************************************************************************/              
import java.sql.*;

public class SampJava {
  /* private static String JDBCDriver = "com.ibm.db2.jcc.DB2Driver"; を jtopen JDBC 対応に変更*/
  private static String JDBCDriver = "com.ibm.as400.access.AS400JDBCDriver";
  private static String part_table = " "; /* part number in table part_stock   */
  private static long line_count = 0;     /* total number of order lines       */
  private static long eoq_table = 0;      /* reorder quantity , tbl part_stock */
  private static long quant_table = 0;    /* quantity in stock, tbl part_stock */
  private static long rop_table = 0;      /* reorder point    , tbl part_stock */
  private static int contl = 0;           /* continuation line, tbl order_line */
  private static short next_num = 0;      /* next order nbr,table part_order   */

  /****************************************************************************/              
  /* Method For Reseting Environment                                          */
  /****************************************************************************/              
  private static void resetTables(Connection rmtConn) throws SQLException {                                                                      

    Statement stmt1 = rmtConn.createStatement();

    /* Clean up for rerunability in test environment                          */
    stmt1.executeUpdate("DELETE FROM DRDA.PART_ORDLN WHERE ORDER_NUM IN " +
                        "       (SELECT ORDER_NUM FROM DRDA.PART_ORDER " +
                        "                         WHERE ORDER_TYPE = 'R')");
    stmt1.executeUpdate("DELETE FROM DRDA.PART_ORDER WHERE ORDER_TYPE = 'R'");
    stmt1.close();
    rmtConn.commit();

  } /* function delete_for_rerun                                              */  

  /****************************************************************************/              
  /* Method For Calculating Order Quantity                                    */
  /****************************************************************************/              
  private static void calculateOrderQuantity(Connection lclConn, Connection rmtConn, String loc)
throws SQLException {
    PreparedStatement prpStmt1;
    PreparedStatement prpStmt2;
    ResultSet rsltSet1;
    ResultSet rsltSet2;
    short ord_table = 0;            /* order nbr.       , tbl order_line      */
    short orl_table = 0;            /* order line       , tbl order_line      */

    prpStmt1 = lclConn.prepareStatement("SELECT PART_NUM, PART_QUANT, PART_ROP, PART_EOQ " +
                                        "       FROM DRDA.PART_STOCK WHERE PART_ROP > PART_QUANT AND " +
                                        "       PART_NUM > ? ORDER BY PART_NUM");
    prpStmt1.setString(1,part_table);
    rsltSet1 = prpStmt1.executeQuery();
    if (rsltSet1.next() == false) {
      System.out.println("--------------------------------");
      System.out.println("NUMBER OF LINES CREATED = " + line_count);
      System.out.println("--------------------------------");
      System.out.println("*****  END OF PROGRAM  *********");
      rop_table = 0;                /* no (more) orders to process            */
    }
    else {
      /* available qty = Stock qty + qty in order - qty received              */
      part_table = rsltSet1.getString(1);
      quant_table = rsltSet1.getLong(2);
      rop_table = rsltSet1.getLong(3);
      eoq_table = rsltSet1.getLong(4);
      long qty_rec = 0;

      prpStmt2 = rmtConn.prepareStatement("SELECT A.ORDER_NUM, ORDER_LINE, QUANT_REQ " +
                                          "      FROM DRDA.PART_ORDLN A, DRDA.PART_ORDER B " +
                                          "      WHERE PART_NUM = ? AND LINE_STAT  <> 'C' AND " +
                                          "      A.ORDER_NUM = B.ORDER_NUM AND ORDER_TYPE  = 'R'");
      prpStmt2.setString(1,part_table);
      rsltSet2 = prpStmt2.executeQuery();
      while (rsltSet2.next()) {
        ord_table = rsltSet2.getShort(1);
        orl_table = rsltSet2.getShort(2);
        long qty_table = rsltSet2.getLong(3);
        qty_rec = qty_rec + qty_table;
      }
      rsltSet2.close();prpStmt2 = rmtConn.prepareStatement("SELECT SUM(QUANT_RECV) FROM DRDA.SHIPMENTLN " +
                                          "       WHERE ORDER_LOC = ? AND ORDER_NUM = ? AND " +
                                          "       ORDER_LINE = ?");
      prpStmt2.setString(1,loc);
      prpStmt2.setShort(2,ord_table);
      prpStmt2.setShort(3,orl_table);
      rsltSet2 = prpStmt2.executeQuery();
      rsltSet2.next();
      long qty_table = rsltSet2.getLong(1);
      qty_rec = qty_rec + qty_table;
      rsltSet2.close();
      prpStmt2.close();
    }
    rsltSet1.close();
    prpStmt1.close();

  } /* end of calculate_order_quantity                                        */
    
                                                                                         
  /****************************************************************************/              
  /* Method For Processing Orders                                             */
  /****************************************************************************/              
  private static void processOrder(Connection rmtConn, String loc) throws SQLException {                                                         
    PreparedStatement prpStmt1;
    ResultSet rsltSet1;

    /* insert order and order_line in remote database                         */
    if (contl == 0) {
      prpStmt1 = rmtConn.prepareStatement("SELECT (MAX(ORDER_NUM) + 1) FROM DRDA.PART_ORDER");
      rsltSet1 = prpStmt1.executeQuery();
      rsltSet1.next();
      next_num = rsltSet1.getShort(1);
      rsltSet1.close();
      prpStmt1 = rmtConn.prepareStatement("INSERT INTO DRDA.PART_ORDER (ORDER_NUM, ORIGIN_LOC, ORDER_TYPE, ORDER_STAT, CREAT_TIME) " +
                                          "       VALUES (?, ?, 'R', 'O', CURRENT TIMESTAMP)");
      prpStmt1.setShort(1,next_num);
      prpStmt1.setString(2,loc);
      prpStmt1.executeUpdate();
      System.out.println("*****  ROP PROCESSING  *********");
      System.out.println("ORDER NUMBER = " + next_num);
      System.out.println("--------------------------------");
      System.out.println("   LINE     PART       QTY      ");
      System.out.println("   NBR      NBR        REQUESTED");
      System.out.println("--------------------------------");
      contl = contl + 1;
    }  /* if contl == 0                                                       */ 
    prpStmt1 = rmtConn.prepareStatement("INSERT INTO DRDA.PART_ORDLN (ORDER_NUM, ORDER_LINE, PART_NUM, QUANT_REQ, LINE_STAT) " +
                                        "       VALUES (?, ?, ?, ?, 'O')");
    prpStmt1.setShort(1,next_num);
    prpStmt1.setInt(2,contl);
    prpStmt1.setString(3,part_table);
    prpStmt1.setLong(4,eoq_table);
    prpStmt1.executeUpdate();
    line_count = line_count + 1;
    System.out.println("   " + line_count + "        " + part_table + "      " + eoq_table + "");
    contl = contl + 1;
    prpStmt1.close();
                                                                                            
  } /* end of function processOrder                                           */

                                                                                            
  /****************************************************************************/              
  /* Method For Displaying Errors                                             */
  /****************************************************************************/              
  private static void errorFunction(SQLException e, Connection lclConn, Connection rmtConn) {

    System.out.println("************************");
    System.out.println("*      SQL ERROR       *");
    System.out.println("************************");
    System.out.println("SQLCODE    = " + e.getErrorCode());
    System.out.println("SQLSTATE    = " + e.getSQLState());
    System.out.println("**********************");                                           
    try {
      lclConn.rollback();
      rmtConn.rollback();
    }
    catch (SQLException uowErr) {
    }

  } /* end of function errorFunction                                         */

  /****************************************************************************/              
  /* Mainline                                                                 */
  /****************************************************************************/              
  public static void main(String[] args) {
    String User = "USRPRF";
    String Password = "PASSWORD";
    String lclUrl = null;
    String rmtUrl = null;
    String loc = "SQLA";            /* dealer's database name                 */
    Connection lclConn = null;
    Connection rmtConn = null;

    try {
      Class.forName(JDBCDriver).newInstance();
    }
    catch (Exception e) {
   /* System.out.println("Error: Failed to load DB2 driver."); */
      System.out.println("Error: Failed to load JDBC driver.");
      System.exit(1);
    }

    try {
   /* lclUrl = "jdbc:db2:" を jtopen JDBC 対応に変更 */
      lclUrl = "jdbc:as400://" + args[0];
      lclConn = DriverManager.getConnection(lclUrl, User, Password);
      rmtUrl = "jdbc:as400://" + args[1];
      rmtConn = DriverManager.getConnection(rmtUrl, User, Password);
    }
    catch (Exception e) {
      System.out.println("Error: Failed to get database connections.");
      System.exit(1);
    }

    try {
      /* Initialization                                                       */
      resetTables(rmtConn);

      /* Main Work                                                            */
      do {
        calculateOrderQuantity(lclConn, rmtConn, loc);
        if (rop_table > quant_table) {                        
          processOrder(rmtConn, loc);
          quant_table = 0;
        }
      } while (rop_table != 0);

      /* End Work                                                             */
      lclConn.commit();
      rmtConn.commit();
    }
    catch (SQLException e) {
      e.printStackTrace();
      errorFunction(e, lclConn, rmtConn);
      System.exit(1);
    }
  }
}

実行例

ターゲットとするデータベース(両方とも DB2 for i V5R4)に 2つとも接続できる PC クライアントからの実行例です。

こちらは、↑でターゲットとされている片方のデータベースの Qshell から実行したみたものです。

当り前ですが、両方とも同じ結果が表示されているのがわかりますね。

こんなふうに、PC でも IBM i でも実行できるのが Java のいいところですし、JTOpen の手軽なところでもありますね。

事前の準備

今回の例を動かすには 2つのデータベースのそれぞれテーブルなどの準備が必要になります。

"ローカル DB"用の SQL スクリプト

CREATE SCHEMA DRDA;

CREATE TABLE DRDA.PART_ORDER (ORDER_NUM  SMALLINT  NOT NULL,
                              ORIGIN_LOC CHAR(4)   NOT NULL,
                              ORDER_TYPE CHAR(1)   NOT NULL,
                              ORDER_STAT CHAR(1)   NOT NULL,
                              NUM_ALLOC  SMALLINT  NOT NULL WITH DEFAULT,
                              URG_REASON CHAR(1)   NOT NULL WITH DEFAULT,
                              CREAT_TIME TIMESTAMP NOT NULL,
                              ALLOC_TIME TIMESTAMP,
                              CLOSE_TIME TIMESTAMP,
                              REV_REASON CHAR(1)
                             ) ;

CREATE UNIQUE INDEX DRDA.PART_ORDEI ON DRDA.PART_ORDER (ORDER_NUM ASC);

CREATE TABLE DRDA.PART_ORDLN (ORDER_NUM  SMALLINT  NOT NULL,
                              ORDER_LINE SMALLINT  NOT NULL,
                              PART_NUM   CHAR(5)   NOT NULL,
                              QUANT_REQ  INTEGER   NOT NULL,
                              LINE_STAT  CHAR(1)   NOT NULL
                             )                                                               ;

CREATE UNIQUE INDEX DRDA.PART_ORDLI ON DRDA.PART_ORDLN (ORDER_NUM ASC,
                                                   ORDER_LINE ASC);

CREATE TABLE DRDA.SHIPMENTLN (SHIP_NUM   SMALLINT  NOT NULL,
                              SHIP_LINE  SMALLINT  NOT NULL,
                              ORDER_LOC  CHAR(4)   NOT NULL,
                              ORDER_NUM  SMALLINT  NOT NULL,
                              ORDER_LINE SMALLINT  NOT NULL,
                              PART_NUM   CHAR(5)   NOT NULL,
                              QUANT_SHIP INTEGER   NOT NULL,
                              QUANT_RECV INTEGER   NOT NULL WITH DEFAULT
                             )                                                               ;

CREATE UNIQUE INDEX DRDA.SHIPMENTLI ON DRDA.SHIPMENTLN (SHIP_NUM ASC,
                                                   SHIP_LINE ASC);

INSERT INTO DRDA.PART_ORDER VALUES(1,'DB2B','U','O',0,' ','1991-03-12-17.00.00',
                              NULL,NULL,NULL);

INSERT INTO DRDA.PART_ORDER VALUES(2,'SQLA','U','O',0,' ','1991-03-12-17.01.00',
                              NULL,NULL,NULL);

INSERT INTO DRDA.PART_ORDER VALUES(3,'SQLA','U','O',0,' ','1991-03-12-17.02.00',
                              NULL,NULL,NULL);

INSERT INTO DRDA.PART_ORDER VALUES(4,'SQLA','U','O',0,' ','1991-03-12-17.03.00',
                              NULL,NULL,NULL);

INSERT INTO DRDA.PART_ORDER VALUES(5,'DB2B','U','O',0,' ','1991-03-12-17.04.00',
                              NULL,NULL,NULL);

 
INSERT INTO DRDA.PART_ORDLN VALUES(1,1,'24110',005,'O');

INSERT INTO DRDA.PART_ORDLN VALUES(1,2,'24100',021,'O');

INSERT INTO DRDA.PART_ORDLN VALUES(1,3,'24090',018,'O');

INSERT INTO DRDA.PART_ORDLN VALUES(2,1,'14070',004,'O');

INSERT INTO DRDA.PART_ORDLN VALUES(2,2,'37040',043,'O');

INSERT INTO DRDA.PART_ORDLN VALUES(2,3,'14030',015,'O');

INSERT INTO DRDA.PART_ORDLN VALUES(3,2,'14030',025,'O');

INSERT INTO DRDA.PART_ORDLN VALUES(3,1,'43010',003,'O');

INSERT INTO DRDA.PART_ORDLN VALUES(4,1,'36010',013,'O');

INSERT INTO DRDA.PART_ORDLN VALUES(5,1,'18030',005,'O');

INSERT INTO DRDA.SHIPMENTLN VALUES(1,1,'DB2B',1,1,'24110',5,5);

INSERT INTO DRDA.SHIPMENTLN VALUES(1,2,'DB2B',1,2,'24100',10,1);

INSERT INTO DRDA.SHIPMENTLN VALUES(2,1,'SQLA',2,1,'14070',4,4);

INSERT INTO DRDA.SHIPMENTLN VALUES(2,2,'SQLA',2,2,'37040',45,25);

INSERT INTO DRDA.SHIPMENTLN VALUES(2,3,'SQLA',2,3,'14030', 5,5);

INSERT INTO DRDA.SHIPMENTLN VALUES(3,1,'SQLA',2,3,'14030', 5,5);

"リモート DB"用の SQL スクリプト

CREATE SCHEMA DRDA;

CREATE TABLE DRDA.PART_STOCK (PART_NUM   CHAR(5)   NOT NULL,
                              PART_UM    CHAR(2)   NOT NULL,
                              PART_QUANT INTEGER   NOT NULL WITH DEFAULT,
                              PART_ROP   INTEGER   NOT NULL,
                              PART_EOQ   INTEGER   NOT NULL,
                              PART_BIN   CHAR(6)   NOT NULL WITH DEFAULT
                             )                                                               ;

CREATE UNIQUE INDEX DRDA.PART_STOCI ON DRDA.PART_STOCK (PART_NUM ASC);

INSERT INTO DRDA.PART_STOCK VALUES('14020','EA',038,050,100,' ');

INSERT INTO DRDA.PART_STOCK VALUES('14030','EA',043,050,050,' ');

INSERT INTO DRDA.PART_STOCK VALUES('14040','EA',030,020,030,' ');

INSERT INTO DRDA.PART_STOCK VALUES('14050','EA',010,005,015,' ');

INSERT INTO DRDA.PART_STOCK VALUES('14060','EA',110,045,090,' ');

INSERT INTO DRDA.PART_STOCK VALUES('14070','EA',130,080,160,' ');

INSERT INTO DRDA.PART_STOCK VALUES('18020','EA',013,025,050,' ');

INSERT INTO DRDA.PART_STOCK VALUES('18030','EA',015,005,010,' ');

INSERT INTO DRDA.PART_STOCK VALUES('21010','EA',029,030,050,' ');

INSERT INTO DRDA.PART_STOCK VALUES('24010','EA',025,020,040,' ');

INSERT INTO DRDA.PART_STOCK VALUES('24080','EA',054,050,050,' ');

INSERT INTO DRDA.PART_STOCK VALUES('24090','EA',030,025,050,' ');

INSERT INTO DRDA.PART_STOCK VALUES('24100','EA',020,015,030,' ');

INSERT INTO DRDA.PART_STOCK VALUES('24110','EA',052,050,080,' ');

INSERT INTO DRDA.PART_STOCK VALUES('25010','EA',511,300,600,' ');

INSERT INTO DRDA.PART_STOCK VALUES('36010','EA',013,005,010,' ');

INSERT INTO DRDA.PART_STOCK VALUES('36020','EA',110,030,060,' ');

INSERT INTO DRDA.PART_STOCK VALUES('37010','EA',415,100,200,' ');

INSERT INTO DRDA.PART_STOCK VALUES('37020','EA',010,020,040,' ');

INSERT INTO DRDA.PART_STOCK VALUES('37030','EA',154,055,060,' ');

INSERT INTO DRDA.PART_STOCK VALUES('37040','EA',223,120,120,' ');

INSERT INTO DRDA.PART_STOCK VALUES('43010','EA',110,020,040,' ');

INSERT INTO DRDA.PART_STOCK VALUES('43020','EA',067,050,050,' ');

INSERT INTO DRDA.PART_STOCK VALUES('48010','EA',032,030,060,' ');

[Top Pageに戻る]

Ads by TOK2