NOTE: The implementation described herein does not support restartable MLOAD jobs.
While it is possible to implement restartable MLOAD with TdRedux, the MLOAD support provided is intended for moderately sized, "convenience" workloads, rather than large-scale loads (e.g., initial terabyte loads).
ERRORLIMITs are supported as of release 1.29 of TdRedux (see below).
Using MultiLoad with TdRedux requires some special programming considerations. In order to support the full range of IMPORT MultiLoad capability, TdRedux extends the accepted SQL syntax with MLOAD-specific statements and DML directives, including BEGIN MLOAD, MLOAD ... WITH ... ERRORTABLES, ERRORLIMIT, CHECKPOINT LOADING [END], EXEC MLOAD, END MLOAD, RELEASE MLOAD, DO INSERT FOR MISSING UPDATE ROWS, and MARK ... ROWS, all of which are illustrated in the example below.
The default DML directive is "MARK DUPLICATE ROWS; MARK MISSING ROWS;".
In addition, to emulate multiple DML labels, the application submits multiple multi-statement requests on the control SQL session, and can provide a "job control" bitmask as a special parameter on each MLOAD session.
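For illustration, the following minimal sketch (not part of the TdRedux distribution) shows one way to build a per-row job control bitmask, following the bit-per-job convention used by the MultiloadThread class in the example below (bit j set means the row is applied by DML job j); the class name and the per-job predicate are hypothetical stand-ins for an application's APPLY conditions.

// Sketch: build a per-row "job control" bitmask, one bit per DML job.
// To emulate a conditional APPLY (APPLY label WHERE ...), set a job's
// bit only when its condition holds for the row.
class JobMaskSketch {
    // hypothetical per-job condition, standing in for "APPLY ... WHERE ..."
    static boolean rowMatchesJob(int col1, int job) {
        return (job == 0) ? (col1 % 2 == 0) : true;
    }
    static byte jobMaskFor(int col1, int jobCount) {
        byte mask = 0;
        for (int j = 0; j < jobCount; j++) {
            if (rowMatchesJob(col1, j))
                mask |= (byte)(1 << j);    // bit j selects DML job j
        }
        return mask;    // bound later as the driver's "magic" job mask parameter
    }
}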
In brief, when the error limit is exceeded, commit() on each MultiLoad connection will throw an exception with the message "[TdRedux] Error limit exceeded.", a SQLSTATE of "S1000", and an error code of -1. (ERRORLIMIT is implemented as a local SQL directive; it is not actually sent to the DBMS for processing, and hence does not return a valid DBMS error code.)
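An application's commit handler on an MLOAD connection can therefore use the -1 error code and "S1000" SQLSTATE to tell this locally generated exception apart from a genuine DBMS error. A minimal sketch, assuming an already-open MLOAD connection (the class and method names are hypothetical):

import java.sql.Connection;
import java.sql.SQLException;

// Sketch: commit an MLOAD batch and distinguish the locally generated
// ERRORLIMIT exception (SQLSTATE "S1000", error code -1) from a DBMS error.
class ErrorLimitSketch {
    static boolean commitBatch(Connection mlconn) {
        try {
            mlconn.commit();
            return true;
        }
        catch (SQLException e) {
            if (e.getErrorCode() == -1 && "S1000".equals(e.getSQLState())) {
                // local TdRedux ERRORLIMIT directive was exceeded
                System.out.println("Error limit exceeded; abandoning this MLOAD job.");
            }
            else {
                // genuine DBMS error
                System.out.println("DBMS error " + e.getErrorCode() + ": " + e.getMessage());
            }
            return false;
        }
    }
}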
The following example illustrates a two-job MultiLoad, including an UPSERT operation. It consists of two classes: the main control class, TdRdxMload, and a thread class, MultiloadThread, which runs an MLOAD session. By creating multiple MultiloadThread objects, an efficient parallel MLOAD job can be run.
/*
* TdRdxMload.java - Test suite for Multiload via TdRedux JDBC driver
*
* Copyright (C) 2003, Presicient Corp.
*
*/
import java.io.*;
import java.util.*;
import java.math.*;
import java.sql.*;
import com.presicient.tdredux.*;
//
// Test harness for TdRedux
//
class TdRdxMload {
static String myURL = null;
static public void main(String args[]) {
System.out.println("Test 0: Load driver");
try {
Class.forName("com.presicient.tdredux.Driver");
}
catch (ClassNotFoundException E) {
System.out.println("Can't locate TdRedux driver...check your CLASSPATH.");
System.exit(-1);
}
myURL = new String("jdbc:tdredux:" + args[0] + ";user=" + args[1] + ";password=" + args[2]);
//
// Test 18: MULTILOAD w/o array binding
//
System.out.println("Test 18: MULTILOAD, no array bind");
mltest(false);
//
// Test 19: MULTILOAD w/ array binding
//
System.out.println("Test 19: MULTILOAD, array bind");
mltest(true);
System.out.println("Tests complete.");
}
/************************************************************
* run multiload test
************************************************************/
static void mltest(boolean use_array)
{
System.out.println("Testing multiload...");
//
// our USING clause defines the record layout
//
String usingstmt = new String(
"USING (col1 integer, col2 smallint, col3 byteint, col4 char(20), "
+ "col5 varchar(40), col6 float, col7 decimal(2,1), "
+ "col8 decimal(4,2), col9 decimal(8,4), col10 decimal(14,5), "
+ "col11 DATE, col12 TIME(2), col13 TIMESTAMP(3)) ");
String mlstmts[] = new String[2];
//
// the first job is an upsert
// that also tracks the records which were
// actually updated
//
mlstmts[0] = new String(
"DO INSERT FOR MISSING UPDATE ROWS;\n" +
"MARK DUPLICATE UPDATE ROWS;\n" +
"UPDATE fltypetst SET col5 = :col5 where col1 = :col1;\n" +
"INSERT INTO fltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,\n" +
":col7, :col8, :col9, :col10, :col11, :col12, :col13);");
//
// the second job is an insert
// that also tracks duplicate records
//
mlstmts[1] = new String(
"MARK DUPLICATE INSERT ROWS;\n" +
"INSERT INTO fltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,\n" +
":col7, :col8, :col9, :col10, :col11, :col12, :col13);");
int numsess = 2;
int lsn = -1;
com.presicient.tdredux.Connection ctlcon = null;
java.sql.Connection mlcon[] = new com.presicient.tdredux.Connection[numsess];
java.sql.Statement ctlstmt = null;
java.sql.ResultSet rs = null;
MultiloadThread mlthread[] = new MultiloadThread[numsess];
try {
//
// logon the control session to get an LSN
//
System.out.println("Logging on control session..");
ctlcon = (com.presicient.tdredux.Connection)DriverManager.getConnection(myURL + ";lsn=0");
lsn = ctlcon.tdrdxGetLSN();
System.out.println("Logging on error session..");
}
catch (SQLException SqlEx) {
System.out.println(SqlEx.getMessage());
return;
}
//
// due to the way we handle DATE types from USING clauses,
// we need to make sure we use INTEGERDATE
//
System.out.println("Force dateform to integerdate...");
try {
ctlstmt = ctlcon.createStatement();
rs = ctlstmt.executeQuery("SET SESSION DATEFORM=INTEGERDATE");
rs.close();
}
catch (SQLException SQLE) {
System.out.println(SQLE.getMessage());
return;
}
//
// start with a clean slate...
//
System.out.println("Dropping tables...");
dropTable("wt_fltypetst", ctlstmt);
dropTable("et_fltypetst", ctlstmt);
dropTable("uv_fltypetst", ctlstmt);
dropTable("wt_fltypetst2", ctlstmt);
dropTable("et_fltypetst2", ctlstmt);
dropTable("uv_fltypetst2", ctlstmt);
dropTable("fltypetst", ctlstmt);
dropTable("fltypetst2", ctlstmt);
try {
ctlstmt.executeUpdate(
"CREATE TABLE fltypetst, NO FALLBACK ("
+ "col1 integer, col2 smallint, col3 byteint, col4 char(20), "
+ "col5 varchar(40), col6 float, col7 decimal(2,1), "
+ "col8 decimal(4,2), col9 decimal(8,4), col10 decimal(14,5), "
+ "col11 date, col12 time, col13 timestamp) unique primary index(col1);");
}
catch (SQLException SQLE) {
System.out.println(SQLE.getMessage());
return;
}
try {
ctlstmt.executeUpdate(
"CREATE TABLE fltypetst2, NO FALLBACK ("
+ "col1 integer, col2 smallint, col3 byteint, col4 char(20), "
+ "col5 varchar(40), col6 float, col7 decimal(2,1), "
+ "col8 decimal(4,2), col9 decimal(8,4), col10 decimal(14,5), "
+ "col11 date, col12 time, col13 timestamp) unique primary index(col1);");
}
catch (SQLException SQLE) {
System.out.println(SQLE.getMessage());
return;
}
//
// logon our Multiload sessions
//
int i = 0;
try {
for (i = 0; i < numsess; i++) {
System.out.println("Logging on MULTILOAD session " + i + "..");
mlcon[i] = DriverManager.getConnection(
//
// NOTE that we supply the number of DML jobs here,
// so that the driver knows how many jobs we're processing
//
myURL + ";partition=MLOAD;lsn=" + lsn + ";jobs=" + mlstmts.length);
mlcon[i].setAutoCommit(false);
//
// and create a thread for it
//
mlthread[i] = new MultiloadThread(mlcon[i], usingstmt, mlstmts.length,
(i*5000)+1, 5000, i, numsess, use_array);
mlthread[i].setName(new String("MLOAD" + i));
}
}
catch (SQLException SqlEx) {
//
// only exit if it's *not* an "All AMPS FULL" error
//
System.out.println(SqlEx.getMessage());
if (SqlEx.getErrorCode() != 3615) {
for (i--; i >= 0; i--) {
try {
mlcon[i].close();
}
catch (SQLException SQLE) { }
}
try {
ctlcon.close();
}
catch (SQLException SQLE) { }
return;
}
numsess = i; // otherwise adjust our session count
}
//
// now start multiloading
// NOTE: ORDER OF EXECUTION IS IMPORTANT HERE!!!
//
long mlstarted = System.currentTimeMillis();
try {
ctlstmt.executeUpdate("BEGIN MLOAD fltypetst, fltypetst2");
ctlstmt.executeUpdate("MLOAD fltypetst WITH wt_fltypetst ERRORTABLES et_fltypetst, uv_fltypetst;");
ctlstmt.executeUpdate("MLOAD fltypetst2 WITH wt_fltypetst2 ERRORTABLES et_fltypetst2, uv_fltypetst2;");
ctlstmt.executeUpdate("ERRORLIMIT 1000");
ctlcon.setAutoCommit(false);
}
catch (SQLException SQLE) {
System.out.println(SQLE.getMessage());
//
// release the mload
//
for (i = 0; i < numsess; i++) {
try {
mlcon[i].close();
}
catch (SQLException E) { }
}
//
// it takes a while for mload to clean up internally, so
// we may need to release a few times
//
while (true) {
try {
ctlcon.setAutoCommit(true);
ctlstmt.executeUpdate("RELEASE MLOAD fltypetst, fltypetst2;");
break;
}
catch (SQLException RelEx) {
System.out.println(RelEx.getMessage());
if (RelEx.getErrorCode() != 2571)
break;
}
catch (Exception E) {
System.out.println(E.getMessage());
break;
}
}
return;
}
//
// This is where TdRedux is smart about the supplied queries...
// it will parse and break them up, and process directives
// internally
//
try {
ctlstmt.executeUpdate(usingstmt + mlstmts[0]);
ctlstmt.executeUpdate(usingstmt + mlstmts[1]);
ctlcon.commit();
ctlstmt.executeUpdate("CHECKPOINT LOADING;");
}
catch (SQLException SQLE) {
System.out.println(SQLE.getMessage());
//
// release the mload
//
for (i = 0; i < numsess; i++) {
try {
mlcon[i].close();
}
catch (SQLException E) { }
}
//
// it takes a while for mload to clean up internally, so
// we may need to release a few times
//
while (true) {
try {
ctlcon.setAutoCommit(true);
ctlstmt.executeUpdate("RELEASE MLOAD fltypetst, fltypetst2;");
break;
}
catch (SQLException RelEx) {
System.out.println(RelEx.getMessage());
if (RelEx.getErrorCode() != 2571)
break;
}
catch (Exception E) {
System.out.println(E.getMessage());
break;
}
}
return;
}
//
// start our threads
//
for (i = 0; i < numsess; i++) {
mlthread[i].start();
}
//
// wait for completion
//
int errcount = 0;
for (i = 0; i< numsess; i++) {
try {
mlthread[i].join();
String mlresult = mlthread[i].getResult();
System.out.println("Thread " + i + " completed with result " +
mlresult);
if (! mlresult.startsWith("OK")) errcount++;
}
catch (InterruptedException IE) { }
}
if (errcount != 0) {
for (i = 0; i < numsess; i++) {
try {
mlcon[i].close();
}
catch (SQLException E) { }
}
//
// we must loop here since we always get the 2571 error
// on the first RELEASE
//
while (true) {
try {
ctlcon.setAutoCommit(true);
ctlstmt.executeUpdate("RELEASE MLOAD fltypetst, fltypetst2;");
break;
}
catch (SQLException E) {
System.out.println(E.getMessage());
if (E.getErrorCode() != 2571) break;
}
catch (Exception E) {
System.out.println(E.getMessage());
break;
}
}
}
else {
System.out.println("Start APPLY phase...");
try {
ctlstmt.executeUpdate("CHECKPOINT LOADING END;");
ctlcon.commit();
ctlcon.setAutoCommit(true);
}
catch (SQLException SQLE) {
System.out.println(SQLE.getMessage());
}
//
// start the apply phase
//
try {
System.out.println("Acquisition phase complete, starting apply phase.");
ctlstmt.executeUpdate("EXEC MLOAD fltypetst; EXEC MLOAD fltypetst2;");
}
catch (SQLException SQLE) {
System.out.println(SQLE.getMessage());
}
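//
// end the multiload job and commit the END MLOAD request
//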
try {
ctlcon.setAutoCommit(false);
ctlstmt.executeUpdate("END MLOAD;");
ctlcon.commit();
}
catch (SQLException SQLE) {
System.out.println(SQLE.getMessage());
}
//
// wrap up
//
mlstarted = (System.currentTimeMillis() - mlstarted)/1000;
System.out.println("Load time was " + mlstarted + " secs");
for (i = 0; i < numsess; i++) {
try {
mlcon[i].close();
}
catch (SQLException SQLE) { }
}
}
//
// retrieve the error records and report them here
//
int errcnt = 0;
int dupcnt = 0;
try {
ctlcon.setAutoCommit(true);
rs = ctlstmt.executeQuery("SELECT COUNT(*) from et_fltypetst");
rs.next();
errcnt = rs.getInt(1);
rs = ctlstmt.executeQuery("SELECT COUNT(*) from et_fltypetst2");
rs.next();
errcnt += rs.getInt(1);
if (errcnt != 0)
System.out.println( errcnt + " errors generated during multiload of fltypetst and/or fltypetst2.");
else
System.out.println("Multiload completed successfully." );
rs = ctlstmt.executeQuery("SELECT COUNT(*) from uv_fltypetst");
rs.next();
dupcnt = rs.getInt(1);
rs = ctlstmt.executeQuery("SELECT COUNT(*) from uv_fltypetst2");
rs.next();
dupcnt += rs.getInt(1);
}
catch (SQLException SQLE) {
System.out.println(SQLE.getMessage());
}
System.out.println("Dropping work tables...");
dropTable("wt_fltypetst", ctlstmt);
dropTable("wt_fltypetst2", ctlstmt);
if (errcnt == 0) {
dropTable("et_fltypetst", ctlstmt);
dropTable("et_fltypetst2", ctlstmt);
}
if (dupcnt == 0) {
dropTable("uv_fltypetst", ctlstmt);
dropTable("uv_fltypetst2", ctlstmt);
}
try {
ctlcon.close();
}
catch (SQLException SQLE) { }
System.out.println("Multiload completed successfully.");
}
static public void dropTable(String table, java.sql.Statement stmt) {
try {
stmt.executeUpdate("DROP TABLE " + table);
}
catch (SQLException SQLE) {
if (SQLE.getErrorCode() != 3807) {
System.out.println(SQLE.getMessage());
}
}
}
}
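/*
 * MultiloadThread.java - MLOAD worker thread used by the TdRdxMload test
 *
 */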
import java.io.*;
import java.util.*;
import java.math.*;
import java.sql.*;
import com.presicient.tdredux.*;
//
// Test harness for TdRedux Multiload
//
public class MultiloadThread extends Thread {
private java.sql.Connection conn = null;
private int startat = 0;
private int count = 0;
private int col1[];
private short col2[];
private byte col3[];
private String col4[];
private String col5[];
private double col6[];
private BigDecimal col7[];
private BigDecimal col8[];
private BigDecimal col9[];
private BigDecimal col10[];
private java.sql.Date col11[];
private java.sql.Time col12[];
private java.sql.Timestamp col13[];
private byte mlbitmask[];
private int mlseqnums[];
private static final String alphas =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_ ";
private Random r = null;
private int errcount = 0;
private String errmsg = new String("OK");
private boolean use_array = false;
private String usingstmt = null;
private int mljobcnt = 0;
private int offset = 0;
private int seqnum = 0;
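// job control bit values: bit j selects DML job j for a given row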
static protected byte[] jobmasks = { 1,2,4,8,16,32,64 };
public MultiloadThread(java.sql.Connection conn,
String usingstmt, int jobcnt, int startidx, int count, int seqnum,
int offset, boolean use_array) {
this.conn = conn;
this.startat = startidx;
this.count = count;
this.usingstmt = usingstmt;
col1 = new int[250];
col2 = new short[250];
col3 = new byte[250];
col4 = new String[250];
col5 = new String[250];
col6 = new double[250];
col7 = new BigDecimal[250];
col8 = new BigDecimal[250];
col9 = new BigDecimal[250];
col10 = new BigDecimal[250];
col11 = new java.sql.Date[250];
col12 = new java.sql.Time[250];
col13 = new java.sql.Timestamp[250];
mlbitmask = new byte[250];
mlseqnums = new int[250];
r = new Random(System.currentTimeMillis());
mljobcnt = jobcnt;
this.offset = offset;
this.seqnum = seqnum;
this.use_array = use_array;
}
public void run() {
com.presicient.tdredux.PreparedStatement mlstmt = null;
//
// create our default job control bitmask
// if you need to emulate the conditional APPLY
// (ie, APPLY label WHERE...), only set the bit
// for the job when the condition is true
//
for (int i = 0; i < 250; i++) {
for (int j=0; j < mljobcnt; j++)
mlbitmask[i] += jobmasks[j];
}
try {
//
// prepare the USING clause to define the record layout
//
mlstmt = (com.presicient.tdredux.PreparedStatement)
conn.prepareStatement(usingstmt + " INSERT INTO ");
//
// establish our sequence number and offset;
// we set these once and then let TdRedux manage them
// from then on
//
mlstmt.setInt(-2, seqnum);
mlstmt.setInt(-3, offset);
}
catch (SQLException SQLE) { }
int total = 0;
while (total < count) {
try {
collect_recs_each(total+startat, 250, mljobcnt);
total += 250;
if (use_array) {
//
// compute our running sequence numbers
// the offset will be the number of multiload
// threads, so our sequence numbers are interleaved
// between the threads
//
mlstmt.tdrdxBindIntArray(1,col1);
mlstmt.tdrdxBindShortArray(2,col2);
mlstmt.tdrdxBindByteArray(3,col3);
mlstmt.tdrdxBindStringArray(4,col4);
mlstmt.tdrdxBindStringArray(5,col5);
mlstmt.tdrdxBindDoubleArray(6,col6);
mlstmt.tdrdxBindBigDecimalArray(7,col7);
mlstmt.tdrdxBindBigDecimalArray(8,col8);
mlstmt.tdrdxBindBigDecimalArray(9,col9);
mlstmt.tdrdxBindBigDecimalArray(10,col10);
mlstmt.tdrdxBindDateArray(11,col11);
mlstmt.tdrdxBindTimeArray(12,col12);
mlstmt.tdrdxBindTimestampArray(13,col13);
//
// add the job control bitmask as a "magic" parm
//
mlstmt.tdrdxBindByteArray(-1, mlbitmask);
mlstmt.executeUpdate();
}
else {
for (int j = 0; j < 250; j++) {
mlstmt.setInt(1,col1[j]);
mlstmt.setShort(2,col2[j]);
mlstmt.setByte(3,col3[j]);
mlstmt.setString(4,col4[j]);
mlstmt.setString(5,col5[j]);
mlstmt.setDouble(6,col6[j]);
mlstmt.setBigDecimal(7,col7[j]);
mlstmt.setBigDecimal(8,col8[j]);
mlstmt.setBigDecimal(9,col9[j]);
mlstmt.setBigDecimal(10,col10[j]);
mlstmt.setDate(11,col11[j]);
mlstmt.setTime(12,col12[j]);
mlstmt.setTimestamp(13,col13[j]);
//
// add the sequence number and job mask as "magic" parms
//
mlstmt.setByte(-1, mlbitmask[j]);
seqnum += offset;
mlstmt.executeUpdate();
}
}
conn.commit();
}
catch (SQLException SQLE) {
errcount = 1;
errmsg = new String(SQLE.getMessage());
try {
mlstmt.close();
}
catch (SQLException E) { }
return;
}
System.out.println(this.getName() + " loaded " + total + " rows...");
} //end while
try {
mlstmt.close();
}
catch (SQLException SQLE) { }
}
public String getResult()
{
if (errcount == 0)
return new String("OK. Loaded " + count + " rows.");
return new String("ERROR: " + errmsg);
}
private int abs(int v) { return (v < 0) ? v * -1 : v; }
//
// some support methods to generate load data
//
private String rndstring(int len)
{
char s[] = new char[len];
for (int j = 0; j < len; j++)
s[j] = alphas.charAt(abs(r.nextInt())%(alphas.length()));
return new String(s);
}
private void collect_recs_each(int base, int count, int jobcnt)
{
//
// col1 integer
// col2 smallint
// col3 byteint
// col4 char(20)
// col5 varchar(40)
// col6 float
// col7 decimal(2,1)
// col8 decimal(4,2)
// col9 decimal(8,4)
// col10 decimal(14,5)
// col11 date
// col12 time(2)
// col13 timestamp(3)
//
int bufsize = 0;
int pad = 14 + 2 + 1 + 2 + 4 + 20 + 2 + 8 + 1 + 2 + 4 + 8 + 4 + 10 + 23;
for (int i = 0; (i < count) && (bufsize < 63000); i++, base++) {
col1[i] = base;
col2[i] = (short)(base%32767);
col3[i] = (byte)(base%255 - 128);
col4[i] = new String(rndstring(20));
col5[i] = (base%20 == 0) ? null : new String(rndstring((abs(r.nextInt())%40)+1));
col6[i] = r.nextDouble();
long l = r.nextLong();
col7[i] = new BigDecimal(l%100 * 0.1);
col7[i] = col7[i].setScale(1, BigDecimal.ROUND_HALF_EVEN);
col8[i] = new BigDecimal(l%10000 * 0.01);
col8[i] = col8[i].setScale(2, BigDecimal.ROUND_HALF_EVEN);
col9[i] = new BigDecimal(l%100000000 * 0.0001);
col9[i] = col9[i].setScale(4, BigDecimal.ROUND_HALF_EVEN);
col10[i] = new BigDecimal(l%100000000000L * 0.00001);
col10[i] = col10[i].setScale(5, BigDecimal.ROUND_HALF_EVEN);
col11[i] = (base%20 == 0) ? null : new java.sql.Date(System.currentTimeMillis());
col12[i] = (base%20 == 0) ? null : new java.sql.Time(System.currentTimeMillis());
col13[i] = (base%20 == 0) ? null : new java.sql.Timestamp(System.currentTimeMillis());
bufsize += pad + ((col5[i] != null) ? col5[i].length() : 0);
}
}
}