SparkSQL integrator setup
Operator setup for the SparkSQL target in ELTMaestro — the HDFS/Spark connection, the Hive Thrift Server it registers external tables in, the per-integrator system.cfg, the onstage-group load flow, and a verification checklist. Companion to REDSHIFT-SETUP.md and CLICKHOUSE-AUTH.md; same operator-doc shape.
SparkSQL is the engine's default platform — every non-Snowflake job runs as a SparkJob — but the SPARKSQL jobType specifically is the one that stages source data into HDFS as Parquet and registers it as Hive external tables in a Spark Thrift Server, so it needs an HDFS namenode, a Spark distribution, and a running Thrift Server (HiveServer2). Loads go source JDBC → local PSV → HDFS Parquet → Hive external table.
At a glance: what you need
| Prerequisite | Detail |
|---|---|
| HDFS namenode | Reachable from the engine host (default hdfs://localhost:9000); hdfs-site.xml + core-site.xml present on the engine host. |
| Spark distribution | Bundled at $MAESTRO_ENGINE_HOME/tools/spark; the launcher runs spark-submit … RunBatch in --master local[2]. |
| Spark Thrift Server | A running HiveServer2 (start-thriftserver.sh, warehouse hdfs://…/dwh, port 10000) — see thrift.cfg.txt. |
| Hive2 JDBC connection | A JDBC connection (default name HIVE_EMBEDDED) to the Thrift Server, driver class org.apache.hive.jdbc.HiveDriver (jar hive-jdbc-3.1.3-standalone.jar / MPP_Driver_Hive_JDBC.jar). Used to register external tables. Overridable via $HIVE_CONNECTION. |
| HDFS/Spark connection | An ELTMaestro HDFS-type General/Cloud connection (e.g. ELTM_SPARK_DRIVER) carrying fs.prefix, the Hadoop XML paths, the thrift.hive2.* catalog target, and optional S3A keys. |
| Source connection(s) | The JDBC source(s) you ingest from (SQL Server, ClickHouse, Redshift, …). |
| Object storage (optional) | $OBJECT_STORAGE=S3_CONNECTION for the S3 schema-staging steps (see MPP-SETUP.md). |
1. Architecture: how a SparkSQL onstage load flows
source JDBC ──(watermark-partitioned extract)──▶ local PSV files
/opt/iserver/transient/staging/tmp_<run>_<job>_..._<table>_<n>.psv
│
▼ Spark reads the PSV parts
HDFS Parquet ──▶ hdfs://<namenode>/ingest/<schema>/<Table>
│
▼ register as Hive external table (Spark Thrift Server)
`<thrift.hive2.schema>`.`<table>` USING PARQUET LOCATION 'hdfs://…/ingest/…'
The step type is ONSTAGEGROUP (GeneralGroupLoaderSpark → OnstageRecordSpark). Per table it computes watermark partitions, extracts to PSV, merges the PSV parts into a Parquet directory on HDFS, then calls SparkEngine.updateThriftMeta to (re)create the matching external table in the Thrift catalog. See ENGINE.md for the internals.
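The naming implied by the diagram can be sketched in Python as follows. This is an illustration, not the engine's code: the function names are hypothetical, the lowercase `ingest_<schema>_<table>` table name is inferred only from the `ingest_hr_shift` log line in §7, and the exact DDL issued by SparkEngine.updateThriftMeta is not shown in this document, so the statement below is an assumed Spark SQL form.

```python
def staged_parquet_path(fs_prefix: str, schema: str, table: str) -> str:
    """Build the HDFS directory a table's Parquet parts are staged under."""
    # fs.prefix is prepended to every staged path; avoid a doubled slash.
    return f"{fs_prefix.rstrip('/')}/ingest/{schema}/{table}"


def external_table_name(schema: str, table: str) -> str:
    """Assumed naming: ingest_<schema>_<table>, lowercased (inferred from the §7 log line)."""
    return f"ingest_{schema}_{table}".lower()


def external_table_ddl(hive_schema: str, name: str, location: str) -> str:
    """Assumed Spark SQL DDL for a Parquet table at an explicit location."""
    # Identifiers are backtick-quoted, matching the Spark SQL quote character.
    return (
        f"CREATE TABLE IF NOT EXISTS `{hive_schema}`.`{name}` "
        f"USING PARQUET LOCATION '{location}'"
    )


if __name__ == "__main__":
    path = staged_parquet_path("hdfs://localhost:9000/", "hr", "Shift")
    print(path)
    print(external_table_ddl("iext", external_table_name("hr", "Shift"), path))
```

Keeping path construction and DDL construction separate mirrors the two stages in the flow: Parquet lands first, and registration only points the catalog at that location.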
2. Create the HDFS/Spark connection
In the WPF client → Administration ▸ Configure Maestro Server ▸ General/Cloud Config, create an HDFS-type connection (commonly ELTM_SPARK_DRIVER). Full walk-through with screenshot: UG-ADMIN.md → The Spark / HDFS connection. Parameters:
| Parameter | Purpose |
|---|---|
| fs.prefix | HDFS namenode URI, e.g. hdfs://localhost:9000/. Prepended to every staged Parquet path. |
| file.hdfs.site.xml.path / file.core.site.xml.path | Actual filesystem paths to the Hadoop XMLs on the engine host (e.g. /opt/iserver/tools/hadoop/etc/hadoop/…); taken literally, not expanded via $MAESTRO_ENGINE_HOME. |
| dir.data | Local staging directory (default /tmp/maestro.data). |
| thrift.hive2.db / thrift.hive2.schema / thrift.hive2.connection.name | Hive (Thrift) catalog target, e.g. dwh / iext / HIVE_EMBEDDED. |
| fs.s3a.access.key / fs.s3a.secret.key / fs.s3a.endpoint | S3A credentials and endpoint for Spark to read/write S3 (if staging via S3). |
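A quick sanity check of these parameters before saving the connection might look like the sketch below. The function name and the specific rules are assumptions for illustration (the engine's own validation, if any, is not described here); the only facts taken from the table are that fs.prefix is an HDFS namenode URI and that the XML paths are taken literally. The sample paths in the usage lines are placeholders.

```python
from typing import Dict, List

XML_PATH_KEYS = ("file.hdfs.site.xml.path", "file.core.site.xml.path")


def check_hdfs_connection(params: Dict[str, str]) -> List[str]:
    """Return a list of human-readable problems; an empty list means no issue found."""
    problems = []
    prefix = params.get("fs.prefix", "")
    if not prefix.startswith("hdfs://"):
        problems.append("fs.prefix should be an HDFS namenode URI such as hdfs://localhost:9000/")
    for key in XML_PATH_KEYS:
        path = params.get(key, "")
        if not path:
            problems.append(f"{key} is not set")
        elif "$" in path:
            # The path is used literally, so a $-variable would never be expanded.
            problems.append(f"{key} contains a $-variable; the path is taken literally")
        elif not path.startswith("/"):
            problems.append(f"{key} should be an absolute path on the engine host")
    return problems


if __name__ == "__main__":
    sample = {
        "fs.prefix": "hdfs://localhost:9000/",
        "file.hdfs.site.xml.path": "/etc/hadoop/conf/hdfs-site.xml",  # placeholder path
        "file.core.site.xml.path": "/etc/hadoop/conf/core-site.xml",  # placeholder path
    }
    print(check_hdfs_connection(sample) or "no problems found")
```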
3. Create the Hive2 (Thrift) JDBC connection
The external-table registration runs DDL against the Spark Thrift Server through a JDBC connection whose name is thrift.hive2.connection.name (default HIVE_EMBEDDED). Create it in the JDBC connections list:
| Field | Value |
|---|---|
| Driver class | org.apache.hive.jdbc.HiveDriver |
| Driver jar | hive-jdbc-3.1.3-standalone.jar (shipped via jdbc_list.json; the MPP_Driver_Hive_JDBC.jar alias marks it as an integrator driver) |
| Connection string | jdbc:hive2://<host>:10000/<thrift.hive2.db> (e.g. jdbc:hive2://localhost:10000/dwh) |
| User / Password | Per your HiveServer2 auth (often blank for an embedded/local server) |
Driver deployment: the Hive driver jar is fetched into the image at build time (fetch-jdbc-drivers.sh), so it goes live only via an image build / redeploy; a sqlm.jar hotswap does not add driver jars.
How the engine resolves the connection (worth understanding — it's a two-step lookup):
1. It finds the HDFS connection by type — GeneralConnectionProvider.getSparkConnection() runs select … from t_connection_general where connection_type_name='HDFS' and takes the first row (no name filter), building the SparkEngine from its params.
2. It resolves the Thrift JDBC connection name ($HIVE_CONNECTION from system.cfg when set, otherwise thrift.hive2.connection.name on that HDFS connection, default HIVE_EMBEDDED; see §4), then looks that name up as a separate JDBC connection in t_jdbc (e.g. getJdbcConnection("HIVE_EMBEDDED")).
So two connections must exist: the HDFS connection (carries the catalog name + fs.prefix / Hadoop config) and a t_jdbc connection whose name matches the resolved value (HIVE_EMBEDDED by default). Keep exactly one HDFS connection: getSparkConnection takes the first HDFS row, so with several rows the one picked is not guaranteed.
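The two-step lookup can be modelled in a few lines of Python. This is a sketch under stated assumptions: the row dictionaries and their `name` / `params` keys are hypothetical stand-ins for t_connection_general and t_jdbc (only `connection_type_name` comes from the query above), and the system.cfg override from §4 is deliberately left out.

```python
from typing import Dict, List, Tuple


def resolve_spark_connections(
    general_rows: List[Dict],
    jdbc_rows: List[Dict],
    default_name: str = "HIVE_EMBEDDED",
) -> Tuple[Dict, Dict]:
    """Return (hdfs_connection, hive_jdbc_connection) or raise LookupError."""
    # Step 1: take the first HDFS-type row; note there is no filter on the name.
    hdfs = next((r for r in general_rows if r["connection_type_name"] == "HDFS"), None)
    if hdfs is None:
        raise LookupError("no HDFS-type connection defined")

    # Step 2: read the JDBC connection name off the HDFS connection, then look it up.
    name = hdfs["params"].get("thrift.hive2.connection.name", default_name)
    jdbc = next((r for r in jdbc_rows if r["name"] == name), None)
    if jdbc is None:
        raise LookupError(f"Connection not found for: {name}")
    return hdfs, jdbc


if __name__ == "__main__":
    general = [
        {"name": "ELTM_SPARK_DRIVER", "connection_type_name": "HDFS",
         "params": {"thrift.hive2.connection.name": "HIVE_EMBEDDED"}},
    ]
    jdbc = [{"name": "HIVE_EMBEDDED", "url": "jdbc:hive2://localhost:10000/dwh"}]
    hdfs_conn, hive_conn = resolve_spark_connections(general, jdbc)
    print(hdfs_conn["name"], "->", hive_conn["url"])
```

Because step 1 picks by type only, adding a second HDFS row to `general` changes the result depending on row order, which is the reason to keep exactly one.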
4. Overriding the Thrift catalog per integrator ($HIVE_CONNECTION / $HIVE_DATABASE / $HIVE_SCHEMA)
The engine resolves the Thrift catalog target from system.cfg variables first, falling back to the HDFS connection's thrift.hive2.* args when unset (SparkEngine.resolveHiveVar):
| system.cfg variable | Falls back to (connection arg) | Default |
|---|---|---|
| $HIVE_CONNECTION | thrift.hive2.connection.name | HIVE_EMBEDDED |
| $HIVE_DATABASE | thrift.hive2.db | dwh |
| $HIVE_SCHEMA | thrift.hive2.schema | iext |
getVarData passes an undefined $-variable through unchanged, so a value still starting with $ is the sentinel that triggers the fallback. The repo ships all three in sparksql/system.cfg only — it's the sole consumer (updateThriftMeta is gated on jobType == SPARKSQL), so the other integrators don't define them. Change them per environment via the Integrator Config tab (a $HIVE_CONNECTION you point elsewhere must already exist in the JDBC connections list).
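The sentinel-based fallback described above can be sketched as follows. The Python names mirror getVarData and SparkEngine.resolveHiveVar but are only a model of the described behaviour; in particular, applying the table's default when the connection arg is also missing is an assumption.

```python
from typing import Dict

# Connection arg and default for each system.cfg variable, taken from the table above.
FALLBACKS = {
    "$HIVE_CONNECTION": ("thrift.hive2.connection.name", "HIVE_EMBEDDED"),
    "$HIVE_DATABASE": ("thrift.hive2.db", "dwh"),
    "$HIVE_SCHEMA": ("thrift.hive2.schema", "iext"),
}


def get_var_data(name: str, system_cfg: Dict[str, str]) -> str:
    """An undefined $-variable passes through unchanged."""
    return system_cfg.get(name, name)


def resolve_hive_var(name: str, system_cfg: Dict[str, str], connection_args: Dict[str, str]) -> str:
    """system.cfg wins; a value still starting with '$' triggers the fallback."""
    value = get_var_data(name, system_cfg)
    if not value.startswith("$"):
        return value
    arg_name, default = FALLBACKS[name]
    # Assumption: the documented default applies when the connection arg is absent too.
    return connection_args.get(arg_name, default)


if __name__ == "__main__":
    cfg = {"$HIVE_SCHEMA": "custom_ext"}
    args = {"thrift.hive2.db": "dwh"}
    for var in FALLBACKS:
        print(var, "->", resolve_hive_var(var, cfg, args))
```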
5. The Spark Thrift Server
integrators/sparksql/thrift.cfg.txt is the reference command to start HiveServer2:
$MAESTRO_ENGINE_HOME/tools/spark/sbin/start-thriftserver.sh \
--conf spark.sql.thriftServer.incrementalCollect=true \
--conf spark.sql.warehouse.dir=hdfs://localhost:9000/dwh \
--deploy-mode client --master $SPARK_MASTER \
--total-executor-cores 4 --executor-memory 16G --driver-memory 16G --name dwh
The warehouse dir (hdfs://…/dwh) and thrift.hive2.db=dwh must agree. The server listens on port 10000 (see CI-CD.md / OPERATIONS.md port maps). External tables registered by the engine appear under the thrift.hive2.schema (e.g. iext) and are queryable through any HiveServer2 client.
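One way to keep these values aligned is to derive the JDBC URL from §3 and the agreement check from the same inputs. The sketch below reads "must agree" as "the last path segment of the warehouse dir equals thrift.hive2.db", which is an interpretation rather than something the engine is documented to enforce; both function names are hypothetical.

```python
import posixpath
from urllib.parse import urlparse


def hive_jdbc_url(host: str, hive_db: str, port: int = 10000) -> str:
    """Connection string in the jdbc:hive2://<host>:<port>/<db> form used in §3."""
    return f"jdbc:hive2://{host}:{port}/{hive_db}"


def warehouse_matches_db(warehouse_dir: str, hive_db: str) -> bool:
    """True when the warehouse dir's final path segment equals the Hive database name."""
    path = urlparse(warehouse_dir).path.rstrip("/")
    return posixpath.basename(path) == hive_db


if __name__ == "__main__":
    print(hive_jdbc_url("localhost", "dwh"))
    print(warehouse_matches_db("hdfs://localhost:9000/dwh", "dwh"))
```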
On a fresh deploy, docker/init-thrift-schemas.sh creates the standard schemas in the Thrift catalog once the server is up — iext (where the engine registers external tables), ingest, tmp, and default (each seeded with a dummy table). iext is therefore the canonical external schema, and the $HIVE_SCHEMA default (iext) is aligned to it — register-time DDL targets a schema that already exists. The script is idempotent (gated on the ingest database already existing in the metastore), so container restarts don't re-run it. (The metastore persists in PostgreSQL, so a destructive redeploy that wipes pgdata recreates these from scratch.)
6. Integrator config (system.cfg)
root-engine-install/db/metadata/integrators/sparksql/system.cfg — the per-platform template. Key variables:
| Variable | Purpose |
|---|---|
| $SYSTEM_ALIAS | Spark SQL Database System. |
| $SYSTEM_TOKEN_ENCLOSER | Identifier quote char: a backtick (`), matching Spark SQL. |
| $B64EXPRESSION | cast(unbase64($COLUMN) as $DATATYPE): Spark-native base64 decode (no UDF needed, unlike Redshift). |
| $HASH_TEMPLATE / $HASH_RETURNS | SCD hashing: md5(nvl(cast($ARG as String),'')) → CHAR(32). |
| $QUERY_* (do not modify) | Operation templates: TRUNCATE, CDC delete+insert, CTAS, view create, drop, preview, cache. |
| $SCHEMALOADERPARTSIZEBYTES | Parquet part-size target for staging (default 268435456 bytes, i.e. 256 MiB). |
| $OBJECT_STORAGE | S3 staging connection name (S3_CONNECTION); see MPP-SETUP.md. |
| $HIVE_CONNECTION / $HIVE_DATABASE / $HIVE_SCHEMA | Thrift/Hive catalog target (defaults HIVE_EMBEDDED / dwh / iext); see §4. |
| $SYSTEM_NAME | SPARKSQL (cosmetic; read by no code). |
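To see what the $B64EXPRESSION and $HASH_TEMPLATE templates produce, the sketch below expands them with plain string replacement and mirrors the hash expression in Python. How the engine actually substitutes placeholders is not documented here, so `expand` is an assumption, and `scd_hash` only matches Spark's result for values whose string form is the same in both systems (plain strings, for example).

```python
import hashlib
from typing import Optional

B64_EXPRESSION = "cast(unbase64($COLUMN) as $DATATYPE)"
HASH_TEMPLATE = "md5(nvl(cast($ARG as String),''))"


def expand(template: str, **values: str) -> str:
    """Replace each $NAME placeholder with its value (simple textual substitution)."""
    out = template
    for key, val in values.items():
        out = out.replace(f"${key}", val)
    return out


def scd_hash(value: Optional[str]) -> str:
    """Python mirror of md5(nvl(cast(x as String), '')): NULL becomes '' before hashing."""
    text = "" if value is None else str(value)
    return hashlib.md5(text.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    print(expand(B64_EXPRESSION, COLUMN="`payload`", DATATYPE="string"))
    print(expand(HASH_TEMPLATE, ARG="`customer_name`"))
    print(scd_hash(None), len(scd_hash("abc")))
```

The 32-character hex digest is why $HASH_RETURNS is CHAR(32).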
7. Empty / no-data loads
When a table's watermark window yields no rows (e.g. an incremental load with nothing new, or Total Extract Partitions: 0), the engine writes no Parquet — there is nothing to stage. Rather than fail, it now creates the (empty) target HDFS directory and skips the metastore registration cleanly:
Created empty object-storage directory (no staged data): hdfs://…/ingest/hr/Shift
No staged data at hdfs://…/ingest/hr/Shift — skipped spark.thriftserver registration for ingest_hr_shift
This is normal and the batch still completes. (Previously this surfaced as Warning!! Could not update spark.thriftserver meta reference + an AnalysisException [PATH_NOT_FOUND].) An empty external table isn't registered until the table actually has data, since there's no Parquet schema to infer.
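The decision described in this section reduces to a small branch. The sketch below is a hypothetical model of it, not the engine's code; the `StageOutcome` type and the function name are invented for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StageOutcome:
    create_empty_dir: bool  # create the target HDFS directory with no Parquet in it
    register_table: bool    # (re)create the external table in the Thrift catalog


def plan_after_extract(parquet_part_count: int) -> StageOutcome:
    """Decide what happens after extraction, based on how many Parquet parts were staged."""
    if parquet_part_count == 0:
        # Nothing staged: there is no Parquet schema to infer, so registration is skipped.
        return StageOutcome(create_empty_dir=True, register_table=False)
    return StageOutcome(create_empty_dir=False, register_table=True)


if __name__ == "__main__":
    print(plan_after_extract(0))
    print(plan_after_extract(4))
```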
Verify
- Thrift Server up: jdbc:hive2://<host>:10000/dwh connects; the HIVE_EMBEDDED JDBC connection's editor test succeeds.
- HDFS reachable: the engine host can hdfs dfs -ls hdfs://<namenode>/ (the Hadoop XMLs resolve).
- Run an ONSTAGEGROUP job end to end: confirm Parquet lands under hdfs://…/ingest/<schema>/<Table> and the external table appears in iext via a HiveServer2 query.
- Empty table: a zero-row table logs the §7 info lines and does not fail the batch.
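Before trying a real JDBC login, a plain TCP probe can confirm that something is listening on the Thrift Server and namenode ports. This is a rough pre-check only (an open port does not prove HiveServer2 or HDFS is healthy), and the helper name is hypothetical; the ports in the usage lines are the defaults mentioned in this document.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


if __name__ == "__main__":
    print("Thrift Server (10000):", port_is_open("localhost", 10000))
    print("HDFS namenode (9000):", port_is_open("localhost", 9000))
```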
Common failures
| Symptom | Cause / fix |
|---|---|
| ClassNotFoundException on the Hive connection | Driver class must be org.apache.hive.jdbc.HiveDriver; ensure hive-jdbc-3.1.3-standalone.jar is deployed (it's in jdbc_list.json). |
| Connection not found for: $HIVE_CONNECTION / registration skipped | $HIVE_CONNECTION resolves to a JDBC connection name that doesn't exist; create it, or rely on the HIVE_EMBEDDED fallback. |
| Path does not exist: hdfs://…/ingest/… (older builds) | Empty-load PATH_NOT_FOUND; fixed, see §7. |
| Data frame is empty or not available: …/tmp_…_<table> for a table that did download rows | The merge failed to read the staged PSV. Historically getDataFrameLocalFSOnstagePsvHeader did a redundant double-read (reader.csv(path) then reader.load(path)) that fails under Spark 4, so every partitioned table read as "empty" and nothing staged to HDFS. Fixed (single reader.csv() read); the catch now also logs the real exception, so if it recurs the batch log shows the true cause. The PSV itself is written correctly inside the staging dir by JdbcDataOutputConverter. |
| External table missing in iext | Either no data staged (see above), or the Thrift Server / $HIVE_CONNECTION connection is down. |