Target Connections

This section walks through the procedure for creating a Flexter Target Connection.

Target Connections are based on the URI standard, which means Flexter can access different locations and protocols defined by the path.

protocol://user:password@hostname:port/path?param=value#param=value

Supported protocols

Flexter supports the following protocols in batch mode:

  • Local Filesystem
    • file://
      # file example
      file://pathTo/fileOrDirectory
      
      # file format
      file://[host[:port]][/absolute/path|/drive:/path|/share/path|/relative/path][?query][#fragment]
      
  • Remote Protocols
    • https://
      # https example
      https://host:port/pathTo/file
      
      # https format
      https://[user[:password]@]host[:port][/path][?query][#fragment]
      
    • ftp://
      # ftp example
      ftp://host:port/pathTo/file
      
      # ftp format
      ftp://[user[:password]@]host[:port][/path/file][;type={A|I|D}][?query][#fragment]
      
    • ftps://
      # ftps example
      ftps://host:port/pathTo/file
      
      # ftps format
      ftps://[user[:password]@]host[:port][/path/file][;type={A|I|D}][?query][#fragment]
      
  • Hadoop Filesystems
    • hdfs://
      # hdfs example
      hdfs://host:port/pathTo/fileOrDirectory
      
      # hdfs format
      hdfs://[user@]host[:port]/path/file
      
    • webhdfs://
      # webhdfs example
      webhdfs://host:port/pathTo/fileOrDirectory
      
      # webhdfs format
      webhdfs://[user@]host[:port]/path/file
      
  • AWS Filesystem
    • s3a://
      # s3a example
      s3a://bucket/pathTo/fileOrDirectory
      
      # s3a format
      s3a://[accessKey[:secretKey]@][bucket[/path/object]][?endpoint=host[:port]&region=REGION&otherOptions][#fragment]
      
  • Azure Filesystems
    • abfs://
      # abfs example
      abfs://container@account.dfs.core.windows.net/pathTo/fileOrDirectory
      
      # abfs format
      abfs://[container[@]account[:domain][:port]][/path/blob][?query][#fragment]
      
    • abfss://
      # abfss example
      abfss://container@account.dfs.core.windows.net/pathTo/fileOrDirectory
      
      # abfss format
      abfss://[container[@]account[:domain][:port]][/path/blob][?query][#fragment]
      
    • adl://
      # adl example
      adl://account.azuredatalakestore.net/pathTo/fileOrDirectory
      
      # adl format
      adl://[filesystem.azuredatalakestore.net[:port]][/path/file][?query][#fragment]
      
  • Google Cloud Filesystems
    • gs://
      # gs example
      gs://bucket/pathTo/fileOrDirectory
      
      # gs format
      gs://[bucket]/[path/to/object][?query][#fragment]
      
  • Databricks Filesystem
    • dbfs://
      # dbfs example
      dbfs:/pathTo/fileOrDirectory
      
      # dbfs format
      dbfs://[profile@][/]/path/file[?query][#fragment]
      
  • JDBC Protocol
    • jdbc:<dialect>://

      # jdbc example
      jdbc:dialect://host:port/db
      
      # jdbc format
      jdbc:<dialect>://[user[:password]@][host[:port]][/db][?prop1=value1[&prop2=value2]][#fragment]
      
    • JDBC Dialects

      • jdbc:mysql://
        # jdbc:mysql example
        jdbc:mysql://host:port/db
        
        # jdbc:mysql format
        jdbc:mysql://[user[:password]@]host[:port]/[db][?prop1=value1[&prop2=value2]][#fragment]
        
      • jdbc:mariadb://
        # jdbc:mariadb example
        jdbc:mariadb://host:port/db
        
        # jdbc:mariadb format
        jdbc:mariadb://[user[:password]@]host[:port][,host2[:port2],...][/db][?prop1=value1[&prop2=value2]][#fragment]
        
      • jdbc:postgresql://
        # jdbc:postgresql example
        jdbc:postgresql://host:port/db
        
        # jdbc:postgresql format
        jdbc:postgresql://[user[:password]@]host[:port][,host2[:port2],...][/db][?prop1=value1[&prop2=value2]][#fragment]
        
      • jdbc:oracle:thin://
        # jdbc:oracle:thin example
        jdbc:oracle:thin://host:port/db
        
        # jdbc:oracle:thin format
        jdbc:oracle:thin://[user[:password]@]host[:port][/service][?prop1=value1[&prop2=value2]][#fragment]
        
      • jdbc:sqlserver://
        # jdbc:sqlserver example
        jdbc:sqlserver://host:port/db
        
        # jdbc:sqlserver format
        jdbc:sqlserver://[host[\instance][:port]][;prop1=value1[;prop2=value2]][#fragment]
        
      • jdbc:db2://
        # jdbc:db2 example
        jdbc:db2://host:port/db
        
        # jdbc:db2 format
        jdbc:db2://[host[:port]/][db][;prop1=value1[;prop2=value2]][#fragment]
        
      • jdbc:teradata://
        # jdbc:teradata example
        jdbc:teradata://host:port/db
        
        # jdbc:teradata format
        jdbc:teradata://[host[:port]][/db][?prop1=value1[&prop2=value2]][#fragment]
        
      • jdbc:h2://
        # jdbc:h2 example
        jdbc:h2://host:port/db
        
        # jdbc:h2 format
        jdbc:h2://[host[:port]/][path/db][;prop1=value1[;prop2=value2]][#fragment]
        
      • jdbc:hsqldb://
        # jdbc:hsqldb example
        jdbc:hsqldb://host:port/db
        
        # jdbc:hsqldb format
        jdbc:hsqldb://[host[:port]/][db][;prop1=value1[;prop2=value2]][#fragment]
        
      • jdbc:snowflake://
        # jdbc:snowflake example
        jdbc:snowflake://host:port/db
        
        # jdbc:snowflake format
        jdbc:snowflake://<account_locator>[.region][.cloud_provider].snowflakecomputing.com[:port]/?user=&password=&db=&schema=&warehouse=&role=&prop1=val1
        
      • jdbc:bigquery://
        # jdbc:bigquery example
        jdbc:bigquery://host:port/db
        
        # jdbc:bigquery format
        jdbc:bigquery://[project[:location]@][host[:port]][/dataset][?prop1=value1[&prop2=value2]][#fragment]
        
      • jdbc:mongodb://
        # jdbc:mongodb example
        jdbc:mongodb://host:port/db
        
        # jdbc:mongodb format
        jdbc:mongodb://[user[:password]@]host1[:port1][,host2[:port2],...][/db][?prop1=value1[&prop2=value2]][#fragment]
        
      • jdbc:redshift://
        # jdbc:redshift example
        jdbc:redshift://host:port/db
        
        # jdbc:redshift format
        jdbc:redshift://[user[:password]@]host[:port][/db][?prop1=value1[&prop2=value2]][#fragment]
        

    • JDBC Hive Related Dialects

      • jdbc:hive://
      • jdbc:hive2://
      • jdbc:impala://
      • jdbc:spark://
  • Spark Connectors
    • snowflake://
      # snowflake example
      snowflake://host:port?db=db&warehouse=warehouse
      
      # snowflake format
      snowflake://[user[:password]@]<account_locator>[.region][.cloud_provider].snowflakecomputing.com[:port]/?user=&password=&db=&schema=&warehouse=&role=&prop1=val1
      
    • bigquery://
      # bigquery example
      bigquery://project:dataset
      
      # bigquery format
      bigquery://[project:]dataset[.table][?prop1=value1[&prop2=value2]][#fragment]
      
    • mongodb://
      # mongodb example
      mongodb://host:port/db.collection
      
      # mongodb format
      mongodb://[user[:password]@]host[:port][/database[.collection]][?prop1=value1[&prop2=value2]][#fragment]
      
    • redshift://
      # redshift example
      redshift://host:port/db
      
      # redshift format
      redshift://[user[:password]@]host[:port][/database][?prop1=value1[&prop2=value2]][#fragment]
      
    • others

      Other Spark Connectors are configured mainly through parameters rather than a protocol:// prefix.


Target Connection parameters

Some of these protocols require a username and password. In all cases, you can define them with parameters or in configuration files. For some protocols, such as FTP, you can also specify the username and password directly in the URI.

-u, --out-user USER            Output user
                          	
-p, --out-password PASSWORD    Output password
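
For example, with an FTP target the credentials can be supplied either way. A short sketch, using a placeholder host and path:

# credentials passed as parameters
$ xml2er|json2er|merge2er -u USER -p PASS -o ftp://host:21/pathTo/output …

# credentials embedded directly in the URI
$ xml2er|json2er|merge2er -o ftp://USER:PASS@host:21/pathTo/output …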

The Target Formats can be split into four types:

1. File based
2. Hive
3. JDBC
4. Spark Connector

All four types have the following parameters in common:

-S, --savemode SAVEMODE     Save Mode when table, directory or file
                            exists or when getting the output tables' DDL
                            [e]rror, [a]ppend, [o]verwrite, [i]gnore, [w]riteschema, [p]rintschema
                            default: append

-Y, --out-part PARTITIONS   Number of partitions for writing data

-N, --unified-fks           Unified FKs in composite columns (applies to "reuse" optimization)

--reset-pks                 Reset primary keys sequentially, starting from 1

--out-opt PROP=VALUE,...    Extra options for Spark dataFrame writer, ex: prop1=value1,prop2=value2

-r, --out-prefix PREFIX     Add a prefix to each output table name

-y, --out-suffix SUFFIX     Add a suffix to each output table name

--rename [IDX=|NAME=]NEW_NAME,* 
                            Rename one or more tables by index (1,2,3...) or by their original name

--namemode MODE             Change the original table/column names:
                            - "": keep original names
                            - [l]ower: convert to lower case
                            - [u]pper: convert to upper case
                            - [c]amelcase: remove separators and separate words by case
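
As an illustration, the sketch below combines a few of these options; the output directory is a placeholder and the single-letter values follow the help text above.

# overwrite existing tables, lower-case the names and add a prefix
$ xml2er|json2er|merge2er -S o --namemode l -r stg_ -o <Output Directory> …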

Unified foreign keys are an alternative approach to ER normalization. It merges all foreign keys into two columns: one to store the table name and another to store the foreign key value.

Resetting the primary keys generates primary keys starting again from 1. This can conflict with other generated batches, but it can be useful for the first run or for the overwrite save mode.
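
For instance, a sketch combining the overwrite save mode with a primary key reset (placeholder output directory):

# overwrite the target and restart primary keys from 1
$ xml2er|json2er|merge2er -S o --reset-pks -o <Output Directory> …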

Flexter can generate partitioned tables as output. The --partition-by parameter generates partitioned output for each table. It can partition by a static value, a generated value (date, time, timestamp, uuid) or an input column value.

The partition column name can be auto-generated based on the --partition-by parameter, or it can be set with the --partition-by-name parameter.
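
A rough sketch, assuming --partition-by accepts an input column name and --partition-by-name sets the partition column name:

# partition the output by an input column value (placeholder names)
$ xml2er|json2er|merge2er --partition-by column_name --partition-by-name part_col -o <Output Directory> …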

Target Command Examples

File Directory Targets

For file-based Target Connections we need to define the Target Format and the location for the output directory.

$ xml2er|json2er|merge2er -f <File Format> -o <Output Directory> …

Useful parameters

-f, --out-format FORMAT         Output format:
                                - jdbc, parquet, orc, json, csv, tsv, psv, avro...
                                default: orc
                          	
-z, --compression COMPRESSION   Compression mode for filesystem output
                                - none, snappy, gzip, bzip2, deflate, lzo, lz4, zlib, xz...
                                default: snappy

As with Source Connections, the format parameter also accepts a full class name if the extra JARs are included in the spark-submit library path. Here is an example of working with Avro.

# example writing to avro files
$ xml2er|json2er|merge2er -f avro -o <Output Directory> …
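
The compression codec can be combined with any of the file formats, for example:

# example writing gzip-compressed parquet files
$ xml2er|json2er|merge2er -f parquet -z gzip -o <Output Directory> …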

Hive Schema Targets

Hive as a Target Connection behaves similarly to Target Connections that output files. This mode is activated with the -V or --hive-create parameter, and the output location is optional.

$ xml2er|json2er|merge2er -V …

When the output location is not defined, the tables will be created as managed tables, following the database/schema default location.

With a defined output location, the tables will be created as external tables.

$ xml2er|json2er|merge2er -V -o <Output Directory> …

The target schema may be provided; otherwise, the Hive "default" schema is used implicitly.

Below are the useful options:

-V, --hive-create              Enable creating hive tables
                          	
-E, --out-schema SCHEMA        Creating hive or jdbc tables into schema
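
For example, a sketch creating external Hive tables in a specific schema (the schema name and output directory are placeholders):

$ xml2er|json2er|merge2er -V -E my_schema -o <Output Directory> …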

JDBC Schema Targets

When specifying JDBC as a Target Connection, you need to include an output path with the jdbc:// protocol.

$ xml2er|json2er|merge2er -u USER -p PASS -o jdbc:<database url> …

An important parameter:

-B, --batchsize BATCHSIZE   	Batch size to write into databases
                                default: 1000

The batch size can be changed so that inserts are grouped into bigger or smaller batches before being sent to the database.
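
For example, a sketch assuming a PostgreSQL target and a larger batch size (placeholder credentials, host and database):

$ xml2er|json2er|merge2er -u USER -p PASS -B 10000 -o jdbc:postgresql://host:5432/db …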

Because of the parallel nature of the Apache Spark architecture, Flexter cannot guarantee that all inserts are made in the same transaction.

Each table requires a new transaction, which can be performed by one or more executors. Each executor uses its own transaction across the cluster.

Spark Connector Targets

Apache Spark offers the option to create custom Spark connectors, which vendors usually develop themselves.

Flexter supports a few of these connectors out of the box; others can be tried out if they can operate by setting the correct parameters.

The supported ones are:

  • Spark Snowflake Connector: snowflake://
  • Spark BigQuery Connector: bigquery://
  • Spark MongoDB Connector: mongodb://
  • Spark Redshift Connector: redshift://

These connectors aren't shipped with Flexter, but the Flexter Modules / download.sh tool can help install them into Spark. They can also be downloaded with the xml2er|json2er|merge2er --pkg PKG parameter, analogous to the spark-submit --packages parameter.
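
For example, a sketch of the --pkg parameter; the Maven coordinate is a placeholder for the vendor's connector matching your Spark version:

$ xml2er|json2er|merge2er --pkg <groupId>:<artifactId>:<version> -o <Output URI> …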

Snowflake Spark Connector

Example using the Spark Snowflake connector. It follows the same standard as the JDBC jdbc:snowflake://<snowflake_url> connection.

# user/password based
$ xml2er|json2er|merge2er -u USER -p PASS -o 'snowflake://<account_locator>[.region][.cloud_provider].snowflakecomputing.com[:port]/?db=&schema=&warehouse=&role=&prop1=val1' …

# password-less private key
$ xml2er|json2er|merge2er -u USER --out-opt private_key_file=pathTo/file.key -o 'snowflake://<account_locator>[.region][.cloud_provider].snowflakecomputing.com[:port]/?db=&schema=&warehouse=&role=&prop1=val1' …

# password based private key
$ xml2er|json2er|merge2er -u USER --out-opt private_key_file=pathTo/file.key --out-opt private_key_file_pwd=PASS -o 'snowflake://<account_locator>[.region][.cloud_provider].snowflakecomputing.com[:port]/?db=&schema=&warehouse=&role=&prop1=val1' …

Important parameters:

-E, --out-schema SCHEMA     Creating hive or jdbc tables into schema

--out-opt PROP=VALUE,...    Extra options for Spark dataFrame writer, ex: prop1=value1,prop2=value2

BigQuery Spark Connector

Example using the Spark BigQuery connector.

# direct writing method
$ xml2er|json2er|merge2er -u USER -p PASS -o 'bigquery://[project:]dataset[?prop1=value1[&prop2=value2]][#fragment]' …

# indirect writing method
$ xml2er|json2er|merge2er -u USER -p PASS -o 'bigquery://project[:dataset]?temp=gcs_bucket[&prop1=value1[&prop2=value2]][#fragment]' …

Important parameters:

-E, --out-schema DATASET    Creating BigQuery tables into dataset

--out-opt PROP=VALUE,...    Extra options for Spark dataFrame writer, ex: prop1=value1,prop2=value2

MongoDB Spark Connector

Example using the Spark MongoDB connector.

$ xml2er|json2er|merge2er -u USER -p PASS -o 'mongodb://host[:port][/database[.collection]][?prop1=value1[&prop2=value2]][#fragment]' …

Important parameters:

-E, --out-schema COLLECTION Creating MongoDB tables into a collection

--out-opt PROP=VALUE,...    Extra options for Spark dataFrame writer, ex: prop1=value1,prop2=value2

Custom Spark Connector

Other, unsupported connectors can also be tried. If they don't demand code changes and can operate entirely through parameters, it may be worth trying them out.

$ xml2er|json2er|merge2er -f FORMAT_NAME_OR_CLASS --out-opt PROP=VALUE --conf PROP=VALUE

Important parameters:

-f FORMAT_NAME_OR_CLASS     The custom format name or class.   

-o OUTPUT_PATH              It sets the dataFrame writer option path=value

-E, --out-schema SCHEMA     It prepends schema_name. to each written table name.

--conf PROP=VALUE           It sets Spark Configuration settings, some connectors require it. 
                            ex: prop1=value1,prop2=value2

--out-opt PROP=VALUE,...    Extra options for Spark dataFrame writer.
                            ex: prop1=value1,prop2=value2
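
As one hedged illustration, Delta Lake could be tried as a custom format, assuming Flexter passes the package and Spark settings through as shown. This is an untested sketch, not a supported configuration:

# hypothetical example: Delta Lake as a custom target format (illustrative coordinate and settings)
$ xml2er|json2er|merge2er -f delta --pkg io.delta:delta-core_2.12:2.4.0 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog -o <Output Directory> …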

Printing or Writing DDLs

Whether the target is a JDBC Connection or a Hive schema, you can print out the output tables' DDL to the console using the [p]rintschema save mode.
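
For example, a sketch assuming a JDBC target (placeholder connection details):

$ xml2er|json2er|merge2er -u USER -p PASS -o jdbc:<database url> -S p …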

...
17:38:30.827 INFO Initialized in 2889 milliseconds
17:38:30.829 INFO getting schema definition
17:38:30.831 INFO Loading metadata
17:38:31.407 INFO printing schema definition

CREATE TABLE items ("PK_items" NUMERIC(38) NOT NULL, "FILENAME" VARCHAR(4000) NOT NULL, "FILEPATH" TEXT NOT NULL, PRIMARY KEY("PK_items"))
CREATE TABLE item ("PK_item" NUMERIC(38) NOT NULL, "FK_items" NUMERIC(38) NOT NULL, "id" SMALLINT , "name" VARCHAR(10) , "ppu" FLOAT , "type_" VARCHAR(6) , PRIMARY KEY("PK_item"))
CREATE TABLE topping ("FK_item" NUMERIC(38) NOT NULL, "id" SMALLINT , "topping" VARCHAR(24) )
CREATE TABLE batter ("PK_batter" NUMERIC(38) NOT NULL, "FK_item" NUMERIC(38) NOT NULL, "id" SMALLINT , "name" VARCHAR(12) , PRIMARY KEY("PK_batter"))
CREATE TABLE size_ ("FK_batter" NUMERIC(38) NOT NULL, "size_" VARCHAR(6) NOT NULL)
CREATE TABLE filling ("FK_item" NUMERIC(38) NOT NULL, "addcost" FLOAT , "id" SMALLINT , "name" VARCHAR(16) )

17:38:32.966 INFO checking foreign keys

ALTER TABLE "item" ADD FOREIGN KEY("FK_items") REFERENCES "items"("PK_items");
ALTER TABLE "topping" ADD FOREIGN KEY("FK_item") REFERENCES "item"("PK_item");
ALTER TABLE "batter" ADD FOREIGN KEY("FK_item") REFERENCES "item"("PK_item");
ALTER TABLE "size_" ADD FOREIGN KEY("FK_batter") REFERENCES "batter"("PK_batter");
ALTER TABLE "filling" ADD FOREIGN KEY("FK_item") REFERENCES "item"("PK_item");

Alternatively, you can create the empty target tables using the [w]riteschema mode.
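
For example, the same placeholder JDBC target with the write schema mode:

$ xml2er|json2er|merge2er -u USER -p PASS -o jdbc:<database url> -S w …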

These options can be extremely useful for setting up and preparing the database before writing the data.

$ psql -U flex2er -d x2er
Password for user flex2er:
psql 
Type "help" for help.

x2er=# \dt
        List of relations
 Schema |  Name   | Type  |  Owner  
--------+---------+-------+---------
 public | batter  | table | flex2er
 public | filling | table | flex2er
 public | item    | table | flex2er
 public | items   | table | flex2er
 public | size_   | table | flex2er
 public | topping | table | flex2er