Source Connections

This section describes how to define Source Connections in Flexter.

Input paths for Source Connections follow the URI standard. This means Flexter can access any supported location and protocol that can be expressed as a URI path.

protocol://user:password@hostname:port/path?param=value#param=value
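
To make the individual URI components concrete, the sketch below splits a path of this shape with Python's standard urllib.parse module. The host name, credentials and file path are made-up placeholders, and the snippet only illustrates the anatomy of a URI; it makes no claim about how Flexter itself parses input paths.

```python
from urllib.parse import urlsplit

# Hypothetical input path; host, user, password and path are placeholders.
uri = "ftp://rob:rob_pass@files.example.com:21/data/in/orders.xml"

parts = urlsplit(uri)

# Map the parsed pieces onto the names used in the URI template.
components = {
    "protocol": parts.scheme,
    "user": parts.username,
    "password": parts.password,
    "hostname": parts.hostname,
    "port": parts.port,
    "path": parts.path,
}

for name, value in components.items():
    print(f"{name:>8}: {value}")
```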

Flexter supports the following protocols in batch mode:

file://
http:// https://
ftp:// ftps://
hdfs:// maprfs://
s3a://
jdbc:xyz://
snowflake://
bigquery://
mongodb://
redshift://

Streaming mode supports the following protocols: file:// hdfs:// maprfs://

Some of these protocols require a username and password. In all these cases the values can be defined in a configuration file, cf. section Configuration and settings in this guide. Depending on the protocol used, the user and password can also be placed directly in the URI path.

-U, --iuser USER            	Input user (Source Connection)
                          	
-P, --ipassword PASSWORD    	Input password (Source Connection)
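
When credentials are placed directly in the URI path, characters such as @, : or / inside the user name or password are reserved by the URI syntax and normally have to be percent-encoded. The sketch below shows one way to do that with Python's standard library. The build_uri helper, the host and the credentials are hypothetical, and whether a particular protocol handler decodes percent-encoded credentials is an assumption; passing the values with -U and -P avoids the question.

```python
from urllib.parse import quote, unquote, urlsplit


def build_uri(protocol: str, user: str, password: str, host: str, path: str) -> str:
    """Embed credentials in a URI, percent-encoding reserved characters."""
    # safe="" forces encoding of every reserved character, including "/".
    userinfo = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return f"{protocol}://{userinfo}@{host}{path}"


# Hypothetical credentials; the password contains "@" and "/".
uri = build_uri("ftp", "rob", "p@ss/word", "files.example.com", "/data/in")
print(uri)

# Round trip: the encoded password decodes back to the original value.
decoded_password = unquote(urlsplit(uri).password)
print(decoded_password)
```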

Flexter reads XML/JSON Source Data from different Source Connection paths. Source Connections can be separated into three types:

1. XML/JSON documents stored in a file. The XML documents can optionally be compressed.
2. XML/JSON documents stored as a column in a tabular file, e.g. one of the columns inside
   a CSV/TSV/Avro file contains XML/JSON documents.
3. XML/JSON documents inside a database table. One of the columns in a database table 
   contains the XML/JSON documents. The Database Tables are read from relational 
   sources using the JDBC protocol or from Apache Hive. In addition to the XML/JSON 
   column you also need to specify the table name.

Useful options:

--in-opt PROP=VALUE,...     Extra options for the Spark DataFrame reader, e.g. prop1=value1,prop2=value2

-R, --extra-column <NAME|EXPR>[:DATATYPE] [SOURCE] [AS ALIAS]
                        Extra column(s) to be included in all output tables.

-i, --partition-column <NAME|EXPR>[:DATATYPE] [SOURCE] [AS ALIAS]
            SOURCE: AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE
                        Partition column(s) to be included in all output
                        tables.

It’s possible to have multiple partitions and multiple extra columns.

The SOURCE values auto, random, now, id, filename, filepath and =value are flags that tell Flexter where the data in a column comes from. A brief explanation:

  1. auto generates values automatically (default option).
  2. random and now are generators; which one is used depends on the column's data type.

# example how all flags work
-R col                         The data comes from the input column

-R "col auto"                  The data is auto generated because there is no type specified

-R "col:date auto"             It will generate a date using the now algorithm

-R "col:integer auto"          It will generate an integer using the random algorithm

-R "col:long auto"             It will generate a long using the random algorithm. Use
                               -R "col:long random" to enforce it or -R "col:long now"
                               to generate a long with the current time (milliseconds)

-R "col id"                    It will create a column with data collected by parameter --id-column

-R "col filename"              The data is the filename

-R "col filepath"              The data is the filepath

-R "col = value or 'value'"    Set the value directly 

Furthermore, you can cast the value to a chosen datatype, e.g. -R "col:date = '19-09-20'".

Usage with different kinds of input:

# example of usage with orc input file

$ xml2er -l<Logical Schema ID> -F orc -C source_data -I seq_nbr \
    -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
    -R "auto_column:long auto" -i "create_date:date" -i process_flag data.orc

Parameters can be repeated, i.e. -R <col1> -R <col2> will work.

# input
            path:  samples/case/pnr/xml/prn_xml.orc/data.orc
          format:  orc
          column:  source_data
       id-column:  seq_nbr

# output
            path:  /tmp/test
          format:  parquet
     compression:  snappy
        savemode:  overwrite
   extra-columns:  transaction_id
                   transaction_date
                   value_column='FIXED_VALUE'
                   auto_column:long auto
partition-columns: create_date:DATE
                   process_flag

# example of usage with hive input file

$ xml2er -l<Logical Schema ID> -C source_data -I seq_nbr \
    -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
    -R "auto_column:long auto" -i "create_date:date" -i process_flag -T <[schema.]table>

# example of usage with jdbc as input

$ xml2er -l<Logical Schema ID> -C source_data -I seq_nbr \
    -R transaction_id -R transaction_date -R value_column='FIXED_VALUE' \
    -R "auto_column:long auto" -i "create_date:date" -i process_flag -T <[schema.]table> jdbc:….

# example with raw files xml/json as input

$ xml2er -l<Logical Schema ID> -R value_column='FIXED_VALUE' \
    -R "auto_column:long auto" -T <[schema.]table> jdbc:….

When a column is specified without a value or a flag (auto, random, now, id, filename, filepath), its value must be present in the input source columns.

XML/JSON documents in files

For reading Source Data from files you need to specify the path to the Source Connection, e.g. an FTP connection or a path to a file server.

$ xml2er|json2er|xsd2er [OPTIONS] INPUTPATH

Below is a list of some useful parameters:

--archive-read           Read archive files. zip, tar, jar, cpio, 7z...
                        default: true

--byte-stream            Enforce reading as a stream of bytes. Useful
                        for very large files > 2.1 GB.

--detect-type            Ignores non XML/JSON/XSD files
                        default: true

--filter-content         Filter content, wrong bytes and charset detection
                        default: true

With the --byte-stream switch you can read files that are bigger than 2.14 GB. This slows down processing but lets you read very large XML or JSON documents with a lower memory footprint. By default, files are read as byte arrays.
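
As a rough pre-flight check, the sketch below tests whether a file is larger than 2**31 - 1 bytes (about 2.147 GB), which is the maximum length of a JVM byte array. Treating this value as the exact threshold is an assumption: the guide only states the limit as roughly 2.1 GB. The helper names are hypothetical and not part of Flexter.

```python
import os

# Assumption: the ~2.1 GB limit corresponds to the maximum JVM byte-array
# length (2**31 - 1 bytes). The guide itself only says "> 2.1 GB".
MAX_BYTE_ARRAY_SIZE = 2**31 - 1


def needs_byte_stream(size_in_bytes: int) -> bool:
    """Return True if a file of this size exceeds the assumed byte-array limit."""
    return size_in_bytes > MAX_BYTE_ARRAY_SIZE


def file_needs_byte_stream(path: str) -> bool:
    """Apply the same check to a local file, based on its size on disk."""
    return needs_byte_stream(os.path.getsize(path))


print(needs_byte_stream(500 * 1024**2))  # a 500 MiB document
print(needs_byte_stream(3 * 1024**3))    # a 3 GiB document
```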

Detecting a file type with --detect-type is particularly useful when the dataset is a mix of different types of Source Data, e.g. JSON and XML.

Example disabling all filters:

$ xml2er|json2er|xsd2er --archive-read false --filter-content false --detect-type false [OPTIONS] INPUTPATH

XML/JSON documents in tabular files

For tabular files, one of the columns contains XML or JSON documents, e.g. a column inside a Parquet file.

$ xml2er|json2er -F <File Format> -C <XML Column> [OPTIONS] INPUTPATH

Useful options:

-F, --in-format FORMAT    Input format. parquet, avro, orc, json, csv, tsv

-C, --column <NAME|EXPR> [AS ALIAS] 
                          Column which brings XML/JSON data.

--where WHERE             Where clause for filtering tabular files, jdbc 
                          or hive tables.
 
-I, --id-column <NAME|EXPR>[:DATATYPE] [EXTRA] [AS ALIAS]
                          Identifier column for tabular files, jdbc or 
                          hive tables. The EXTRA makes this column 
                          available as extra column too.

-<MIN>, --id-min MIN      Identifier column's lower bound
+<MAX>, --id-max MAX      Identifier column's upper bound

# example reading csv files
$ xml2er -F csv -C xml_col [OPTIONS] INPUTPATH

The parameter -F or --in-format also accepts the full name of the format if the extra jars are included on the spark-submit library path.

# example reading avro files
$ xml2er -F avro -C xml_col [OPTIONS] INPUTPATH

# or if avro package isn't available
$ xml2er|json2er --pkg com.databricks:spark-avro_2.11:4.0.0 -F avro INPUTPATH …

The input can also be filtered on an identifier (primary key) column.

# example reading orc files filtering by sequential id column between 1 and 100
$ xml2er -F orc -C xml_col -I pk_col -1 +100 [OPTIONS] INPUTPATH

XML/JSON inside a database table

With this option you need to specify the database table and the column that contain the XML/JSON documents. Instead of a table you can also provide a view or even an SQL statement.

$ xml2er|json2er -T <[[Catalog.]Schema.]Table> -C <XML_JSON Column> [OPTIONS] [INPUTPATH]

# or using a sql query

$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> [OPTIONS] [INPUTPATH]

Relevant parameters:

-T, --table TABLE         Table or SQL for Source Data
 
-C, --column <NAME|EXPR> [AS ALIAS] 
                          Column which brings XML/JSON data.

--where WHERE             Where clause for filtering tabular files, jdbc 
                          or hive tables.

-I, --id-column <NAME|EXPR>[:DATATYPE] [EXTRA] [AS ALIAS]
                          Identifier column for tabular files, jdbc or
                          hive tables. The EXTRA makes this column
                          available as extra column too.

-<MIN>, --id-min MIN      Identifier column's lower bound
+<MAX>, --id-max MAX      Identifier column's upper bound

Two different table-based input data sources are supported:

1. JDBC
2. HIVE

JDBC as a Source Connection

JDBC input data sources require a source path that uses the jdbc: protocol.

$ xml2er|json2er -T <[[Catalog.]Schema.]Table> -C <XML_JSON Column> -U <user> -P <password> [OPTIONS] jdbc:<database url>

# or using a sql query

$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> -U <user> -P <password> [OPTIONS] jdbc:<database url>

Example:

$ xml2er -T tab -C xml_col -U rob -P rob_pass [OPTIONS] jdbc:postgresql://host/db

# or using a sql query

$ xml2er -T "select xml_col from tab" -C xml_col -U rob -P rob_pass \
    [OPTIONS] jdbc:postgresql://host/db

The above command will use a single thread to load the data from your source database. You can parallelise the load by providing a sequential ID and a range in your source table. As a result more than one connection can be used to load the data.

To read in parallel (also possible on a single machine), you must provide a sequential ID column name (-I switch) and its range. The degree of parallelism is determined by the number of vCPUs you have allocated to Flexter, cf. section CPU settings in this guide. Assuming you have given Flexter 4 vCPUs, it will create 4 connections to the source database and extract the data in parallel. In the example below the range is 100 records, so Flexter would generate 4 queries that select 25 records each.

$ xml2er -T tab -C xml_col -I pk_col -1 +100 …

# or using a sql query

$ xml2er -T "select pk_col,xml_col from tab where pk_col between 1 and 100" -C xml_col -I pk_col -1 +100 …
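
The arithmetic behind the 4 x 25 split can be sketched as follows. This is only an illustration of dividing an inclusive ID range into contiguous sub-ranges; the exact boundaries and SQL that Flexter generates are not documented here, and the function name split_id_range is hypothetical.

```python
from typing import List, Tuple


def split_id_range(id_min: int, id_max: int, partitions: int) -> List[Tuple[int, int]]:
    """Split the inclusive range [id_min, id_max] into contiguous sub-ranges."""
    if partitions < 1:
        raise ValueError("partitions must be at least 1")
    if id_max < id_min:
        raise ValueError("id_max must not be smaller than id_min")

    total = id_max - id_min + 1
    # Never create more partitions than there are ids.
    partitions = min(partitions, total)
    base, remainder = divmod(total, partitions)

    ranges = []
    lower = id_min
    for i in range(partitions):
        # The first `remainder` partitions take one extra id each.
        size = base + (1 if i < remainder else 0)
        upper = lower + size - 1
        ranges.append((lower, upper))
        lower = upper + 1
    return ranges


# Values from the example above: ids 1 to 100 and 4 vCPUs.
for lower, upper in split_id_range(1, 100, 4):
    print(f"select pk_col, xml_col from tab where pk_col between {lower} and {upper}")
```

If the range does not divide evenly, this sketch gives the leading partitions one extra ID each, which keeps the sub-ranges contiguous and non-overlapping.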

Providing a unique identifier column is also very useful for logging purposes. If provided, Flexter will write the value for the identifier column to the log in case of failure. This will allow you to easily identify where an issue has occurred.

Hive as a Source Connection

Using Hive as a Source Connection is similar to using JDBC. The only difference is that Hive does not require us to specify a Source Connection in the form of an input path (JDBC). Flexter can access the Hive tables directly through Spark.

$ xml2er|json2er -T <[Database_Schema.]Table> -C <XML_JSON Column> [OPTIONS]

# or using a sql query (batch mode only)

$ xml2er|json2er -T "<SQL QUERY>" -C <XML_JSON Column> [OPTIONS]

Streaming mode only supports the table parameter with a table name, not an SQL query. In streaming mode you also need to provide the input format with --in-format or -F.

$ xml2er|json2er -Z <duration> -T <[Database_Schema.]Table> -C <XML_JSON Column> -F <Format> [OPTIONS]

Examples:

# Reading the whole table in batch mode

$ xml2er -T tab -C xml_col ...

# Reading part of the table in batch mode

$ xml2er -T tab -C xml_col -I pk_col -1 +100 …

# Reading part of the table with a query in batch mode

$ xml2er -T "select xml_col from tab where pk_col between 1 and 100" …

# Reading new data from a table in streaming mode

$ xml2er -Z 10s -T tab -C xml_col -F orc ...