你的位置:首页 > Java教程

[Java教程]java 并发插入数据到oracle


各位技术大牛,请教java 多线程问题

由于项目需要,需要从oracle中dept将表记录迁移到dept2中,dept和dept2表结构一样。
设想:

查询:
Statement stmt = conn.createStatement(
            ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);

ResultSet rs = stmt.executeQuery("select * from dept");

通过 rs.last();得到记录的总个数,然后通过这个总个数,多线程实现迁移所有的记录。

插入:
PreparedStatement pstmt = null;
        String psql = "insert into dept2 values(?,?,?)";
        pstmt = conn.prepareStatement(psql);

其中的index为rs结果集的索引值,当index=1,表示rs.next()的第一个值
                    rs.absolute(index);
                    pstmt.setInt(1, rs.getInt(1));
                    pstmt.setString(2, rs.getString(2));
                    pstmt.setString(3, rs.getString(3));

                    pstmt.addBatch();

本人对多线程不是很了解,多次尝试,没能实现多线程迁移数据,希望有高手指点。谢谢!

 

public class ConcurrentTestPreparedStmt {    public static int   totalRowNum;    public static int   index = 1;    public static boolean isFinish;    public static void main(String[] args) throws Exception   {     Class.forName("oracle.jdbc.driver.OracleDriver");      String url = "jdbc:oracle:thin:@192.168.5.201:1521:orcl";     String user = "scott";     String password = "tiger";      String sql = "select * from dept";      Connection conn = DriverManager.getConnection(url, user, password);      /**     * 读取     */     Statement stmt = conn.createStatement(       ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);     ResultSet rs = stmt.executeQuery(sql);      /**     * 写入     * 使用批处理     */     PreparedStatement pstmt = null;     String psql = "insert into testpstmt values(?,?,?)";     pstmt = conn.prepareStatement(psql);     //rs.next 只要为true,说明表中记录数      rs.last();     totalRowNum = rs.getRow();      int i = 0;     //问题所在,当 i<2时,表示两个线程,执行时,往目的表中插入了同样一行记录。     while (i < 1 && !isFinish)     {       TaskThread t = new ConcurrentTestPreparedStmt().new TaskThread(         pstmt, rs);       new Thread(t).start();       i++;     }      //如果没完成,则主线程等待     while (!isFinish)     {       Thread.sleep(500);     }     pstmt.executeBatch();     conn.commit();      System.out.println("SUCCESS");    }    public synchronized int incrementIndex()   {     return index++;   }    public synchronized int getExecuteNum()   {     return index;   }    /**   * 并发迁移类   * @author ttan   */   class TaskThread implements Runnable   {     PreparedStatement pstmt = null;     ResultSet     rs  = null;      public TaskThread(PreparedStatement pstmt, ResultSet rs)     {       this.rs = rs;       this.pstmt = pstmt;     }      @Override     public void run()     {        int index = incrementIndex();       //指定到具体的行数       try       {         while (index <= totalRowNum)         {           System.out.println("index = " + index);           rs.absolute(index);           pstmt.setInt(1, rs.getInt(1));           pstmt.setString(2, rs.getString(2));           pstmt.setString(3, rs.getString(3));            pstmt.addBatch();            index = incrementIndex();         }          if (getExecuteNum() > totalRowNum)         {           isFinish = true;           return;         }        }       catch (SQLException e)       {         e.printStackTrace();       }     }   } }  

java企业级通用权限安全框架源码获取【下载地址】 

  1. public class ConcurrentTestPreparedStmt  
  2. {  
  3.   
  4.     public static int     totalRowNum;  
  5.   
  6.     public static int     index = 1;  
  7.   
  8.     public static boolean isFinish;  
  9.   
  10.     public static void main(String[] args) throws Exception  
  11.     {  
  12.         Class.forName("oracle.jdbc.driver.OracleDriver");  
  13.   
  14.         String url = "jdbc:oracle:thin:@192.168.5.201:1521:orcl";  
  15.         String user = "scott";  
  16.         String password = "tiger";  
  17.   
  18.         String sql = "select * from dept";  
  19.   
  20.         Connection conn = DriverManager.getConnection(url, user, password);  
  21.   
  22.         /** 
  23.          * 读取 
  24.          */  
  25.         Statement stmt = conn.createStatement(  
  26.             ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);  
  27.         ResultSet rs = stmt.executeQuery(sql);  
  28.   
  29.         /** 
  30.          * 写入 
  31.          * 使用批处理 
  32.          */  
  33.         PreparedStatement pstmt = null;  
  34.         String psql = "insert into testpstmt values(?,?,?)";  
  35.         pstmt = conn.prepareStatement(psql);  
  36.         //rs.next 只要为true,说明表中记录数  
  37.   
  38.         rs.last();  
  39.         totalRowNum = rs.getRow();  
  40.   
  41.         int i = 0;  
  42.         //问题所在,当 i<2时,表示两个线程,执行时,往目的表中插入了同样一行记录。  
  43.         while (i < 1 && !isFinish)  
  44.         {  
  45.             TaskThread t = new ConcurrentTestPreparedStmt().new TaskThread(  
  46.                 pstmt, rs);  
  47.             new Thread(t).start();  
  48.             i++;  
  49.         }  
  50.   
  51.         //如果没完成,则主线程等待  
  52.         while (!isFinish)  
  53.         {  
  54.             Thread.sleep(500);  
  55.         }  
  56.         pstmt.executeBatch();  
  57.         conn.commit();  
  58.   
  59.         System.out.println("SUCCESS");  
  60.   
  61.     }  
  62.   
  63.     public synchronized int incrementIndex()  
  64.     {  
  65.         return index++;  
  66.     }  
  67.   
  68.     public synchronized int getExecuteNum()  
  69.     {  
  70.         return index;  
  71.     }  
  72.   
  73.     /** 
  74.      * 并发迁移类 
  75.      * @author ttan 
  76.      */  
  77.     class TaskThread implements Runnable  
  78.     {  
  79.         PreparedStatement pstmt = null;  
  80.         ResultSet         rs    = null;  
  81.   
  82.         public TaskThread(PreparedStatement pstmt, ResultSet rs)  
  83.         {  
  84.             this.rs = rs;  
  85.             this.pstmt = pstmt;  
  86.         }  
  87.   
  88.         @Override  
  89.         public void run()  
  90.         {  
  91.   
  92.             int index = incrementIndex();  
  93.             //指定到具体的行数  
  94.             try  
  95.             {  
  96.                 while (index <= totalRowNum)  
  97.                 {  
  98.                     System.out.println("index = " + index);  
  99.                     rs.absolute(index);  
  100.                     pstmt.setInt(1, rs.getInt(1));  
  101.                     pstmt.setString(2, rs.getString(2));  
  102.                     pstmt.setString(3, rs.getString(3));  
  103.   
  104.                     pstmt.addBatch();  
  105.   
  106.                     index = incrementIndex();  
  107.                 }  
  108.   
  109.                 if (getExecuteNum() > totalRowNum)  
  110.                 {  
  111.                     isFinish = true;  
  112.                     return;  
  113.                 }  
  114.   
  115.             }  
  116.             catch (SQLException e)  
  117.             {  
  118.                 e.printStackTrace();  
  119.             }  
  120.         }  
  121.     }  
  122. }