NOTE:The implementation described herein does not implement restartable Export jobs due to
While it is possible to implement restartable Exports with TdRedux, the Export support provided is intended for moderately sized, "convenience" workloads, rather than large scale exports (e.g., terabyte exports).
Also note that this implementation has not been tested with complex or parameterized SQL. A future release may alleviate this restriction.
Using Fastexport with TdRedux requires some special programming considerations. In brief:
TdRedux internally uses the segment-number and segment-increment to automatically "continue" export sessions so that the execution of the NULL PreparedStatement appears to return results in a single continuous stream, rather than discrete segments (as returned by the database).
The following example illustrates a simple Export. By creating multiple ExportThread objects, a parallel efficient Fastload job can be run.
static void fexptest() {
System.out.println("Test fastexport...");
int numsess = 2;
int lsn = -1;
com.presicient.tdredux.Connection ctlcon = null;
java.sql.Connection fexcon[] = new com.presicient.tdredux.Connection[numsess];
ExportThread fexthread[] = new ExportThread[numsess];
java.sql.Statement ctlstmt = null;
java.sql.ResultSet rs = null;
Properties props = null;
//
// create the control session
//
try {
System.out.println("Logging on control session..");
ctlcon = (com.presicient.tdredux.Connection)DriverManager.getConnection(myURL + ";lsn=0");
lsn = ctlcon.tdrdxGetLSN();
ctlstmt = ctlcon.createStatement();
}
catch (SQLException SQLE) {
System.out.println(SQLE.getMessage());
if (SQLE.getErrorCode() != 3807)
return;
}
//
// prepare the fastexport SELECT now since we can't do it after
// BEGIN FASTEXPORT; besides, this gives us the column names
// and types
//
com.presicient.tdredux.PreparedStatement expstmt = null;
try {
expstmt = (com.presicient.tdredux.PreparedStatement)ctlcon.prepareStatement(
"SELECT * from fltypetst");
expstmt.tdrdxSetKeepResp(true);
}
catch (SQLException SQLE) {
System.out.println("Query prepare failed: " + SQLE.getMessage());
try {
ctlcon.close();
}
catch (SQLException E) { }
return;
}
//
// logon N EXPORT sessions, specifying the control session's LSN value
// for tdat_lsn and 'EXPORT' for tdat_utility,
//
int i = 0;
try {
for (i = 0; i < numsess; i++) {
System.out.println("Logging on EXPORT session " + i + "..");
fexcon[i] = DriverManager.getConnection(
myUtilURL + ";partition=EXPORT;lsn=" + lsn);
fexcon[i].setAutoCommit(false);
}
}
catch (SQLException SQLE) {
System.out.println(SQLE.getMessage());
//
// check for all amps full error
//
if (SQLE.getErrorCode() != 2632) {
try {
for (int j = 0; j < i; j++) fexcon[j].close();
ctlcon.close();
} catch (SQLException S) { }
}
}
numsess = i;
System.out.println("All AMPs full..." + numsess + " total EXPORT sessions.");
//
// send BEGIN FASTEXPORT; on control session
//
try {
ctlcon.setAutoCommit(false);
ctlstmt.executeUpdate("BEGIN FASTEXPORT;");
}
catch (SQLException SQLE) {
System.out.println( "BEGIN EXPORT failed: " + SQLE.getMessage());
try {
for (int j = 0; j < numsess; j++) fexcon[j].close();
ctlcon.close();
} catch (SQLException S) { }
return;
}
//
// Execute fastexport query on control session
//
long festarted = System.currentTimeMillis();
try {
rs = expstmt.executeQuery();
}
catch (SQLException SQLE) {
System.out.println( "Query execute failed: " + SQLE.getMessage());
try {
for (int j = 0; j < numsess; j++) fexcon[j].close();
ctlcon.close();
} catch (SQLException S) { }
return;
}
//
// start each export thread
//
for (i = 0; i < numsess; i++) {
try {
fexthread[i] = new ExportThread((com.presicient.tdredux.Connection)fexcon[i],
(com.presicient.tdredux.ResultSetMetaData)expstmt.getMetaData(), i+1, numsess);
fexthread[i].setName(new String("FEXP" + i));
}
catch (SQLException SQLE) { }
}
for (i = 0; i < numsess; i++)
fexthread[i].start();
//
// wait for them to complete
//
for (i = 0; i < numsess; i++) {
try {
fexthread[i].join();
System.out.println("Export thread " + i + " finished with result " +
fexthread[i].getResult());
}
catch (InterruptedException IE) { }
}
festarted = (System.currentTimeMillis() - festarted)/1000;
System.out.println("Export completed in " + festarted + " secs...");
System.out.println("Export complete, finishing up...");
//
// wrap up the Export
//
try {
ctlstmt.executeUpdate("END FASTEXPORT;");
ctlcon.commit();
for (i = 0; i < numsess; i++)
fexcon[i].close();
ctlcon.close();
}
catch (SQLException SQLE) {
System.out.println(SQLE.getMessage());
}
System.out.println("Fastexport OK.");
}
import java.io.*;
import java.util.*;
import java.math.*;
import java.sql.*;
import com.presicient.tdredux.*;
//
// Test harness for TdRedux Fastexport
//
public class ExportThread extends Thread {
private com.presicient.tdredux.Connection conn = null;
private int count = 0;
private int seqnum[] = { 1 };
private int qnum[] = { 1 };
private int increment[] = { 1 };
private String errmsg = "OK";
private int errcount = 0;
com.presicient.tdredux.ResultSetMetaData rsmd = null;
public ExportThread(com.presicient.tdredux.Connection conn,
com.presicient.tdredux.ResultSetMetaData rsmd, int first_seg, int seg_incr) {
this.conn = conn;
this.rsmd = rsmd;
seqnum[0] = first_seg;
increment[0] = seg_incr;
}
public String getResult() {
return new String((errcount == 0) ? "OK. Recv'd " + count + " rows." :
"ERROR: " + errmsg + " after " + count + " rows.");
}
public void run() {
com.presicient.tdredux.PreparedStatement fexstmt = null;
com.presicient.tdredux.ResultSet fexrs = null;
byte[][] rows = new byte[100][];
int total = 0;
try {
//
// prepare a NULL statement to initiate the EXPORT
//
fexstmt = (com.presicient.tdredux.PreparedStatement)conn.prepareStatement(";");
//
// 3 pseudo-params need to be bound: the query number, the sequence number,
// the sequence number increment
// use a "bind" semantic to support auto-continue of export sessions
// Note that the seqnum is auto-incremented internally within a mutex
//
fexstmt.tdrdxBindIntArray(1, qnum);
fexstmt.tdrdxBindIntArray(2, seqnum);
fexstmt.tdrdxBindIntArray(3, increment);
fexrs = (com.presicient.tdredux.ResultSet)fexstmt.executeQuery();
//
// copy metadata from the control query
//
fexrs.tdrdxCopyMetaData(rsmd);
//
// bind array of byte arrays to column 0 to indicate raw row fetch
//
fexrs.tdrdxBindBinaryArray(0, rows);
}
catch (SQLException SQLE) {
errcount = 1;
errmsg = new String("Can't prepare on export thread " + this.getName() + ":" +
SQLE.getMessage());
return;
}
try {
int rowsrcvd = 0;
int curseg = 0;
while (fexrs.next()) {
if (curseg != seqnum[0]) {
curseg = seqnum[0];
System.out.println(this.getName() + " on segment " + curseg);
}
rowsrcvd = fexrs.tdrdxGetRowsFetched();
if (rowsrcvd == 0)
break;
count += rowsrcvd;
}
}
catch (SQLException SQLE) {
if (SQLE.getErrorCode() != 2588) {
errcount = 1;
errmsg = new String(SQLE.getMessage());
}
}
try {
fexrs.close();
}
catch (SQLException E) { }
}
}