Source Connections
Input paths for Source Connections are based on the URI standard. This means that Flexter can access different locations and protocols defined by a URI path.
protocol://user:password@hostname:port/path?param=value#param=value
Flexter supports the following protocols in batch mode:
file://
http:// https://
ftp:// ftps://
hdfs:// maprfs://
s3a://
jdbc:xyz://
snowflake://
bigquery://
mongodb://
redshift://
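For illustration, a few hypothetical input paths following this pattern (hosts, buckets, credentials and file names are placeholders):
file:///data/xml/orders/
s3a://my-bucket/landing/orders/
hdfs://namenode:8020/landing/orders/
ftp://loader:secret@ftp.example.com/incoming/orders.xml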
The following JDBC protocols are supported:
jdbc:bigquery:
jdbc:derby:
jdbc:sqlserver:
jdbc:mariadb:
jdbc:mysql:
jdbc:oracle:
jdbc:postgresql:
jdbc:redshift:
jdbc:snowflake:
jdbc:teradata:
Protocols supported in streaming mode: file:// hdfs:// maprfs://
Some of these protocols require a username and password. In all these cases it is possible to define the values for these parameters in a configuration file, cf. section Configuration and settings in this guide. Depending on the protocol used, it is also possible to place the user and password directly in the URI path.
-U, --in-user USER Input user
-P, --in-password PASSWORD Input password
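For example, the credentials can be passed as parameters or, where the protocol allows it, embedded directly in the URI (host and credentials below are placeholders):
# credentials passed as parameters
$ xml2er -U loader -P secret [OPTIONS] ftp://ftp.example.com/incoming/
# credentials embedded in the URI
$ xml2er [OPTIONS] ftp://loader:secret@ftp.example.com/incoming/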
Flexter reads XML Source Data from different Source Connection paths. Source Connections can be separated into three types:
1. XML/JSON documents stored in a file. The documents can optionally be compressed.
2. XML/JSON documents stored as a column in a tabular file, e.g. one of the columns inside a CSV/TSV/PSV/Avro/ORC/Parquet file contains XML/JSON documents.
3. XML/JSON documents inside a database table. One of the columns in a database table contains the XML/JSON documents. The Database Tables are read from relational sources using the JDBC protocol or from Apache Hive. In addition to the XML/JSON column you also need to specify the table name.
Useful options:
--out-opt PROP=VALUE,... Extra options for Spark dataFrame writer, ex: prop1=value1,prop2=value2
-R, --extra-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
Extra column(s) to be included in all output tables
-i, --partition-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
Partition column(s) to be included in all output tables
The keywords auto, random, now, id, filename, filepath and =value act as flags that tell Flexter where the data for a column comes from. Below is a brief explanation:
- auto generates values automatically (default option)
- random and now are generators; which one is used depends on the data type.
# example of how the flags work
-R col                      The data comes from the input column
-R "col auto"               The data is auto generated because there is no type specified
-R "col:date auto"          It will generate a date using the now algorithm
-R "col:integer auto"       It will generate an integer using the random algorithm
-R "col:long auto"          It will generate a long using the random algorithm; use -R col:long random
                            to enforce it, or -R col:long now to generate
                            a long with the current time (milliseconds)
-R "col id"                 It will create a column with data collected by the parameter --id-column
-R "col filename"           The data is the filename
-R "col filepath"           The data is the filepath
-R "col = value or 'value'" Set the value directly
Furthermore, you can cast the value to the chosen datatype, e.g. -R col:date ='2019-09-20'.
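A few more hypothetical combinations of cast, flag and literal value (the column names are placeholders):
-R "created:date now"               generates the current date for every record
-R "batch_id ='B42'"                sets a fixed literal value
-i "load_date:date ='2019-09-20'"   literal value cast to date, used as a partition column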
Usage with different kinds of input files:
# example of usage with orc input file
$ xml2er -l<Logical Schema ID> -F orc -C source_data -I seq_nbr \
  -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
  -R "auto_column:long auto" -i "create_date:date" -i process_flag data.orc
Repeated parameters are accepted, i.e. -R <col1> -R <col2> will work.
# input
path: samples/case/pnr/xml/prn_xml.orc/data.orc
format: orc
column: source_data
id-column: seq_nbr
# output
path: /tmp/test
format: parquet
compression: snappy
savemode: overwrite
extra-columns: transaction_id
transaction_date
value_column='FIXED_VALUE'
auto_column:long auto
partition-columns: create_date:DATE
process_flag
# example of usage with a hive input table
$ xml2er -x<Schema ID> -C source_data -I seq_nbr \
  -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
  -R "auto_column:long auto" -i "create_date:date" -i process_flag -T <[schema.]table>
# example of usage with jdbc as input
$ xml2er -x<Schema ID> -C source_data -I seq_nbr \
  -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
  -R "auto_column:long auto" -i "create_date:date" -i process_flag -T <[schema.]table> jdbc:….
# example with raw xml/json files as input
$ xml2er -x<Schema ID> -R value_column='FIXED_VALUE' \
  -R "auto_column:long auto" INPUTPATH
When a column is specified without a value or flag (auto, random, now, id, filename, filepath), its value must be found in the input source columns.
XML/JSON documents in files
For reading Source Data from files you need to specify the path to the Source Connection, e.g. an FTP connection or a path to a file server.
$ xml2er|json2er|xsd2er [OPTIONS] INPUTPATH
Below is a list of some useful parameters
--byte-stream Enforce reading as a stream of bytes (2.1GB limit per file)
(not applicable to xsd2er)
--archive-read Read from archive files. zip, tar, jar, cpio, 7z...
default: true
--detect-type Ignores non XML/JSON/XSD files based on detected media type
default: true
--filter-content Filter content, wrong bytes and charset detection
default: true
Detecting a file type with --detect-type is particularly useful when the dataset is a mix of different types of Source Data, e.g. JSON and XML.
Example disabling all filters:
$ xml2er|json2er|xsd2er --archive-read false --filter-content false --detect-type false [OPTIONS] INPUTPATH
XML/JSON documents in tabular files
For tabular files, one of the columns contains XML or JSON documents, e.g. a column inside a Parquet file.
$ xml2er|json2er -F <File Format> -C <XML Column> [OPTIONS] INPUTPATH
Useful options:
-F, --in-format FORMAT Input format.
- jdbc, parquet, orc, json, csv, tsv, psv, avro...
-C, --column NAME|EXPR [AS ALIAS]
Name of the column containing XML/JSON data to process
--where WHERE Where clause for filtering tabular files, jdbc or hive tables
-I, --id-column <NAME|EXPR>[:DATATYPE] [EXTRA] [AS ALIAS]
Name of identifier column to contain document's ID values for tabular
files, jdbc or hive tables
-<MIN>, --id-min MIN ID value specifying lower bound data range to be processed
+<MAX>, --id-max MAX ID value specifying upper bound data range to be processed
# example reading csv files
$ xml2er|json2er -F csv -C xml_or_json_col [OPTIONS] INPUTPATH
The parameter -F or --in-format also accepts the full format class name if the extra jars are included on the spark-submit library path.
# example reading avro files
$ xml2er|json2er -F avro -C xml_or_json_col [OPTIONS] INPUTPATH
Filtering on a sequential ID column is also supported.
# example reading orc files filtering by sequential id column between 1 and 100
$ xml2er|json2er -F orc -C xml_or_json_col -I pk_col -1 +100 [OPTIONS] INPUTPATH
XML/JSON inside a database table
Using this option, you need to define the table and column in your database that contain the XML/JSON documents. You can also define the database table as a view or even as an SQL statement.
$ xml2er|json2er -T <[[Catalog.]Schema.]Table> -C <XML_JSON Column> [OPTIONS] [INPUTPATH]
# or using a sql query
$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> [OPTIONS] [INPUTPATH]
Relevant parameters:
-T, --table TABLE|QUERY Name of the table (or specified SQL) to return tabular data with XML
content in one of the columns
-C, --column NAME|EXPR [AS ALIAS]
Name of the column containing XML data to process
-I, --id-column <NAME|EXPR>[:DATATYPE] [EXTRA] [AS ALIAS]
Name of identifier column to contain document's ID values for tabular
files, jdbc or hive tables
-<MIN>, --id-min MIN ID value specifying lower bound data range to be processed
+<MAX>, --id-max MAX ID value specifying upper bound data range to be processed
--where WHERE Where clause for filtering tabular files, jdbc or hive tables
--limit LIMIT Limit clause for filtering the number of records/documents
Three different table-based input data sources are supported:
1. JDBC
2. Hive
3. Spark Connector
JDBC as Source Connection
JDBC input data sources require the source path to be specified with a jdbc: protocol.
$ xml2er|json2er -T <[[Catalog.]Schema.]Table> -C <XML_JSON Column> -U <user> -P <password> [OPTIONS] jdbc:<database_url>
# or using a sql query
$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> -U <user> -P <password> [OPTIONS] jdbc:<database_url>
Example:
$ xml2er|json2er -T tab -C xml_or_json_col -U rob -P rob_pass [OPTIONS] jdbc:postgresql://host/db
# or using a sql query
$ xml2er|json2er -T "select xml_or_job_col from tab" -C xml_or_json_col -U rob -P rob_pass
[OPTIONS] jdbc:postgresql://host/db
The above command will use a single thread to load the data from your source database. You can parallelize the load by providing a sequential ID and a range in your source table. As a result more than one connection can be used to load the data.
To read in parallel (available on a single machine as well), a sequential ID column name (the -I switch) and its range must be provided. The degree of parallelism is determined by the number of vCPUs you have allocated to Flexter, cf. section CPU settings in this guide. Assuming you have given Flexter 4 vCPUs, it will create 4 connections to the Source Database and extract the data in parallel. In the example below the range is 100 records. Flexter would generate 4 queries that select 25 records each.
$ xml2er|json2er -T tab -C xml_or_json_col -I pk_col -1 +100 …
# or using a sql query
$ xml2er|json2er -T "select pk_col,xml_or_json_col from tab where pk_col between 1 and 100" -C xml_or_json_col -I pk_col -1 +100 …
Providing a unique identifier column is also very useful for logging purposes. If provided, Flexter will write the value for the identifier column to the log in case of failure. This will allow you to easily identify where an issue has occurred.
Hive as Source Connection
Using Hive as a Source Connection is similar to using JDBC. The only difference is that Hive does not require us to specify a Source Connection in the form of an input path (as JDBC does). Flexter can access the Hive tables directly through Spark.
$ xml2er|json2er -T <[Database_Schema.]Table> -C <XML_JSON Column> [OPTIONS]
# or using a sql query (batch mode only)
$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> [OPTIONS]
Streaming mode only supports the table parameter, not a full query. In streaming mode you also need to provide --in-format or -F.
$ xml2er|json2er -Z <duration> -T <[Database_Schema.]Table> -C <XML_JSON Column> -F <Format> [OPTIONS]
Examples:
# Reading the whole table in batch mode
$ xml2er -T tab -C xml_col ...
# Reading part of the table in batch mode
$ xml2er -T tab -C xml_col -I pk_col -1 +100 …
# Reading part of the table with a query in batch mode
$ xml2er -T "select xml_col from tab where pk_col between 1 and 100" …
Spark Connector as Source Connection
Apache Spark offers the option to create custom Spark connectors, which are usually provided by the respective vendors. Flexter supports a limited set of these connectors; others can be tried out if they can operate purely by setting the correct parameters.
The supported ones are:
- Spark Snowflake Connector: snowflake://
- Spark BigQuery Connector: bigquery://
- Spark MongoDB Connector: mongodb://
- Spark Redshift Connector: redshift://
These connectors aren't shipped with Flexter, but the Flexter Modules / download.sh tool can help to install them into Spark, or they can be downloaded using the xml2er|json2er|merge2er --pkg PKG parameter, analogous to the spark-submit --packages parameter.
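For example, a connector can be pulled in at submission time with --pkg; the Maven coordinates below are placeholders and must match your Spark and Scala versions:
$ xml2er|json2er|merge2er --pkg <groupId>:<artifactId>:<version> … 'snowflake://<snowflake_url>'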
Spark Snowflake Connector
Example using the Spark Snowflake connector. The URL follows the same standard as the JDBC URL jdbc:snowflake://<snowflake_url>.
# user/password based
$ xml2er|json2er|merge2er … -T TABLE_OR_QUERY -U USER -P PASS 'snowflake://<snowflake_url>[?param=value[&param=value]][;param=value[&param=value]]'
# password-less private key
$ xml2er|json2er|merge2er … -T TABLE_OR_QUERY -U USER --in-opt private_key_file=pathTo/file.key 'snowflake://<snowflake_url>[?param=value[&param=value]][;param=value[&param=value]]'
# password based private key
$ xml2er|json2er|merge2er … -T TABLE_OR_QUERY -U USER --in-opt private_key_file=pathTo/file.key --in-opt private_key_file_pwd=PASS 'snowflake://<snowflake_url>[?param=value[&param=value]][;param=value[&param=value]]'
Important parameters:
-T, --table TABLE|QUERY Name of the table (or specified SQL) to return tabular data with XML
content in one of the columns
--where WHERE Where clause for filtering Snowflake tables
--in-opt PROP=VALUE,... Extra options for Spark dataFrame reader, ex: prop1=value1,prop2=value2
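A hypothetical sketch combining these parameters (the --in-opt names sfDatabase, sfSchema and sfWarehouse follow the Spark Snowflake connector's documentation; the account URL, table, column and predicate are placeholders):
$ xml2er … -T RAW_MESSAGES -C XML_COL -U USER -P PASS --where "LOAD_DATE = '2019-09-20'" --in-opt sfDatabase=MYDB,sfSchema=LANDING,sfWarehouse=LOAD_WH 'snowflake://<snowflake_url>'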
Spark BigQuery Connector
Example using Spark BigQuery connector.
$ xml2er|json2er|merge2er … -T TABLE_OR_QUERY -U USER -P PASS 'bigquery://project[:dataset[.table]][?param=value[&param=value]][;param=value[&param=value]]'
Important parameters:
-T, --table TABLE|QUERY Name of the table (or specified SQL) to return tabular data with XML
content in one of the columns
--where WHERE Where clause for filtering BigQuery tables
--in-opt PROP=VALUE,... Extra options for Spark dataFrame reader, ex: prop1=value1,prop2=value2
Spark MongoDB Connector
Example using Spark MongoDB connector.
$ xml2er|json2er … -T TABLE_OR_QUERY -U USER -P PASS 'mongodb://HOST[:PORT]/DB[.COLLECTION][?param=value[&param=value]][;param=value[&param=value]]'
Important parameters:
-T, --table TABLE|QUERY Name of the table (or specified MongoDB Query) to return tabular data with XML
content in one of the columns
--where EXPRESSION MongoDB expressions, ex: "[{$match:{minimum_nights:'2'}}]"
--in-opt PROP=VALUE,... Extra options for Spark dataFrame reader, ex: prop1=value1,prop2=value2
Custom Spark Connector
Other, unsupported connectors can also be tried. If they don't require code changes and can operate entirely through parameters, they may be worth trying out.
$ xml2er|json2er|merge2er -F FORMAT_NAME_OR_CLASS --in-opt PROP=VALUE --conf PROP=VALUE
Important parameters:
-F FORMAT_NAME_OR_CLASS The custom format name or class.
INPUT_PATH It sets the dataFrame reader option path=value
--conf PROP=VALUE It sets Spark Configuration settings, some connectors require it.
ex: prop1=value1,prop2=value2
--in-opt PROP=VALUE,... Extra options for Spark dataFrame reader.
ex: prop1=value1,prop2=value2
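As a hypothetical sketch, the Spark Cassandra connector could be tried this way, assuming its jar is available (e.g. via --pkg or the Spark classpath); the format class, option names and configuration key follow that connector's documentation, while the keyspace, table and column names are placeholders:
$ xml2er -F org.apache.spark.sql.cassandra --in-opt keyspace=landing,table=raw_xml --conf spark.cassandra.connection.host=HOST -C xml_col [OPTIONS]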