NOTE: The implementation described herein does not implement restartable Fastload jobs.
While it is possible to implement restartable Fastload with TdRedux, the Fastload support provided is intended for moderately sized, "convenience" workloads, rather than large scale loads (e.g., initial terabyte loads).
Error limits are supported as of release 1.29 (see below).
Using Fastload with TdRedux requires some special programming considerations. In brief:
When the error limit is exceeded, commit() on each fastload connection will throw an exception with the message "[TdRedux] Error limit exceeded.", a SQLSTATE of "S1000", and an error code of -1. (ERRORLIMIT is implemented as a local SQL directive; it is not actually sent to the DBMS for processing, and hence does not return a valid DBMS error code.)
The following example illustrates a Fastload job that uses two sessions. By creating multiple FastloadThread objects, an efficient parallel Fastload job can be run.
/**
 * Runs a sample parallel Fastload of the <code>fltypetst</code> table.
 *
 * The method logs on a control session, (re)creates the target table, logs on
 * up to two FASTLOAD sessions that share the control session's LSN, starts one
 * FastloadThread per session, waits for them to finish, ends the load and
 * finally reports the contents of the two error tables.
 *
 * Failures are reported on standard output; the method never throws
 * SQLException. Every connection opened here is closed before returning,
 * including on the early-exit error paths.
 *
 * @param use_array passed through to each FastloadThread; true selects the
 *                  array-binding load path, false the row-at-a-time path
 */
static void fltest(boolean use_array)
{
    System.out.println("Testing fastload...");
    //
    // define our INSERT statement with a USING that
    // matches the record layout
    //
    String instmt =
        "USING (col1 integer, col2 smallint, col3 byteint, col4 char(20), "
        + "col5 varchar(40), col6 float, col7 decimal(2,1), "
        + "col8 decimal(4,2), col9 decimal(8,4), col10 decimal(14,5), "
        + "col11 DATE, col12 TIME(2), col13 TIMESTAMP(3)) "
        + "INSERT INTO fltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6, "
        + ":col7, :col8, :col9, :col10, :col11, :col12, :col13);";
    int numsess = 2;
    int lsn = -1;
    com.presicient.tdredux.Connection ctlcon = null;
    java.sql.Connection flcon[] = new java.sql.Connection[numsess];
    java.sql.Statement ctlstmt = null;
    java.sql.ResultSet rs = null;
    FastloadThread flthread[] = new FastloadThread[numsess];
    boolean interrupted = false;
    try {
        //
        // logon the control session
        //
        try {
            System.out.println("Logging on control session..");
            ctlcon = (com.presicient.tdredux.Connection)DriverManager.getConnection(myURL + ";lsn=0");
            lsn = ctlcon.tdrdxGetLSN();
        }
        catch (SQLException sqle) {
            System.out.println(sqle.getMessage());
            return;
        }
        //
        // force INTEGERDATE, since TdRedux only supports INTEGERDATE
        // for USING clause input
        //
        System.out.println("Force dateform to integerdate...");
        try {
            ctlstmt = ctlcon.createStatement();
            rs = ctlstmt.executeQuery("SET SESSION DATEFORM=INTEGERDATE");
            if (rs != null)
                rs.close();
        }
        catch (SQLException sqle) {
            System.out.println(sqle.getMessage());
            return;
        }
        System.out.println("Dropping error tables...");
        dropTable("fltypetst_err1", ctlstmt);
        dropTable("fltypetst_err2", ctlstmt);
        dropTable("fltypetst", ctlstmt);
        //
        // create our table
        //
        try {
            ctlstmt.executeUpdate(
                "CREATE TABLE fltypetst, NO FALLBACK ("
                + "col1 integer, col2 smallint, col3 byteint, col4 char(20), "
                + "col5 varchar(40), col6 float, col7 decimal(2,1), "
                + "col8 decimal(4,2), col9 decimal(8,4), col10 decimal(14,5), "
                + "col11 date, col12 time, col13 timestamp) unique primary index(col1);");
        }
        catch (SQLException sqle) {
            System.out.println(sqle.getMessage());
            return;
        }
        //
        // logon our fastload sessions and create a thread for each;
        // the counter lives outside the loop so the catch block knows
        // how many sessions were fully set up
        //
        int loggedOn = 0;
        try {
            for (; loggedOn < numsess; loggedOn++) {
                System.out.println("Logging on FASTLOAD session " + loggedOn + "..");
                flcon[loggedOn] = DriverManager.getConnection(
                    myUtilURL + ";partition=FASTLOAD;lsn=" + lsn);
                flcon[loggedOn].setAutoCommit(false);
                flthread[loggedOn] = new FastloadThread(
                    flcon[loggedOn], instmt, (loggedOn*5000)+1, 5000, use_array);
                flthread[loggedOn].setName("FLOAD" + loggedOn);
            }
        }
        catch (SQLException sqle) {
            System.out.println(sqle.getMessage());
            //
            // check if we've just filled up our AMPs (error 3615);
            // anything else is fatal
            //
            if (sqle.getErrorCode() != 3615)
                return;
            //
            // a session that logged on but was not fully set up has no
            // thread to drive it, so log it off right away
            //
            if (flcon[loggedOn] != null) {
                try {
                    flcon[loggedOn].close();
                }
                catch (SQLException ignored) { /* best-effort logoff */ }
                flcon[loggedOn] = null;
            }
            numsess = loggedOn;
        }
        if (numsess == 0) {
            System.out.println("No FASTLOAD sessions could be logged on.");
            return;
        }
        //
        // now start fastloading
        //
        long flstarted = System.currentTimeMillis();
        try {
            ctlstmt.executeUpdate(
                "BEGIN LOADING fltypetst ERRORFILES fltypetst_ERR1, fltypetst_ERR2");
            ctlstmt.executeUpdate("ERRORLIMIT 1000");
            ctlcon.setAutoCommit(false);
        }
        catch (SQLException sqle) {
            //
            // cleanup the load if we get an error
            //
            System.out.println(sqle.getMessage());
            fltestAbortLoad(ctlcon, ctlstmt);
            return;
        }
        //
        // execute the INSERT on the control session
        //
        try {
            ctlstmt.executeQuery(instmt);
        }
        catch (SQLException sqle) {
            //
            // cleanup the load if we get an error
            //
            System.out.println(sqle.getMessage());
            fltestAbortLoad(ctlcon, ctlstmt);
            return;
        }
        //
        // start our threads
        //
        for (int i = 0; i < numsess; i++) {
            flthread[i].start();
        }
        //
        // wait for completion
        //
        for (int i = 0; i < numsess; i++) {
            try {
                flthread[i].join();
                System.out.println("Thread " + i + " completed with result " +
                    flthread[i].getResult());
            }
            catch (InterruptedException ie) {
                //
                // remember the interrupt; it is restored once all the
                // JDBC cleanup below has been attempted
                //
                interrupted = true;
            }
        }
        //
        // we're done loading, so wrap up
        //
        System.out.println("Start END LOADING phase...");
        try {
            ctlstmt.executeUpdate("CHECKPOINT LOADING END;");
        }
        catch (SQLException sqle) {
            System.out.println(sqle.getMessage());
        }
        try {
            ctlcon.commit();
        }
        catch (SQLException sqle) {
            System.out.println(sqle.getMessage());
        }
        try {
            ctlstmt.executeUpdate("END LOADING;");
        }
        catch (SQLException sqle) {
            System.out.println(sqle.getMessage());
        }
        try {
            ctlcon.commit();
        }
        catch (SQLException sqle) {
            System.out.println(sqle.getMessage());
        }
        flstarted = (System.currentTimeMillis() - flstarted)/1000;
        System.out.println("Load time was " + flstarted + " secs");
        //
        // log off the fastload sessions before querying the error tables
        //
        for (int i = 0; i < numsess; i++) {
            try {
                flcon[i].close();
            }
            catch (SQLException ignored) { /* best-effort logoff */ }
            flcon[i] = null;
        }
        //
        // retrieve the error records and report them here
        //
        try {
            ctlcon.setAutoCommit(true);
            int errcnt = fltestCountRows(ctlstmt, "fltypetst_err1");
            if (errcnt != 0)
                System.out.println( errcnt + " errors generated during fastload of fltypetst.");
            else {
                errcnt = fltestCountRows(ctlstmt, "fltypetst_err2");
                if (errcnt != 0)
                    System.out.println( errcnt + " errors generated during fastload of fltypetst.");
                else
                    System.out.println("Fastload completed successfully." );
                errcnt = fltestCountRows(ctlstmt, "fltypetst");
                System.out.println("Loaded " + errcnt + " rows." );
                System.out.println("Dropping error tables...");
                dropTable("fltypetst_err2", ctlstmt);
                dropTable("fltypetst_err1", ctlstmt);
            }
        }
        catch (SQLException sqle) {
            System.out.println(sqle.getMessage());
        }
    }
    finally {
        //
        // release whatever is still open, whichever path got us here
        //
        for (int i = 0; i < flcon.length; i++) {
            if (flcon[i] != null) {
                try {
                    flcon[i].close();
                }
                catch (SQLException ignored) { /* best-effort logoff */ }
            }
        }
        if (ctlcon != null) {
            try {
                ctlcon.close();
            }
            catch (SQLException ignored) { /* best-effort logoff */ }
        }
        if (interrupted)
            Thread.currentThread().interrupt();
    }
}

//
// Best-effort termination of a load that failed to start: checkpoint and
// end the load, committing after each step. Failures are ignored on purpose
// because the load is already being abandoned.
//
private static void fltestAbortLoad(java.sql.Connection ctlcon, java.sql.Statement ctlstmt)
{
    String steps[] = { "CHECKPOINT LOADING END;", "END LOADING;" };
    for (int i = 0; i < steps.length; i++) {
        try {
            ctlstmt.executeUpdate(steps[i]);
        }
        catch (Exception ignored) { /* nothing more can be done */ }
        try {
            ctlcon.commit();
        }
        catch (Exception ignored) { /* nothing more can be done */ }
    }
}

//
// Returns the value of SELECT COUNT(*) for the given table and closes
// the result set afterwards.
//
private static int fltestCountRows(java.sql.Statement stmt, String table)
    throws SQLException
{
    java.sql.ResultSet counts = stmt.executeQuery("SELECT COUNT(*) from " + table);
    try {
        counts.next();
        return counts.getInt(1);
    }
    finally {
        counts.close();
    }
}
import java.io.*;
import java.util.*;
import java.math.*;
import java.sql.*;
import com.presicient.tdredux.*;
//
// Test harness for TdRedux Fastload
//
/**
 * Worker thread that generates synthetic rows and loads them through a
 * single Fastload connection.
 *
 * Rows are produced in batches of 250. Each batch is executed through the
 * prepared INSERT (either with array binding or one row at a time) and is
 * then committed on the connection. Once the thread has finished, the
 * outcome can be read with {@link #getResult()}.
 *
 * Because rows are always produced in full batches, a row count that is not
 * a multiple of 250 still results in a full final batch being loaded.
 */
public class FastloadThread extends Thread {
    // rows generated and sent per batch (one commit per batch)
    private static final int BATCH_SIZE = 250;
    private static final String alphas =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_ ";

    private java.sql.Connection conn = null;
    private int startat = 0;
    private int count = 0;
    private String instmt = null;
    private boolean use_array = false;
    //
    // one array per column of the USING clause, each holding one batch
    //
    private int col1[];
    private short col2[];
    private byte col3[];
    private String col4[];
    private String col5[];
    private double col6[];
    private BigDecimal col7[];
    private BigDecimal col8[];
    private BigDecimal col9[];
    private BigDecimal col10[];
    private java.sql.Date col11[];
    private java.sql.Time col12[];
    private java.sql.Timestamp col13[];
    private Random r = null;
    private int errcount = 0;
    private String errmsg = "OK";

    /**
     * Creates a loader for one Fastload session. The thread is not started.
     *
     * @param conn      connection the rows are loaded and committed on
     * @param instmt    USING ... INSERT statement to prepare
     * @param startidx  col1 value of the first generated row
     * @param count     number of rows to load (processed in batches of 250)
     * @param use_array true to bind whole column arrays per batch, false to
     *                  set parameters and execute once per row
     */
    public FastloadThread(java.sql.Connection conn,
        String instmt, int startidx, int count, boolean use_array) {
        this.conn = conn;
        this.startat = startidx;
        this.count = count;
        this.instmt = instmt;
        this.use_array = use_array;
        //
        // define our data generator arrays
        //
        col1 = new int[BATCH_SIZE];
        col2 = new short[BATCH_SIZE];
        col3 = new byte[BATCH_SIZE];
        col4 = new String[BATCH_SIZE];
        col5 = new String[BATCH_SIZE];
        col6 = new double[BATCH_SIZE];
        col7 = new BigDecimal[BATCH_SIZE];
        col8 = new BigDecimal[BATCH_SIZE];
        col9 = new BigDecimal[BATCH_SIZE];
        col10 = new BigDecimal[BATCH_SIZE];
        col11 = new java.sql.Date[BATCH_SIZE];
        col12 = new java.sql.Time[BATCH_SIZE];
        col13 = new java.sql.Timestamp[BATCH_SIZE];
        //
        // default seeding, so threads created in the same millisecond
        // do not generate identical data
        //
        r = new Random();
    }

    /**
     * Prepares the INSERT, then generates, executes and commits batches
     * until at least the requested number of rows has been sent. Any
     * failure stops the load and is recorded for {@link #getResult()}.
     */
    public void run() {
        com.presicient.tdredux.PreparedStatement flstmt = null;
        try {
            //
            // prepare actual stmt to get USING param types; a failure here
            // is recorded by the catch below instead of being ignored
            //
            flstmt = (com.presicient.tdredux.PreparedStatement)
                conn.prepareStatement(instmt);
            int total = 0;
            while (total < count) {
                collect_recs_each(total+startat, BATCH_SIZE);
                total += BATCH_SIZE;
                if (use_array) {
                    //
                    // bind arrays as needed
                    //
                    flstmt.tdrdxBindIntArray(1,col1);
                    flstmt.tdrdxBindShortArray(2,col2);
                    flstmt.tdrdxBindByteArray(3,col3);
                    flstmt.tdrdxBindStringArray(4,col4);
                    flstmt.tdrdxBindStringArray(5,col5);
                    flstmt.tdrdxBindDoubleArray(6,col6);
                    flstmt.tdrdxBindBigDecimalArray(7,col7);
                    flstmt.tdrdxBindBigDecimalArray(8,col8);
                    flstmt.tdrdxBindBigDecimalArray(9,col9);
                    flstmt.tdrdxBindBigDecimalArray(10,col10);
                    flstmt.tdrdxBindDateArray(11,col11);
                    flstmt.tdrdxBindTimeArray(12,col12);
                    flstmt.tdrdxBindTimestampArray(13,col13);
                    flstmt.execute();
                }
                else {
                    //
                    // otherwise execute for each record to be loaded
                    //
                    for (int j = 0; j < BATCH_SIZE; j++) {
                        flstmt.setInt(1,col1[j]);
                        flstmt.setShort(2,col2[j]);
                        flstmt.setByte(3,col3[j]);
                        flstmt.setString(4,col4[j]);
                        flstmt.setString(5,col5[j]);
                        flstmt.setDouble(6,col6[j]);
                        flstmt.setBigDecimal(7,col7[j]);
                        flstmt.setBigDecimal(8,col8[j]);
                        flstmt.setBigDecimal(9,col9[j]);
                        flstmt.setBigDecimal(10,col10[j]);
                        flstmt.setDate(11,col11[j]);
                        flstmt.setTime(12,col12[j]);
                        flstmt.setTimestamp(13,col13[j]);
                        flstmt.execute();
                    }
                }
                //
                // this actually transmits the data to the
                // database
                //
                conn.commit();
                System.out.println(this.getName() + " loaded " + total + " rows...");
            } //end while
        }
        catch (SQLException sqle) {
            errcount = 1;
            errmsg = String.valueOf(sqle.getMessage());
        }
        catch (RuntimeException rte) {
            //
            // record the failure so getResult() does not report success,
            // then let the thread die as it would have anyway
            //
            errcount = 1;
            errmsg = String.valueOf(rte);
            throw rte;
        }
        finally {
            //
            // all done (or failed), wrap up
            //
            if (flstmt != null) {
                try {
                    flstmt.close();
                }
                catch (SQLException ignored) { /* nothing useful to do on close failure */ }
            }
        }
    }

    /**
     * Describes the outcome of the load.
     *
     * @return "OK. Loaded N rows." with the requested row count when no
     *         error was recorded, otherwise "ERROR: " followed by the
     *         recorded error message
     */
    public String getResult()
    {
        if (errcount == 0)
            return "OK. Loaded " + count + " rows.";
        return "ERROR: " + errmsg;
    }

    //
    // various methods for generating load data
    //

    //
    // builds a random string of the given length from the alphas set;
    // nextInt(bound) is always within range, unlike abs(nextInt()) % bound
    // which goes negative for Integer.MIN_VALUE
    //
    private String rndstring(int len)
    {
        char s[] = new char[len];
        for (int j = 0; j < len; j++)
            s[j] = alphas.charAt(r.nextInt(alphas.length()));
        return new String(s);
    }

    //
    // fills the first n slots of every column array with generated rows
    // whose col1 values are base, base+1, ...
    //
    private void collect_recs_each(int base, int n)
    {
        //
        // col1 integer
        // col2 smallint
        // col3 byteint
        // col4 char(20)
        // col5 varchar(40)
        // col6 float
        // col7 decimal(2,1)
        // col8 decimal(4,2)
        // col9 decimal(8,4)
        // col10 decimal(14,5)
        // col11 date
        // col12 time
        // col13 timestamp
        //
        for (int i = 0; i < n; i++, base++) {
            // every 20th key gets NULLs in col5 and col11..col13
            boolean nulls = (base%20 == 0);
            col1[i] = base;
            col2[i] = (short)(base%32767);
            col3[i] = (byte)(base%255 - 128);
            col4[i] = rndstring(20);
            col5[i] = nulls ? null : rndstring(r.nextInt(40) + 1);
            col6[i] = r.nextDouble();
            //
            // one random long feeds all four decimals; valueOf(unscaled, scale)
            // builds the exact value without going through a double
            //
            long l = r.nextLong();
            col7[i] = BigDecimal.valueOf(l%100, 1);
            col8[i] = BigDecimal.valueOf(l%10000, 2);
            col9[i] = BigDecimal.valueOf(l%100000000, 4);
            col10[i] = BigDecimal.valueOf(l%100000000000L, 5);
            long now = System.currentTimeMillis();
            col11[i] = nulls ? null : new java.sql.Date(now);
            col12[i] = nulls ? null : new java.sql.Time(now);
            col13[i] = nulls ? null : new java.sql.Timestamp(now);
        }
    }
}