Source Connections
Input paths for Source Connections follow the URI standard. This lets Flexter access different locations and protocols through a URI path of the following form:
protocol://user:password@hostname:port/path?param=value#param=value
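The components of the template above can be inspected with a URI parser. The following is a minimal sketch using Python's standard library, not Flexter's own parsing logic; the sample URI and the `describe_uri` helper are made up for illustration.

```python
from urllib.parse import urlsplit, parse_qs


def describe_uri(uri: str) -> dict:
    """Split a URI into the components named in the template above."""
    parts = urlsplit(uri)
    return {
        "protocol": parts.scheme,
        "user": parts.username,
        "password": parts.password,
        "hostname": parts.hostname,
        "port": parts.port,
        "path": parts.path,
        "params": parse_qs(parts.query),   # ?param=value pairs
        "fragment": parts.fragment,        # text after '#'
    }


# Hypothetical input path, used only to show the split
info = describe_uri("ftp://rob:rob_pass@host:21/pathTo/file.xml?type=I#part")
for key, value in info.items():
    print(f"{key:>9}: {value}")
```

Printing the parts this way is a quick check that a path is split where you expect before passing it to a command.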
Some protocols also accept glob-style wildcards.
Examples:
file://pathTo/**/*.xml
hdfs://pathTo/2020-??/*.xml
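To see what the single-segment wildcards in these examples select, the sketch below applies `?` and `*` to a few made-up folder and file names with Python's `fnmatch` module. It only illustrates common glob semantics; the matcher used by Flexter or the underlying filesystem may differ in details, and recursive `**` matching is not covered here.

```python
from fnmatch import fnmatchcase

# Hypothetical folder and file names
folders = ["2020-01", "2020-12", "2020-1", "2021-01"]
files = ["orders.xml", "orders.json", "a.XML"]

# '?' stands for exactly one character, so "2020-??" needs two characters after the dash
matching_folders = [f for f in folders if fnmatchcase(f, "2020-??")]

# '*' stands for any run of characters; fnmatchcase compares case-sensitively
matching_files = [f for f in files if fnmatchcase(f, "*.xml")]

print(matching_folders)
print(matching_files)
```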
Supported protocols
Flexter supports the following protocols in batch mode:
- Local Filesystem
  - file://
    # file example
    file://pathTo/fileOrDirectory
    # file format
    file://[host[:port]][/absolute/path|/drive:/path|/share/path|/relative/path][?query][#fragment]
- Web protocols
  - http://
    # http example
    http://host:port/pathTo/file
    # http format
    http://[user[:password]@]host[:port][/path][?query][#fragment]
  - https://
    # https example
    https://host:port/pathTo/file
    # https format
    https://[user[:password]@]host[:port][/path][?query][#fragment]
  - ftp://
    # ftp example
    ftp://host:port/pathTo/file
    # ftp format
    ftp://[user[:password]@]host[:port][/path/file][;type={A|I|D}][?query][#fragment]
  - ftps://
    # ftps example
    ftps://host:port/pathTo/file
    # ftps format
    ftps://[user[:password]@]host[:port][/path/file][;type={A|I|D}][?query][#fragment]
- Hadoop Filesystems
  - hdfs://
    # hdfs example
    hdfs://host:port/pathTo/fileOrDirectory
    # hdfs format
    hdfs://[user@]host[:port]/path/file
  - webhdfs://
    # webhdfs example
    webhdfs://host:port/pathTo/fileOrDirectory
    # webhdfs format
    webhdfs://[user@]host[:port]/path/file
- AWS filesystem
  - s3a://
    # s3a example
    s3a://bucket/pathTo/fileOrDirectory
    # s3a format
    s3a://[accessKey[:secretKey]@][bucket[/path/object]][?endpoint=host[:port]&region=REGION&otherOptions][#fragment]
- Azure Filesystems
  - abfs://
    # abfs example
    abfs://container@account.dfs.core.windows.net/pathTo/fileOrDirectory
    # abfs format
    abfs://[container[@]account[:domain][:port]][/path/blob][?query][#fragment]
  - abfss://
    # abfss example
    abfss://container@account.dfs.core.windows.net/pathTo/fileOrDirectory
    # abfss format
    abfss://[container[@]account[:domain][:port]][/path/blob][?query][#fragment]
  - adl://
    # adl example
    adl://account.azuredatalakestore.net/pathTo/fileOrDirectory
    # adl format
    adl://[filesystem.azuredatalakestore.net[:port]][/path/file][?query][#fragment]
- Google Cloud Filesystems
  - gs://
    # gs example
    gs://bucket/pathTo/fileOrDirectory
    # gs format
    gs://[bucket]/[path/to/object][?query][#fragment]
- Databricks Filesystem
  - dbfs://
    # dbfs example
    dbfs:/pathTo/fileOrDirectory
    # dbfs format
    dbfs://[profile@][/]/path/file[?query][#fragment]
- JDBC Protocol
    # jdbc example
    jdbc:dialect://host:port/db
    # jdbc format
    jdbc:<dialect>://[user[:password]@][host[:port]][/db][?prop1=value1[&prop2=value2]][#fragment]
- JDBC Dialects
  - jdbc:mysql://
    # jdbc:mysql example
    jdbc:mysql://host:port/db
    # jdbc:mysql format
    jdbc:mysql://[user[:password]@]host[:port]/[db][?prop1=value1[&prop2=value2]][#fragment]
  - jdbc:mariadb://
    # jdbc:mariadb example
    jdbc:mariadb://host:port/db
    # jdbc:mariadb format
    jdbc:mariadb://[user[:password]@]host[:port][,host2[:port2],...][/db][?prop1=value1[&prop2=value2]][#fragment]
  - jdbc:postgresql://
    # jdbc:postgresql example
    jdbc:postgresql://host:port/db
    # jdbc:postgresql format
    jdbc:postgresql://[user[:password]@]host[:port][,host2[:port2],...][/db][?prop1=value1[&prop2=value2]][#fragment]
  - jdbc:oracle:thin://
    # jdbc:oracle:thin example
    jdbc:oracle:thin://host:port/db
    # jdbc:oracle:thin format
    jdbc:oracle:thin://[user[:password]@]host[:port][/service][?prop1=value1[&prop2=value2]][#fragment]
  - jdbc:sqlserver://
    # jdbc:sqlserver example
    jdbc:sqlserver://host:port/db
    # jdbc:sqlserver format
    jdbc:sqlserver://[host[\instance][:port]][;prop1=value1[;prop2=value2]][#fragment]
  - jdbc:db2://
    # jdbc:db2 example
    jdbc:db2://host:port/db
    # jdbc:db2 format
    jdbc:db2://[host[:port]/][db][;prop1=value1[;prop2=value2]][#fragment]
  - jdbc:teradata://
    # jdbc:teradata example
    jdbc:teradata://host:port/db
    # jdbc:teradata format
    jdbc:teradata://[host[:port]][/db][?prop1=value1[&prop2=value2]][#fragment]
  - jdbc:h2://
    # jdbc:h2 example
    jdbc:h2://host:port/db
    # jdbc:h2 format
    jdbc:h2://[host[:port]/][path/db][;prop1=value1[;prop2=value2]][#fragment]
  - jdbc:hsqldb://
    # jdbc:hsqldb example
    jdbc:hsqldb://host:port/db
    # jdbc:hsqldb format
    jdbc:hsqldb://[host[:port]/][db][;prop1=value1[;prop2=value2]][#fragment]
  - jdbc:snowflake://
    # jdbc:snowflake example
    jdbc:snowflake://host:port/db
    # jdbc:snowflake format
    jdbc:snowflake://<account_locator>[.region][.cloud_provider].snowflakecomputing.com[:port]/?user=&password=&db=&schema=&warehouse=&role=&prop1=val1
  - jdbc:bigquery://
    # jdbc:bigquery example
    jdbc:bigquery://host:port/db
    # jdbc:bigquery format
    jdbc:bigquery://[project[:location]@][host[:port]][/dataset][?prop1=value1[&prop2=value2]][#fragment]
  - jdbc:mongodb://
    # jdbc:mongodb example
    jdbc:mongodb://host:port/db
    # jdbc:mongodb format
    jdbc:mongodb://[user[:password]@]host1[:port1][,host2[:port2],...][/db][?prop1=value1[&prop2=value2]][#fragment]
  - jdbc:redshift://
    # jdbc:redshift example
    jdbc:redshift://host:port/db
    # jdbc:redshift format
    jdbc:redshift://[user[:password]@]host[:port][/db][?prop1=value1[&prop2=value2]][#fragment]
- JDBC Hive Related Dialects
  - jdbc:hive://
  - jdbc:hive2://
  - jdbc:impala://
  - jdbc:spark://
- Spark Connectors
  - snowflake://
    # snowflake example
    snowflake://host:port?db=db&warehouse=warehouse
    # snowflake format
    snowflake://[user[:password]@]<account_locator>[.region][.cloud_provider].snowflakecomputing.com[:port]/?user=&password=&db=&schema=&warehouse=&role=&prop1=val1
  - bigquery://
    # bigquery example
    bigquery://project:dataset
    # bigquery format
    bigquery://[project:]dataset[.table][?prop1=value1[&prop2=value2]][#fragment]
  - mongodb://
    # mongodb example
    mongodb://host:port/db.collection
    # mongodb format
    mongodb://[user[:password]@]host[:port][/database[.collection]][?prop1=value1[&prop2=value2]][#fragment]
  - redshift://
    # redshift example
    redshift://host:port/db
    # redshift format
    redshift://[user[:password]@]host[:port][/database][?prop1=value1[&prop2=value2]][#fragment]
  - others
    Other Spark Connectors, defined mainly by parameters instead of a protocol://
Flexter supports the following protocols in streaming mode:
- file://
- hdfs://
Source Connectors parameters
Some of these protocols require a username and password. In these cases you can define the values in a configuration file (cf. section Configuration and settings in this guide) or pass them with the command line parameters below. Depending on the protocol, you can also place the user and password directly in the URI path.
-U, --in-user USER              Input user
                          	
-P, --in-password PASSWORD      Input password
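When credentials are placed directly in the URI path, characters such as `@`, `:` or `/` in a password clash with the URI delimiters. The URI standard handles this with percent-encoding. The sketch below shows the encoding with Python's standard library. The helper name and the sample password are made up, and whether a given protocol handler decodes percent-encoded credentials is an assumption to verify; passing them with -U and -P avoids the question.

```python
from urllib.parse import quote, unquote, urlsplit


def uri_with_credentials(protocol: str, user: str, password: str,
                         host: str, path: str) -> str:
    """Build protocol://user:password@host/path with percent-encoded credentials."""
    user_enc = quote(user, safe="")          # safe="" also encodes '/'
    password_enc = quote(password, safe="")
    return f"{protocol}://{user_enc}:{password_enc}@{host}{path}"


# Hypothetical password containing URI delimiters
uri = uri_with_credentials("ftp", "rob", "p@ss:word/1", "host:21", "/pathTo/file")
print(uri)

# Round trip: the parser still finds the host, and decoding restores the password
parts = urlsplit(uri)
print(parts.hostname, unquote(parts.password))
```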
Flexter reads XML/JSON Source Data from different Source Connection paths. Source Connections can be separated into three types:
1. XML/JSON documents stored in a file. The documents can optionally be compressed.
2. XML/JSON documents stored as a column in a tabular file, e.g. one of the columns inside 
   a CSV/TSV/PSV/Avro/ORC/Parquet file contains XML/JSON documents
3. XML/JSON documents inside a database table. One of the columns in a database table 
   contains the XML/JSON documents. The Database Tables are read from relational 
   sources using the JDBC protocol or from Apache Hive. In addition to the XML/JSON 
   column you also need to specify the table name.
Useful options:
--out-opt PROP=VALUE,...        Extra options for Spark dataFrame writer, ex: prop1=value1,prop2=value2
-R, --extra-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
                               Extra column(s) to be included in all output tables
-i, --partition-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
                               Partition column(s) to be included in all output tables
The keywords auto, random, now, id, filename, filepath and =value act as flags that tell Flexter where the data for a column comes from. A brief explanation:
- auto generates values automatically (default option)
- random and now are generators; which one is used depends on the datatype.
# example how all flags work
-R col                         The data comes from the input column
-R "col auto"                  The data is auto-generated because no type is specified
-R "col:date auto"             Generates a date using the now algorithm
-R "col:integer auto"          Generates an integer using the random algorithm
-R "col:long auto"             Generates a long using the random algorithm; use -R "col:long random"
                               to enforce it or -R "col:long now" to generate
                               a long with the current time (milliseconds)
-R "col id"                    Creates a column with the data collected by the parameter --id-column
-R "col filename"              The data is the filename
-R "col filepath"              The data is the filepath
-R "col = value or 'value'"    Sets the value directly
Furthermore, you can cast the value to a chosen datatype, e.g. -R col:date ='2019-09-20'.
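The column syntax `<NAME>[:DATATYPE] [FLAG|=VALUE] [AS ALIAS]` can be read as four optional parts. The sketch below splits a spec into those parts with a regular expression. It is an illustration of the documented syntax only, not Flexter's parser: it handles plain column names but not expressions, and the `parse_column_spec` helper is hypothetical.

```python
import re

SPEC = re.compile(
    r"""^\s*
        (?P<name>\w+)                      # column name (expressions are not handled here)
        (?::(?P<datatype>\w+))?            # optional :DATATYPE
        (?:
            \s*=\s*(?P<value>'[^']*'|\S+)  # =VALUE, optionally single-quoted
          | \s+(?P<flag>auto|random|now|id|filename|filepath)
        )?
        (?:\s+as\s+(?P<alias>\w+))?        # optional AS ALIAS
        \s*$""",
    re.IGNORECASE | re.VERBOSE,
)


def parse_column_spec(spec: str) -> dict:
    """Return the name, datatype, flag, value and alias found in a column spec."""
    match = SPEC.match(spec)
    if match is None:
        raise ValueError(f"unsupported column spec: {spec!r}")
    parts = match.groupdict()
    if parts["value"] is not None:
        parts["value"] = parts["value"].strip("'")
    if parts["flag"] is not None:
        parts["flag"] = parts["flag"].lower()
    return parts


for spec in ["col", "col:long auto", "col:date ='2019-09-20'", "col filename as src"]:
    print(spec, "->", parse_column_spec(spec))
```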
Usage with different kinds of input. Parameters can be repeated, so -R <col1> -R <col2> works.
# example of usage with an ORC input file
$ xml2er -l<Logical Schema ID> -F orc -C source_data -I seq_nbr \
    -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
    -R "auto_column:long auto" -i "create_date:date" -i process_flag data.orc
# input
            path:  samples/case/pnr/xml/prn_xml.orc/data.orc
          format:  orc
          column:  source_data
       id-column:  seq_nbr
# output
            path:  /tmp/test
          format:  parquet
     compression:  snappy
        savemode:  overwrite
   extra-columns:  transaction_id
                   transaction_date
                   value_column='FIXED_VALUE'
                   auto_column:long auto
partition-columns: create_date:DATE
                   process_flag
# example of usage with a Hive table as input
$ xml2er -x<Schema ID> -C source_data -I seq_nbr \
    -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
    -R "auto_column:long auto" -i "create_date:date" -i process_flag -T <[schema.]table>
# example of usage with JDBC as input
$ xml2er -x<Schema ID> -C source_data -I seq_nbr \
    -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
    -R "auto_column:long auto" -i "create_date:date" -i process_flag -T <[schema.]table> jdbc:….
# example with raw XML/JSON files as input
$ xml2er -x<Schema ID> -R value_column='FIXED_VALUE' \
    -R "auto_column:long auto" -T <[schema.]table> jdbc:….
When a column is specified without a value or flag (auto, random, now, id, filename, filepath), its value must exist in the columns of the input source.
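Because -R and -i can be repeated, a wrapper script can build the command from plain lists of column specs. The sketch below assembles such an argument list in Python, using only switches that appear in the examples above. The `build_command` helper, the schema ID `1234` and the table name are placeholders. Passing the list to a process launcher avoids shell quoting problems with specs that contain spaces.

```python
import shlex


def build_command(schema_id, column, id_column, extra_columns,
                  partition_columns, table):
    """Assemble an xml2er argument list with repeated -R and -i switches."""
    args = ["xml2er", f"-x{schema_id}", "-C", column, "-I", id_column]
    for spec in extra_columns:
        args += ["-R", spec]          # one -R per extra column
    for spec in partition_columns:
        args += ["-i", spec]          # one -i per partition column
    args += ["-T", table]
    return args


args = build_command(
    schema_id="1234",                 # placeholder schema ID
    column="source_data",
    id_column="seq_nbr",
    extra_columns=["transaction_id", "transaction_date",
                   "value_column='FIXED_VALUE'", "auto_column:long auto"],
    partition_columns=["create_date:date", "process_flag"],
    table="schema.table",             # placeholder table name
)

# shlex.join adds the quoting a shell needs, e.g. around "auto_column:long auto"
print(shlex.join(args))
```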
Source Command Examples
Documents from Files
For reading Source Data from files, you need to specify the path to the Source Connection, e.g., an FTP connection or a path to a file server.
$ xml2er|json2er|xsd2er [OPTIONS] INPUTPATH
Below is a list of some useful parameters:
--byte-stream           Enforce reading as stream of bytes (2.1GB limit per file) 
                        > Except xsd2er
--archive-read          Read from archive files. zip, tar, jar, cpio, 7z...
                        default: true
--detect-type           Ignores non XML/JSON/XSD files based on detected media type
                        default: true
--filter-content        Filter content, wrong bytes and charset detection
                        default: true
Detecting a file type with --detect-type is particularly useful when the dataset is a mix of different types of Source Data, e.g., JSON and XML.
Example disabling all filters:
$ xml2er|json2er|xsd2er --archive-read false --filter-content false --detect-type false [OPTIONS] INPUTPATH
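If you disable the filters and want to pre-sort a mixed dataset yourself, a rough check of the first significant byte is often enough to tell XML from JSON. The sketch below is a naive illustration of that idea. It is not how Flexter detects media types, and the `sniff_document_type` helper is hypothetical.

```python
UTF8_BOM = b"\xef\xbb\xbf"


def sniff_document_type(data: bytes) -> str:
    """Guess 'xml', 'json' or 'other' from the first non-whitespace byte."""
    if data.startswith(UTF8_BOM):
        data = data[len(UTF8_BOM):]   # skip a UTF-8 byte order mark
    head = data.lstrip()[:1]
    if head == b"<":
        return "xml"                  # also covers XSD, which is XML
    if head in (b"{", b"["):
        return "json"
    return "other"


print(sniff_document_type(b'<?xml version="1.0"?><root/>'))
print(sniff_document_type(b'  {"id": 1}'))
print(sniff_document_type(b"id,name\n1,rob\n"))
```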
Documents from Tabular Files
For tabular files, one of the columns contains XML or JSON documents, e.g. a column inside a Parquet file.
$ xml2er|json2er -F <File Format> -C <XML Column> [OPTIONS] INPUTPATH
Useful options:
-F, --in-format FORMAT    Input format.
                          - jdbc, parquet, orc, json, csv, tsv, psv, avro...
-C, --column NAME|EXPR [AS ALIAS]
                          Name of the column containing XML/JSON data to process
--where WHERE             Where clause for filtering tabular files, jdbc or hive tables
 
-I, --id-column <NAME|EXPR>[:DATATYPE] [EXTRA] [AS ALIAS]
                          Name of identifier column to contain document's ID values for tabular
                          files, jdbc or hive tables
-<MIN>, --id-min MIN      ID value specifying lower bound data range to be processed
                                
+<MAX>, --id-max MAX      ID value specifying upper bound data range to be processed
# example reading csv files
$ xml2er|json2er -F csv -C xml_or_json_col [OPTIONS] INPUTPATH
The parameter -F or --in-format also accepts the full name if the extra jars are included on the spark-submit library path.
# example reading avro files
$ xml2er|json2er -F avro -C xml_or_json_col [OPTIONS] INPUTPATH
The input can also be filtered by a range on the primary key (ID) column.
# example reading orc files filtering by sequential id column between 1 and 100
$ xml2er|json2er -F orc -C xml_or_json_col -I pk_col -1 +100 [OPTIONS] INPUTPATH
Documents from Database Tables
With this option, you define the database table and the column that contain the XML/JSON documents. The table can also be a view or even an SQL statement.
$ xml2er|json2er -T <[[Catalog.]Schema.]Table> -C <XML_JSON Column> [OPTIONS] [INPUTPATH]
# or using a sql query
$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> [OPTIONS] [INPUTPATH]
Relevant parameters:
-T, --table TABLE|QUERY   Name of the table (or specified SQL) to return tabular data with XML
                          content in one of the columns
 
-C, --column NAME|EXPR [AS ALIAS]
                          Name of the column containing XML data to process
-I, --id-column <NAME|EXPR>[:DATATYPE] [EXTRA] [AS ALIAS]
                          Name of identifier column to contain document's ID values for tabular
                          files, jdbc or hive tables
-<MIN>, --id-min MIN      ID value specifying lower bound data range to be processed
                                
+<MAX>, --id-max MAX      ID value specifying upper bound data range to be processed
--where WHERE             Where clause for filtering tabular files, jdbc or hive tables
--limit LIMIT             Limit clause for filtering the number of records/documents
Three different table-based input data sources are supported:
1. JDBC
2. Hive
3. Spark Connector
Documents from JDBC Tables
The JDBC Input data sources require including the source path with a jdbc: protocol.
$ xml2er|json2er -T <[[Catalog.]Schema.]Table> -C <XML_JSON Column> -U <user> -P <password> [OPTIONS] jdbc:<database_url>
# or using a sql query
$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> -U <user> -P <password> [OPTIONS] jdbc:<database_url>
Example:
$ xml2er|json2er -T tab -C xml_or_json_col -U rob -P rob_pass [OPTIONS] jdbc:postgresql://host/db
# or using a sql query
$ xml2er|json2er -T "select xml_or_json_col from tab" -C xml_or_json_col -U rob -P rob_pass [OPTIONS] jdbc:postgresql://host/db
The above command uses a single thread to load the data from your source database. You can parallelize the load by providing a sequential ID column and a range in your source table, so that more than one connection is used to load the data.
To read in parallel (also available on a single machine), you must provide the name of a sequential ID column (-I switch) and its range. The degree of parallelism is determined by the number of vCPUs you have allocated to Flexter, cf. section CPU settings in this guide. Assuming you have given Flexter 4 vCPUs, it creates four connections to the source database and extracts the data in parallel. In the example below, the range is 100 records, so Flexter would generate four queries that select 25 records each.
$ xml2er|json2er -T tab -C xml_or_json_col -I pk_col -1 +100 …
# or using a sql query
$ xml2er|json2er -T "select pk_col,xml_or_json_col from tab where pk_col between 1 and 100" -C xml_or_json_col -I pk_col -1 +100 …
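The arithmetic behind "four queries of 25 records each" can be sketched as follows. The code splits an inclusive ID range into near-equal slices, one per connection. It illustrates the idea only: the `split_id_range` helper is hypothetical, and the exact SQL that Flexter and Spark generate is not shown in this guide, so the query text is an assumption.

```python
def split_id_range(id_min: int, id_max: int, partitions: int) -> list:
    """Split the inclusive range [id_min, id_max] into near-equal slices."""
    total = id_max - id_min + 1
    base, extra = divmod(total, partitions)
    ranges = []
    start = id_min
    for n in range(partitions):
        size = base + (1 if n < extra else 0)   # spread any remainder over the first slices
        if size == 0:
            break                               # more partitions than IDs
        ranges.append((start, start + size - 1))
        start += size
    return ranges


# -1 +100 with 4 vCPUs, as in the example above
slices = split_id_range(1, 100, 4)
for low, high in slices:
    # Illustrative query text only
    print(f"select pk_col, xml_or_json_col from tab where pk_col between {low} and {high}")
```

A sequential ID column with few gaps keeps the slices balanced; large gaps in the ID values would leave some connections with far fewer rows than others.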
Providing a unique identifier column is also very useful for logging purposes. If provided, Flexter will write the value for the identifier column to the log in case of failure. This will allow you to easily identify where an issue has occurred.
Documents from HIVE Tables
Using Hive as a Source Connection is similar to using JDBC. The only difference is that Hive does not require a Source Connection in the form of an input path, as JDBC does. Flexter can access the Hive tables directly through Spark.
$ xml2er|json2er -T <[Database_Schema.]Table> -C <XML_JSON Column> [OPTIONS]
# or using a SQL query (batch mode only)
$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> [OPTIONS]
Streaming mode only supports the table parameter without an actual query. In streaming mode you also need to provide the input format with --in-format or -F.
$ xml2er|json2er -Z <duration> -T <[Database_Schema.]Table> -C <XML_JSON Column> -F <Format> [OPTIONS]
Examples:
# Reading the whole table in batch mode
$ xml2er -T tab -C xml_col ...
# Reading part of the table in batch mode
$ xml2er -T tab -C xml_col -I pk_col -1 +100 …
# Reading part of the table with a query in batch mode
$ xml2er -T "select xml_col from tab where pk_col between 1 and 100" …
Documents from Spark Connector Tables
Apache Spark allows custom Spark connectors, which vendors usually build themselves.
Flexter supports only a few of them. Others can be tried out if they can be operated entirely by setting the correct parameters.
The supported ones are:
- Spark Snowflake Connector: snowflake://
- Spark BigQuery Connector: bigquery://
- Spark MongoDB Connector: mongodb://
- Spark Redshift Connector: redshift://
These connectors aren’t shipped with Flexter, but the Flexter Modules / download.sh tool can help to install them into Spark,
or they can be downloaded using the xml2er|json2er|merge2er --pkg PKG parameter, analogous to the spark-submit --packages parameter.
Snowflake Spark Connector
Example using the Spark Snowflake connector. The path follows the same standard as the JDBC URL jdbc:snowflake://<snowflake_url>
# user/password based
$ xml2er|json2er|merge2er … -T TABLE_OR_QUERY -U USER -P PASS 'snowflake://<account_locator>[.region][.cloud_provider].snowflakecomputing.com[:port]/?db=&schema=&warehouse=&role=&prop1=val1' 
# password-less private key
$ xml2er|json2er|merge2er  … -T TABLE_OR_QUERY -U USER --in-opt private_key_file=pathTo/file.key 'snowflake://<account_locator>[.region][.cloud_provider].snowflakecomputing.com[:port]/?db=&schema=&warehouse=&role=&prop1=val1'
# password based private key
$ xml2er|json2er|merge2er … -T TABLE_OR_QUERY -U USER --in-opt private_key_file=pathTo/file.key --in-opt private_key_file_pwd=PASS 'snowflake://<account_locator>[.region][.cloud_provider].snowflakecomputing.com[:port]/?db=&schema=&warehouse=&role=&prop1=val1'
Important parameters:
-T, --table TABLE|QUERY    Name of the table (or specified SQL) to return tabular data with XML
                           content in one of the columns
--where WHERE              Where clause for filtering Snowflake tables
--in-opt PROP=VALUE,...    Extra options for Spark dataFrame reader, ex: prop1=value1,prop2=value2
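The connection options in the snowflake:// path are ordinary URI query parameters, so they can be assembled and encoded programmatically. The sketch below builds such a path with Python's standard library. The `snowflake_url` helper, the account host and all option values are placeholders, not real settings.

```python
from urllib.parse import urlencode


def snowflake_url(account_host: str, **options: str) -> str:
    """Build a snowflake:// path with URL-encoded query parameters."""
    query = urlencode(options)        # encodes special characters in the values
    return f"snowflake://{account_host}/?{query}"


# Placeholder account and option values
url = snowflake_url(
    "myaccount.snowflakecomputing.com",
    db="MY_DB",
    schema="PUBLIC",
    warehouse="MY_WH",
    role="MY_ROLE",
)
print(url)
```

On the command line the resulting path should stay inside single quotes, as in the examples above, because an unquoted `&` is interpreted by the shell.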
BigQuery Spark Connector
Example using Spark BigQuery connector.
$ xml2er|json2er|merge2er … -T TABLE_OR_QUERY -U USER -P PASS 'bigquery://[project:]dataset[.table][?prop1=value1[&prop2=value2]][#fragment]'
Important parameters:
-T, --table TABLE|QUERY    Name of the table (or specified SQL) to return tabular data with XML
                           content in one of the columns
--where WHERE              Where clause for filtering BigQuery tables
--in-opt PROP=VALUE,...    Extra options for Spark dataFrame reader, ex: prop1=value1,prop2=value2
MongoDB Spark Connector
Example using Spark MongoDB connector.
$ xml2er|json2er … -T TABLE_OR_QUERY -U USER -P PASS 'mongodb://[user[:password]@]host[:port][/database[.collection]][?prop1=value1[&prop2=value2]][#fragment]'
Important parameters:
-T, --table TABLE|QUERY    Name of the table (or specified MongoDB Query) to return tabular data with XML
                           content in one of the columns
--where EXPRESSION         MongoDB expressions, ex: "[{$match:{minimum_nights:'2'}}]"
--in-opt PROP=VALUE,...    Extra options for Spark dataFrame reader, ex: prop1=value1,prop2=value2
Custom Spark Connector
Other, unsupported connectors may be worth trying if they need no code changes and operate entirely through parameters.
$ xml2er|json2er|merge2er -F FORMAT_NAME_OR_CLASS --in-opt PROP=VALUE --conf PROP=VALUE
Important parameters:
-F FORMAT_NAME_OR_CLASS    The custom format name or class.   
INPUT_PATH                 It sets the dataFrame reader option path=value
--conf PROP=VALUE          It sets Spark Configuration settings, some connectors require it. 
                           ex: prop1=value1,prop2=value2
--in-opt PROP=VALUE,...    Extra options for Spark dataFrame reader.
                           ex: prop1=value1,prop2=value2
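The value passed to --in-opt is a comma-separated list of PROP=VALUE pairs, so a script that keeps connector options in a dictionary needs to flatten it into that form. The sketch below does this. The `format_options` helper is hypothetical. It rejects values containing a comma, because this guide does not describe an escaping rule for them.

```python
def format_options(options: dict) -> str:
    """Flatten a dict into the prop1=value1,prop2=value2 form shown above."""
    pairs = []
    for key, value in options.items():
        text = str(value)
        if "," in text or "," in key or "=" in key:
            # No escaping rule is documented, so fail instead of guessing
            raise ValueError(f"cannot encode option {key!r}")
        pairs.append(f"{key}={text}")
    return ",".join(pairs)


in_opt = format_options({"prop1": "value1", "prop2": "value2"})
print("--in-opt", in_opt)
```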