Source Connections

This section walks through the procedure for creating a Flexter Source Connection.

Input paths for Source Connections are based on the URI standard. This means that Flexter can access different locations and protocols defined by a URI path.

protocol://user:password@hostname:port/path?param=value#param=value

Flexter supports the following protocols in batch mode:

file://
http:// https://
ftp:// ftps://
hdfs:// maprfs://
s3a://
jdbc:xyz://
snowflake://
bigquery://
mongodb://
redshift://

Supported JDBC protocols:

jdbc:bigquery:
jdbc:derby:
jdbc:sqlserver:
jdbc:mariadb:
jdbc:mysql:
jdbc:oracle:
jdbc:postgresql:
jdbc:redshift:
jdbc:snowflake:
jdbc:teradata:

Supported protocols in streaming mode: file:// hdfs:// maprfs://

Some of these protocols require a username and password. In all these cases it is possible to define the values for these parameters in a configuration file, cf. section Configuration and settings in this guide. Depending on the protocol used, it is also possible to place the user and password directly in the URI path.

-U, --in-user USER              Input user

-P, --in-password PASSWORD      Input password
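
As an illustrative sketch (the host, path and credentials below are placeholders, and embedding credentials in the URI is only possible where the protocol allows it), the same FTP source could be read either by passing the credentials as parameters or by placing them in the URI path:

# credentials passed as parameters
$ xml2er -x<Schema ID> -U myuser -P mypass ftp://ftp.example.com/data/invoices/

# credentials embedded in the URI path
$ xml2er -x<Schema ID> ftp://myuser:mypass@ftp.example.com/data/invoices/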

Flexter reads XML/JSON Source Data from different Source Connection paths. Source Connections can be separated into three types:

1. XML/JSON documents stored in a file. The documents can optionally be compressed.
2. XML/JSON documents stored as a column in a tabular file, e.g. one of the columns inside 
   a CSV/TSV/PSV/Avro/ORC/Parquet file contains XML/JSON documents
3. XML/JSON documents inside a database table. One of the columns in a database table 
   contains the XML/JSON documents. The Database Tables are read from relational 
   sources using the JDBC protocol or from Apache Hive. In addition to the XML/JSON 
   column you also need to specify the table name.

Useful options:

--out-opt PROP=VALUE,...        Extra options for Spark dataFrame writer, ex: prop1=value1,prop2=value2

-R, --extra-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
                               Extra column(s) to be included in all output tables

-i, --partition-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
                               Partition column(s) to be included in all output tables

It is possible to have multiple partition columns and multiple extra columns.

The keywords auto, random, now, id, filename, filepath and =value act as flags that tell Flexter where the data for a column comes from. Below is a brief explanation:

  1. auto generates values automatically (default option)
  2. random and now are generators; which one is used depends on the column's data type.
# example how all flags work
-R col                         The data comes from the input column 

-R "col auto"                  The data is auto generated because there    is no type specified

-R "col:date auto"             It will generate a date using the now algorithm 

-R "col:integer auto"          It will generate a date using the random algorithm 
         
-R "col:long auto"             It will generate a date using the random algorithm, -R col:long random 
                               to enforce it or -R col:long now to generate 
                               a long with the current time (milliseconds)

-R "col id"                    It will create a column with data collected by parameter --id-column

-R "col filename"              The data is the filename

-R "col filepath"              The data is the filepath

-R "col = value or 'value'"    Set the value directly 

Furthermore, you can cast the value to the chosen datatype, e.g. -R col:date ='2019-09-20'.

Usage with different kinds of input files:

# example of usage with orc input file

$ xml2er -l<Logical Schema ID> -F orc -C source_data -I seq_nbr \
  -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
  -R "auto_column:long auto" -i "create_date:date" -i process_flag data.orc

Repeated parameters are accepted, i.e. -R <col1> -R <col2> will work.

# input
            path:  samples/case/pnr/xml/prn_xml.orc/data.orc
          format:  orc
          column:  source_data
       id-column:  seq_nbr

# output
            path:  /tmp/test
          format:  parquet
     compression:  snappy
        savemode:  overwrite
   extra-columns:  transaction_id
                   transaction_date
                   value_column='FIXED_VALUE'
                   auto_column:long auto
partition-columns: create_date:DATE
                   process_flag

# example of usage with hive input file

$ xml2er -x<Schema ID> -C source_data -I seq_nbr \
  -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
  -R "auto_column:long auto" -i "create_date:date" -i process_flag -T <[schema.]table>

# example of usage with jdbc as input

$ xml2er -x<Schema ID> -C source_data -I seq_nbr \
  -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
  -R "auto_column:long auto" -i "create_date:date" -i process_flag -T <[schema.]table> jdbc:….

# example with raw xml/json files as input

$ xml2er -x<Schema ID> -R value_column='FIXED_VALUE' \
  -R "auto_column:long auto" INPUTPATH

When a column is specified without a value or a flag (auto, random, now, id, filename, filepath), its value must be found in the input source columns.
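
For instance, assuming the input ORC file contains a column named transaction_id (as in the example above), the following illustrative call simply copies that column's values into every output table:

# extra column taken directly from an input column (no flag, no value)
$ xml2er -l<Logical Schema ID> -F orc -C source_data -R transaction_id data.orc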

XML/JSON documents in files

For reading Source Data from files you need to specify the path to the Source Connection, e.g. an FTP connection or a path to a file server.

$ xml2er|json2er|xsd2er [OPTIONS] INPUTPATH

Below is a list of some useful parameters:

--byte-stream           Enforce reading as stream of bytes (2.1GB limit per file) 
                        > Except xsd2er

--archive-read          Read from archive files. zip, tar, jar, cpio, 7z...
                        default: true

--detect-type           Ignores non XML/JSON/XSD files based on detected media type
                        default: true

--filter-content        Filter content, wrong bytes and charset detection
                        default: true

With the byte stream switch you can read files that are bigger than 2.14 GB. It slows down performance but lets you read very large XML or JSON documents with a lower memory footprint. The default approach reads files as byte arrays, which is subject to the 2.1 GB per-file limit.
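
A minimal sketch (the schema ID and file name are placeholders) of reading a single very large document with the byte stream switch:

# example reading a very large XML document as a stream of bytes
$ xml2er -x<Schema ID> --byte-stream huge_document.xml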

Detecting the file type with --detect-type is particularly useful when the dataset is a mix of different types of Source Data, e.g. JSON and XML.
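
For example (the input path is a placeholder), a folder containing a mix of XML and other file types can be processed while relying on the default type detection to skip the files that are not XML/JSON/XSD:

# example relying on --detect-type (enabled by default) for a mixed folder
$ xml2er -x<Schema ID> --detect-type true samples/mixed_folder/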

Example disabling all filters:

$ xml2er|json2er|xsd2er --archive-read false --filter-content false --detect-type false [OPTIONS] INPUTPATH

XML/JSON documents in tabular files

For tabular files, one of the columns contains XML or JSON documents, e.g. a column inside a Parquet file.

$ xml2er|json2er -F <File Format> -C <XML Column> [OPTIONS] INPUTPATH

Useful options:

-F, --in-format FORMAT    Input format.
                          - jdbc, parquet, orc, json, csv, tsv, psv, avro...

-C, --column NAME|EXPR [AS ALIAS]
                          Name of the column containing XML/JSON data to process

--where WHERE             Where clause for filtering tabular files, jdbc or hive tables
 
-I, --id-column <NAME|EXPR>[:DATATYPE] [EXTRA] [AS ALIAS]
                          Name of identifier column to contain document's ID values for tabular
                          files, jdbc or hive tables

-<MIN>, --id-min MIN      ID value specifying lower bound data range to be processed
                                
+<MAX>, --id-max MAX      ID value specifying upper bound data range to be processed

# example reading csv files
$ xml2er|json2er -F csv -C xml_or_json_col [OPTIONS] INPUTPATH

The parameter -F or --in-format also accepts the full format class name if the extra jars are included on the spark-submit library path.

# example reading avro files
$ xml2er|json2er -F avro -C xml_or_json_col [OPTIONS] INPUTPATH
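
For illustration only, the full format name can be used instead of the short name; com.databricks.spark.avro is the format name of the external spark-avro package used with older Spark versions, so substitute whatever package you actually place on the spark-submit library path:

# example reading avro files using the full format name
$ xml2er|json2er -F com.databricks.spark.avro -C xml_or_json_col [OPTIONS] INPUTPATH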

Filtering on the ID column (e.g. a primary key) is also supported.

# example reading orc files filtering by sequential id column between 1 and 100
$ xml2er|json2er -F orc -C xml_or_json_col -I pk_col -1 +100 [OPTIONS] INPUTPATH

XML/JSON inside a database table

Using this option, you need to define the table and the column in your database that contains the XML/JSON documents. The database table can also be defined as a view or even an SQL statement.

$ xml2er|json2er -T <[[Catalog.]Schema.]Table> -C <XML_JSON Column> [OPTIONS] [INPUTPATH]

# or using a sql query

$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> [OPTIONS] [INPUTPATH]

Relevant parameters:

-T, --table TABLE|QUERY   Name of the table (or specified SQL) to return tabular data with XML
                          content in one of the columns
 
-C, --column NAME|EXPR [AS ALIAS]
                          Name of the column containing XML data to process

-I, --id-column <NAME|EXPR>[:DATATYPE] [EXTRA] [AS ALIAS]
                          Name of identifier column to contain document's ID values for tabular
                          files, jdbc or hive tables

-<MIN>, --id-min MIN      ID value specifying lower bound data range to be processed
                                
+<MAX>, --id-max MAX      ID value specifying upper bound data range to be processed

--where WHERE             Where clause for filtering tabular files, jdbc or hive tables

--limit LIMIT             Limit clause for filtering the number of records/documents

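As an illustrative sketch (the table, column and filter below are placeholders), --where and --limit can be combined to restrict which documents are read:

# example reading at most 1000 documents that match a filter
$ xml2er|json2er -T tab -C xml_or_json_col --where "status = 'NEW'" --limit 1000 [OPTIONS]
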
Three different table-based input data sources are supported:

1. JDBC
2. Hive
3. Spark Connector

JDBC as Source Connection

JDBC input data sources require the source path to be specified with a jdbc: protocol.

$ xml2er|json2er -T <[[Catalog.]Schema.]Table> -C <XML_JSON Column> -U <user> -P <password> [OPTIONS] jdbc:<database url>

# or using a sql query

$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> -U <user> -P <password> [OPTIONS] jdbc:<database_url>

Example:

$ xml2er|json2er -T tab -C xml_or_json_col -U rob -P rob_pass [OPTIONS] jdbc:postgresql://host/db

# or using a sql query

$ xml2er|json2er -T "select xml_or_job_col from tab" -C xml_or_json_col -U rob -P rob_pass 
[OPTIONS] jdbc:postgresql://host/db

The above command will use a single thread to load the data from your source database. You can parallelize the load by providing a sequential ID and a range in your source table. As a result more than one connection can be used to load the data.

To read in parallel (available on a single machine as well), a sequential ID column name (-I switch) and its range must be provided. The degree of parallelism is determined by the number of vCPUs you have allocated to Flexter, cf. section CPU settings in this guide. Assuming you have given Flexter 4 vCPUs, it will create 4 connections to the Source Database and extract the data in parallel from there. In the example below the range is 100 records. Flexter would generate 4 queries that select 25 records each.

$ xml2er|json2er -T tab -C xml_or_json_col -I pk_col -1 +100 …

# or using a sql query

$ xml2er|json2er -T "select pk_col,xml_or_json_col from tab where pk_col between 1 and 100" -C xml_or_json_col -I pk_col -1 +100 …

Providing a unique identifier column is also very useful for logging purposes. If provided, Flexter will write the value for the identifier column to the log in case of failure. This will allow you to easily identify where an issue has occurred.

Hive as Source Connection

Using Hive as a Source Connection is similar to using JDBC. The only difference is that Hive does not require a Source Connection to be specified in the form of an input path, as JDBC does. Flexter can access Hive tables directly through Spark.

$ xml2er|json2er -T <[Database_Schema.]Table> -C <XML_JSON Column> [OPTIONS]

# or using a sql query (batch mode only)

$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> [OPTIONS]

Streaming mode only supports the table parameter, not a query. In streaming mode you also need to provide --in-format or -F.

$ xml2er|json2er -Z <duration> -T <[Database_Schema.]Table> -C <XML_JSON Column> -F <Format> [OPTIONS]

Examples:

# Reading the whole table in batch mode

$ xml2er -T tab -C xml_col ...

# Reading part of the table in batch mode

$ xml2er -T tab -C xml_col -I pk_col -1 +100 …

# Reading part of the table with a query in batch mode

$ xml2er -T "select xml_col from tab where pk_col between 1 and 100" …

Spark Connector as Source Connection

Apache Spark offers the option to create custom Spark connectors, which vendors usually develop themselves.

Flexter officially supports a small number of these connectors; others can be tried out if they can be operated purely by setting the correct parameters.

The supported ones are:

  • Spark Snowflake Connector: snowflake://
  • Spark BigQuery Connector: bigquery://
  • Spark MongoDB Connector: mongodb://
  • Spark Redshift Connector: redshift://

These connectors aren't shipped with Flexter, but the Flexter Modules / download.sh tool can help to install them into Spark. Alternatively, they can be downloaded using the xml2er|json2er|merge2er --pkg PKG parameter, which is analogous to the spark-submit --packages parameter.
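
As an illustrative sketch (net.snowflake:spark-snowflake_2.12 is the public Maven coordinate of the Spark Snowflake connector; substitute the Scala and connector versions matching your Spark build):

# downloading the Snowflake connector package at launch time
$ xml2er --pkg net.snowflake:spark-snowflake_2.12:<version> … 'snowflake://<snowflake_url>'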

Spark Snowflake Connector

Example using the Spark Snowflake connector. The URL follows the same standard as the JDBC URL jdbc:snowflake://<snowflake_url>.

# user/password based
$ xml2er|json2er|merge2er … -T TABLE_OR_QUERY -U USER -P PASS 'snowflake://<snowflake_url>[?param=value[&param=value]][;param=value[&param=value]]' 

# password-less private key
$ xml2er|json2er|merge2er  … -T TABLE_OR_QUERY -U USER --in-opt private_key_file=pathTo/file.key 'snowflake://<snowflake_url>[?param=value[&param=value]][;param=value[&param=value]]'

# password based private key
$ xml2er|json2er|merge2er … -T TABLE_OR_QUERY -U USER --in-opt private_key_file=pathTo/file.key --in-opt private_key_file_pwd=PASS 'snowflake://<snowflake_url>[?param=value[&param=value]][;param=value[&param=value]]'

Important parameters:

-T, --table TABLE|QUERY    Name of the table (or specified SQL) to return tabular data with XML
                           content in one of the columns

--where WHERE              Where clause for filtering Snowflake tables

--in-opt PROP=VALUE,...    Extra options for Spark dataFrame reader, ex: prop1=value1,prop2=value2
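
For illustration (the warehouse and database names are placeholders; sfWarehouse and sfDatabase are standard Spark Snowflake connector options):

# filtering the Snowflake table and passing extra connector options
$ xml2er … -T tab -C xml_col -U USER -P PASS --where "load_date = current_date" --in-opt sfWarehouse=MY_WH,sfDatabase=MY_DB 'snowflake://<snowflake_url>'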

Spark BigQuery Connector

Example using Spark BigQuery connector.

$ xml2er|json2er|merge2er … -T TABLE_OR_QUERY -U USER -P PASS 'bigquery://project[:dataset[.table]][?param=value[&param=value]][;param=value[&param=value]]'

Important parameters:

-T, --table TABLE|QUERY    Name of the table (or specified SQL) to return tabular data with XML
                           content in one of the columns

--where WHERE              Where clause for filtering BigQuery tables

--in-opt PROP=VALUE,...    Extra options for Spark dataFrame reader, ex: prop1=value1,prop2=value2

Spark MongoDB Connector

Example using Spark MongoDB connector.

$ xml2er|json2er … -T TABLE_OR_QUERY -U USER -P PASS 'mongodb://HOST[:PORT]/DB[.COLLECTION][?param=value[&param=value]][;param=value[&param=value]]'

Important parameters:

-T, --table TABLE|QUERY    Name of the table (or specified MongoDB Query) to return tabular data with XML
                           content in one of the columns

--where EXPRESSION         MongoDB expressions, ex: "[{$match:{minimum_nights:'2'}}]"

--in-opt PROP=VALUE,...    Extra options for Spark dataFrame reader, ex: prop1=value1,prop2=value2

Custom Spark Connector

Other, unsupported connectors can also be tried. If a connector doesn't require code changes and can be operated entirely through parameters, it may be worth trying out.

$ xml2er|json2er|merge2er -F FORMAT_NAME_OR_CLASS --in-opt PROP=VALUE --conf PROP=VALUE

Important parameters:

-F FORMAT_NAME_OR_CLASS    The custom format name or class.   

INPUT_PATH                 It sets the dataFrame reader option path=value

--conf PROP=VALUE          It sets Spark Configuration settings, some connectors require it. 
                           ex: prop1=value1,prop2=value2

--in-opt PROP=VALUE,...    Extra options for Spark dataFrame reader.
                           ex: prop1=value1,prop2=value2