Source Connections
Input paths for Source Connections are based on the URI standard. This means that Flexter can access different locations and protocols defined by a URI path.
protocol://user:password@hostname:port/path?param=value#param=value
Flexter supports the following protocols in batch mode:
file://
http:// https://
ftp:// ftps://
hdfs:// maprfs://
s3a://
jdbc:xyz://
snowflake://
bigquery://
mongodb://
redshift://
Supported JDBC protocols:
jdbc:bigquery:
jdbc:derby:
jdbc:sqlserver:
jdbc:mariadb:
jdbc:mysql:
jdbc:oracle:
jdbc:postgresql:
jdbc:redshift:
jdbc:snowflake:
jdbc:teradata:
Supported protocols in streaming mode: file:// hdfs:// maprfs://
Some of these protocols require a username and password. In all these cases it is possible to define the values for these parameters in a configuration file, cf. section Configuration and settings in this guide. Depending on the protocol used it is also possible to place the user and password directly in the URI path.
-U, --in-user USER Input user
-P, --in-password PASSWORD Input password
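For example, the credentials can be passed as parameters or embedded directly in the URI. A minimal sketch; the user, password, host and path below are placeholders:
# credentials passed as parameters
$ xml2er -U myuser -P mypass [OPTIONS] ftp://host/path/data.xml
# credentials embedded in the URI
$ xml2er [OPTIONS] ftp://myuser:mypass@host/path/data.xml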
Flexter reads XML/JSON Source Data from different Source Connection paths. Source Connections can be separated into three types:
1. XML/JSON documents stored in a file. The XML documents can optionally be compressed.
2. XML/JSON documents stored as a column in a tabular file, e.g. one of the columns inside a CSV/TSV/Avro file contains XML/JSON documents.
3. XML/JSON documents inside a database table. One of the columns in a database table contains the XML/JSON documents. The Database Tables are read from relational sources using the JDBC protocol or from Apache Hive. In addition to the XML/JSON column you also need to specify the table name.
Useful options:
--in-opt PROP=VALUE,... Extra options for Spark dataFrame reader, ex: prop1=value1,prop2=value2
-R, --extra-column <NAME|EXPR>[:DATATYPE] [SOURCE] [AS ALIAS]
Extra column(s) to be included in all output tables.
-i, --partition-column <NAME|EXPR>[:DATATYPE] [SOURCE] [AS ALIAS]
                        Partition column(s) to be included in all output tables.
                        SOURCE: AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE
The values auto, random, now, id, filename, filepath and =value act as flags that tell Flexter where the data for a column comes from. A brief explanation:
- auto generates values automatically (default option)
- random and now are generators; which one is used depends on the data type.
# example of how the flags work
-R col                        The data comes from the input column
-R "col auto"                 The data is auto generated because there is no type specified
-R "col:date auto"            It will generate a date using the now algorithm
-R "col:integer auto"         It will generate an integer using the random algorithm
-R "col:long auto"            It will generate a long using the random algorithm; use -R col:long random
                              to enforce it, or -R col:long now to generate
                              a long with the current time (milliseconds)
-R "col id"                   It will create a column with the data collected via the --id-column parameter
-R "col filename"             The data is the filename
-R "col filepath"             The data is the filepath
-R "col = value or 'value'"   Set the value directly
Furthermore you can cast it to the chosen datatype, e.g. -R col:date ='2019-09-20'.
Usage with different kinds of input files:
# example of usage with orc input file
$ xml2er -l<Logical Schema ID> -F orc -C source_data -I seq_nbr \
    -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
    -R "auto_column:long auto" -i "create_date:date" -i process_flag data.orc
Repeated parameters are accepted, i.e. -R <col1> -R <col2> will work.
# input
path: samples/case/pnr/xml/prn_xml.orc/data.orc
format: orc
column: source_data
id-column: seq_nbr
# output
path: /tmp/test
format: parquet
compression: snappy
savemode: overwrite
extra-columns: transaction_id
transaction_date
value_column='FIXED_VALUE'
auto_column:long auto
partition-columns: create_date:DATE
process_flag
# example of usage with hive input file
$ xml2er -l<Logical Schema ID> -C source_data -I seq_nbr \
    -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
    -R "auto_column:long auto" -i "create_date:date" -i process_flag -T <[schema.]table>
# example of usage with jdbc as input
$ xml2er -l<Logical Schema ID> -C source_data -I seq_nbr \
    -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
    -R "auto_column:long auto" -i "create_date:date" -i process_flag -T <[schema.]table> jdbc:….
# example with raw files xml/json as input
$ xml2er -l<Logical Schema ID> -R value_column='FIXED_VALUE' \
    -R "auto_column:long auto" INPUTPATH
When a column is specified without a value or flag (auto, random, now, id, filename, filepath), its value must be found in the input source columns.
XML/JSON documents in files
For reading Source Data from files you need to specify the path to the Source Connection, e.g. an FTP connection or a path to a file server.
$ xml2er|json2er [OPTIONS] INPUTPATH
Below is a list of some useful parameters
--archive-read Read archive files. zip, tar, jar, cpio, 7z...
default: true
--byte-stream Enforce reading as a stream of bytes. Useful
for very large files > 2.1 GB.
--detect-type Ignores non XML/JSON/XSD files
default: true
--filter-content Filter content, wrong bytes and charset detection
default: true
Detecting the file type with --detect-type is particularly useful when the dataset is a mix of different types of Source Data, e.g. JSON and XML.
Example disabling all filters:
$ xml2er|json2er|xsd2er --archive-read false --filter-content false --detect-type false [OPTIONS] INPUTPATH
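For very large single documents the --byte-stream option can be combined with any of the supported protocols. A minimal sketch; the HDFS path below is a placeholder:
# example reading a very large (> 2.1 GB) XML file as a stream of bytes
$ xml2er --byte-stream [OPTIONS] hdfs:///data/xml/very_large_document.xml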
XML/JSON documents in tabular files
For tabular files, one of the columns contains XML or JSON documents, e.g. a column inside a Parquet file.
$ xml2er|json2er -F <File Format> -C <XML Column> [OPTIONS] INPUTPATH
Useful options:
-F, --in-format FORMAT Input format. parquet, avro, orc, json, csv, tsv
-C, --column <NAME|EXPR> [AS ALIAS]
Column which contains the XML/JSON data.
--where WHERE Where clause for filtering tabular files, jdbc
or hive tables.
-I, --id-column <NAME|EXPR>[:DATATYPE] [EXTRA] [AS ALIAS]
Identifier column for tabular files, jdbc or
hive tables. The EXTRA makes this column
available as extra column too.
-<MIN>, --id-min MIN    Identifier column's lower bound
+<MAX>, --id-max MAX    Identifier column's upper bound
# example reading csv files
$ xml2er -F csv -C xml_col [OPTIONS] INPUTPATH
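The EXTRA keyword of --id-column makes the identifier available as an extra column in the output tables as well. A sketch, assuming it follows the same quoting convention as the -R flags; the column names are placeholders:
# example exposing the id column as an extra output column
$ xml2er -F csv -C xml_col -I "seq_nbr extra" [OPTIONS] INPUTPATH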
The parameter -F or --in-format also accepts the full format name if the extra jars are included on the spark-submit library path.
# example reading avro files
$ xml2er -F avro -C xml_col [OPTIONS] INPUTPATH
# or if avro package isn't available
$ xml2er|json2er --pkg com.databricks:spark-avro_2.11:4.0.0 -F avro INPUTPATH …
Filtering on a sequential ID (primary key) column is also supported.
# example reading orc files filtering by sequential id column between 1 and 100
$ xml2er -F orc -C xml_col -I pk_col -1 +100 [OPTIONS] INPUTPATH
XML/JSON inside a database table
Using this option you need to define the table and column in your database that contain the XML/JSON documents. The database table can also be defined as a view or even an SQL statement.
$ xml2er|json2er -T <[[Catalog.]Schema.]Table> -C <XML_JSON Column> [OPTIONS] [INPUTPATH]
# or using a sql query
$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> [OPTIONS] [INPUTPATH]
Relevant parameters:
-T, --table TABLE|QUERY Name of the table (or specified SQL) to return tabular data with XML
content in one of the columns
-C, --column NAME|EXPR [AS ALIAS]
Name of the column containing XML data to process
-I, --id-column <NAME|EXPR>[:DATATYPE] [EXTRA] [AS ALIAS]
Name of identifier column to contain document's ID values for tabular
files, jdbc or hive tables
-<MIN>, --id-min MIN ID value specifying lower bound data range to be processed
+<MAX>, --id-max MAX ID value specifying upper bound data range to be processed
--where WHERE Where clause for filtering tabular files, jdbc or hive tables
--limit LIMIT Limit clause for filtering the number of records/documents
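The --where and --limit options can be combined to restrict the records that are processed. A minimal sketch; the table, column and predicate are placeholders:
# example filtering the source table and limiting the number of documents
$ xml2er -T tab -C xml_col --where "status = 'NEW'" --limit 1000 [OPTIONS] [INPUTPATH]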
Two different table based input data sources are supported:
1. JDBC
2. HIVE
JDBC as a Source Connection
JDBC input data sources require the source path to use a jdbc: protocol.
$ xml2er|json2er -T <[[Catalog.]Schema.]Table> -C <XML_JSON Column> -U <user> -P <password> [OPTIONS] jdbc:<database url>
# or using a sql query
$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> -U <user> -P <password> [OPTIONS] jdbc:<database_url>
Example:
$ xml2er -T tab -C xml_col -U rob -P rob_pass [OPTIONS] jdbc:postgresql://host/db
# or using a sql query
$ xml2er -T "select xml_col from tab" -C xml_col -U rob -P rob_pass
[OPTIONS] jdbc:postgresql://host/db
The above command will use a single thread to load the data from your source database. You can parallelise the load by providing a sequential ID and a range in your source table. As a result more than one connection can be used to load the data.
To read in parallel (available on a single machine as well), a sequential ID column name (-I switch) and its range must be provided. The degree of parallelism is determined by the number of vCPUs you have allocated to Flexter, cf. section CPU settings in this guide. Assuming you have given Flexter 4 vCPUs, it will create 4 connections to the Source Database and extract the data in parallel. In the example below the range is 100 records, so Flexter would generate 4 queries that select 25 records each.
$ xml2er -T tab -C xml_col -I pk_col -1 +100 …
# or using a sql query
$ xml2er -T "select pk_col,xml_col from tab where pk_col between 1 and 100" -C xml_col -I pk_col -1 +100 …
Providing a unique identifier column is also very useful for logging purposes. If provided, Flexter will write the value for the identifier column to the log in case of failure. This will allow you to easily identify where an issue has occurred.
Hive as a Source Connection
Using Hive as a Source Connection is similar to using JDBC. The only difference is that Hive does not require a Source Connection in the form of an input path as JDBC does. Flexter can access the Hive tables directly through Spark.
$ xml2er|json2er -T <[Database_Schema.]Table> -C <XML_JSON Column> [OPTIONS]
# or using a sql query (batch mode only)
$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> [OPTIONS]
Streaming mode only supports the table parameter without an actual query. In streaming mode you also need to provide --in-format or -F.
$ xml2er|json2er -Z <duration> -T <[Database_Schema.]Table> -C <XML_JSON Column> -F <Format> [OPTIONS]
Examples:
# Reading the whole table in batch mode
$ xml2er -T tab -C xml_col ...
# Reading part of the table in batch mode
$ xml2er -T tab -C xml_col -I pk_col -1 +100 …
# Reading part of the table with a query in batch mode
$ xml2er -T "select xml_col from tab where pk_col between 1 and 100" …
# Reading new data from a table in streaming mode
$ xml2er -Z 10s -T tab -C xml_col -F orc ...