Data Conversion

This section demonstrates the Flexter Data Conversion syntax and solutions.

With a Data Flow ID (logical) we can start a Conversion task that converts the Source Data to a relational Target Format.

The Conversion can run in two modes:

Batch mode
Streaming mode (experimental)

Batch Conversion

Batch Conversion is the primary Conversion mode in Flexter.

Pros:

  1. Easier to test
  2. More interactive
  3. Accepts more types of Source Connections
  4. Processes all input data at once
  5. When you generate files as your Target Format, fewer output files are produced

Cons:

  1. It doesn’t automatically process new incoming data.
  2. It doesn’t restart automatically after a cluster restart.
  3. It caches all data before writing, which requires more memory for big datasets.

You use the xml2er command to start a Conversion task, specifying a previously generated Data Flow ID.

Basic command:

# Template

$ xml2er|json2er -s -l<Data Flow ID> INPUTPATH

# Test reading a file and converting the data, using the skip (-s) parameter

$ xml2er -s -l3 donut.zip

Flexter supports various types of Source Connections as input paths. These are explained in the Source Connections section of this guide.

Commonly used switches

Here is a list of the most commonly used parameters when converting data in batch mode:

-o, --out OUTPUT              Target Path for Output
	                      	
-u, --out-user USER           Target Path User
                          	
-p, --out-password PASSWORD   Target Connection Password

--out-token TOKEN             Session token of output S3 bucket

-B, --batchsize BATCHSIZE     Batch size to write into databases
                            default: 1000

-f, --out-format FORMAT       Target Format. 
                              - jdbc, parquet, orc, json, csv, tsv, psv, avro...
                            default: orc
                      	
-z, --compression COMPRESSION Compression mode for filesystem output 
                              - none, snappy, gzip, bzip2, deflate, lzo, lz4, zlib, xz...
                              default: snappy
                          	
-S, --savemode SAVEMODE       Save Mode when target table, directory or file exists
                              [e]rror (Raise error if the table exists)
                              [a]ppend (appends to data sets in the Target)
                              [o]verwrite (drops and recreates datasets in Target)
                              [i]gnore (Ignore table if exists)
                              [w]riteschema (write the tables’ DDL to Hive or to the 
                                target Jdbc database)
                              [p]rintschema (print out the tables’ DDL to the console)
                              default: append

-r, --out-prefix PREFIX       Add a prefix to each output table name

-y, --out-suffix SUFFIX       Add a suffix to each output table name

-N, --unified-fks             Unified FKs in composite columns (applies to "reuse" 
                              optimization)

--reset-pks                   Reset primary keys to sequential values starting from 1

--sequence-type               Sequential data type defined in ANSI SQL: decimal(38,0), 
                              bigint, varchar(36),...

--default-varchar-len LENGTH  Length of VARCHAR/CLOB datatype for mapping generation where not explicitly
                              defined by XSD schema

--default-num-type native|max|decimal[(PRECISION,SCALE)]|PRECISION,SCALE
                              Definition of the default numeric type, options:
                                - native: Keep the numeric types optimized as native types like TINYINT, REAL,
                                  DOUBLE PRECISION
                                - max: Enforce the maximum precision accepted by Spark, DECIMAL(38,18)
                                - decimal: Enforce the decimal type instead of the native types, respecting their
                                  precision and scale
                                - decimal(PRECISION[,SCALE]): Enforce a particular decimal type with minimum
                                  precision and scale
                                - PRECISION[,SCALE]: same as above
                                - decimal(+PRECISION[,+SCALE]): Increase the precision and/or scale by the given
                                  margin
                                - +PRECISION[,+SCALE]: same as above

--default-float-type native|max|decimal[(PRECISION,SCALE)]|PRECISION,SCALE
                              The same as --default-num-type, but limited to floating-point numbers
                             
--default-int-type native|max|decimal[(PRECISION,SCALE)]|PRECISION,SCALE
                              The same as --default-num-type, but limited to integer numbers

-R, --extra-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
                              Extra column(s) to be included in all output tables

-i, --partition-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
                              Partition column(s) to be included in all output tables

--rename [IDX=|NAME=]NEW_NAME,* Rename one or multiple tables by index (1,2,3...) or
                              by their original names

--namemode MODE               Change the original table/column names:
                                - "": Keep original names
                                - [l]ower: convert to lower case
                                - [u]pper: convert to upper case
                                - [c]amelcase: remove separators and separate words by case

-e, --parsemode MODE          Mode of parsing.
                                - with doc stats:    [a]ll, [d]ata, [s]tats
                                - without doc stats: [A]ll, [D]ata, [S]tats

-c, --commands                  Show SQL commands
                              
-s, --skip                      Skip writing results
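
As an illustration only (not an example from the guide), the sketch below combines several of these switches: CSV output without compression, overwrite mode, a prefix on each table name and lower-cased names. The stg_ prefix is a placeholder, the single-letter value for --namemode is assumed from the bracketed letters above, and the Data Flow ID, input file and output directory reuse the earlier examples.

# Illustrative sketch: CSV output, no compression, overwrite mode,
# a "stg_" prefix on each table and lower-cased table/column names
$ xml2er -l3 -o output_dir -f csv -z none -S o -r stg_ --namemode l donut.zip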

The full list of Target Connections is explained in the Target Connections section of this guide.

At a minimum you should define a Target Connection path (-o switch) and a Data Flow ID (-l logical switch).

# Template

$ xml2er -s -l<Data Flow ID (logical)> -o <Output Path> INPUTPATH

# Test reading a file, extracting the data, and showing the output definitions

$ xml2er -s -l3 -o output_dir donut.zip
…
16:17:00.571 INFO  Job configuration update
…
   # output
      path:  output_dir
      format:  parquet
      compression:  snappy

Removing --skip or -s kicks off processing of the data. In this example we use [o]verwrite mode to store the Source Data donut.zip in output_dir as CSV without compression.

$ xml2er -l3 -o output_dir -f csv -z none -S o donut.zip
…
17:16:24.110 INFO  Finished successfully in 17701 milliseconds
# schema
	origin:  7
	logical:  3
	job:  8

# statistics
	startup:  4107 ms
	load:  9416 ms
	parse:  6855 ms
	write:  2297 ms
	xpath stats:  436 ms
	doc stats:  4744 ms
	
	xpaths:  19              | map:0.0%/0  new:100.0%/19
	documents:  1            | suc:100.0%/1  part:0.0%/0  fail:0.0%/0

Once the job has finished successfully, you can find the converted data in the Target Path.

Memory settings

By default Flexter processes all of your data in memory.

Flexter reads your Source Data in parallel into memory. Once this operation has completed, it writes to the target tables or files. This happens in parallel, but one table at a time.

You can allocate memory with the --mem switch. By default, 1 GB of memory is allocated.

# Increase memory allocated to Flexter to 4 GB

$ xml2er -g1 <input_path> --mem 4G

It is highly recommended to process all of your data in memory. However, Flexter also gives you the option to spill your data to disk. Spilling to disk will significantly slow down processing of Source Data.

You can specify the processing and the caching behaviour with the -k switch.

-k, --cache CACHE          Spark's storage level. Full or shortcut names:
                          - N, NONE
                          - M, MEMORY_ONLY
                          - M2, MEMORY_ONLY_2
                          - MD, MEMORY_AND_DISK
                          - MS, MEMORY_ONLY_SER
                          - MD2, MEMORY_AND_DISK_2
                          - MS2, MEMORY_ONLY_SER_2
                          - MDS, MEMORY_AND_DISK_SER
                          - MDS2, MEMORY_AND_DISK_SER_2
                          - D, DISK_ONLY
                          - D2, DISK_ONLY_2
                          - O, OFF_HEAP
                          - other...
                            default: M
Refer to the Spark user guide for more details on the caching options: https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#rdd-persistence

Let’s look at an example where we spill to disk:

# In this example we use memory and disk in serialized form (MEMORY_AND_DISK_SER) to process the Source Data

$ xml2er -g1 <input_path> -k MDS

CPU settings

You can allocate the number of vCPUs that Flexter should use.

In local mode, Flexter uses all the available vCPUs by default. You can override this behaviour by telling Flexter the number of vCPUs to use:

# In this example we tell Flexter to use 3 vCPUs

$ xml2er --master local[3] -g1 <input_path> -k MDS

When running on Hadoop or on a Spark standalone cluster, you specify the number of cores and executors to use:

# Specifying the number of vCPUs on Hadoop

$ xml2er --master yarn --cores 10 --exec 4 -g1 <input_path> -k MDS

# Specifying the number of vCPUs on Spark

$ xml2er --master spark://hostname-sparkmaster --cores 10 --exec 4 -g1 <input_path> -k MDS

Number of vCPUs for reading and writing data

By default Flexter uses the number of vCPUs allocated for reading and writing data. So if you allocate 4 vCPUs to Flexter it will use 4 vCPUs for both reading and writing data.

In certain situations it makes sense to override this behaviour, e.g. when you process data on a Hadoop cluster and store the results in a JDBC RDBMS. In such a scenario the number of vCPUs for reading the data should probably be much greater than the number of vCPUs for writing it. Otherwise, you might overwhelm the JDBC RDBMS with connection requests and choke it for resources.

When writing to files as a Target Format, you may want to limit the number of vCPUs for writing to avoid the small files problem. The more vCPUs you allocate, the more small files will be created. Creating many small files may result in poor performance.

-X, --in-part PARTITIONS      Number of vCPUs for reading data
-Y, --out-part PARTITIONS     Number of vCPUs for writing data
--tables-at-once <N>          Number of tables written at a time
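
For illustration only, the sketch below limits write parallelism when loading into a JDBC target while keeping more vCPUs for reading. The JDBC URL, credentials and partition counts are placeholders; the exact connection syntax is covered in the Target Connections section.

# Illustrative sketch: read with 16 partitions, write with only 4, two tables at a time
$ xml2er -l3 -f jdbc -o jdbc:postgresql://dbhost/dbname -u dbuser -p dbpass -X 16 -Y 4 --tables-at-once 2 donut.zip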

Extra Options Settings

In some cases, e.g. for a particular Spark data source package or JDBC driver, extra options can be passed directly to the Hadoop configuration or to the Spark DataFrame reader/writer options.

--in-opt PROP=VALUE,...       Extra options for the Spark DataFrame reader, e.g. prop1=value1,prop2=value2

--out-opt PROP=VALUE,...      Extra options for the Spark DataFrame writer, e.g. prop1=value1,prop2=value2
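
As an example sketch only, the command below passes extra options to the underlying Spark DataFrame writer; the property names shown are standard Spark CSV writer options and may not apply to other formats.

# Illustrative sketch: pass extra writer options to the Spark CSV data source
$ xml2er -l3 -o output_dir -f csv --out-opt quoteAll=true,nullValue=NULL donut.zip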

Streaming Conversion (experimental)

This approach makes it possible to run the Conversion as a long-running process (service). The process stays open, waiting for new incoming data.

Pros:

  1. Detects and processes new incoming data automatically
  2. Automatically restarts and resumes reading incoming data from the queue
  3. Uses less memory, as it only processes newly incoming data

Cons:

  1. It’s not interactive.
  2. It is more complex to run.
  3. It generates more output files, one set per discrete micro-batch.

The streaming Conversion commands are similar to the batch Conversion commands, except that they must also include the --stream or -Z parameter:

-l, --schema-logical ID        	Data Flow ID (logical)
-Z, --stream DURATION          	Stream mode duration. 1h, 1m, 1s, 1ms..

The duration, based on the Spark Streaming API, is the time to wait while accumulating enough data to run a micro-batch.

Another difference between batch and streaming mode concerns file-based Source Connections: the input path must be a directory, and new files need to be placed there.

Other supported Source Connections are explained in the Source Connections section of this guide.

Basic command:

# Template

$ xml2er -s -Z <duration> -l<Data Flow ID (logical)> INPUTPATH

# Test starting the streaming process

$ xml2er -s -Z 10s -l3 input_dir
…
17:35:24.042 INFO  Initialized in 84 milliseconds, waiting for new data...

From another terminal session or a file manager, send a new file to the input directory. You should then see Flexter processing the data.

$ cp donut.zip input_dir/

We can then observe the Conversion happening in the xml2er logs.

…
17:42:12.344 INFO  Event finished successfully in 2225 milliseconds, waiting for new data...
# statistics
	startup:  4107 ms
	load:  9416 ms
	parse:  6855 ms
	write:  2297 ms
	xpath stats:  436 ms
	doc stats:  4744 ms
	
	xpaths:  19           | map:0.0%/0  new:100.0%/19
	documents:  1         | suc:100.0%/1  part:0.0%/0  fail:0.0%/0

Once the test is done, the process can be stopped with CTRL+C. In other cluster setups, a different command or UI might be required to stop the process.

After stopping the process, the next step is to add the Target Connection. This step is similar to batch Conversion, and the parameters are the same. More details are explained in the Target Connections section of this guide.

For more advanced streaming options, you could also use:

-K, --checkpoint-dir DIRECTORY  Spark Streaming checkpoint directory
-n, --new-since DURATION    	Records are considered new since X ago

The checkpoint directory makes the process restartable by storing the input queue in a folder. The new-since parameter changes the default Spark behaviour, which is to ignore files modified more than 10 seconds before the process started.
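
For illustration only, a streaming test run using both options might look like the sketch below; the checkpoint_dir path and the 1h new-since window are placeholder values.

# Illustrative sketch: streaming with a checkpoint directory and a 1 hour new-since window
$ xml2er -s -Z 10s -l3 -K checkpoint_dir -n 1h input_dir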

When the command is ready, removing --skip or -s launches processing in streaming mode. Note that the process remains active (it doesn’t finish):

$ xml2er -Z 10s -l3 -o output_dir -f tsv -z gzip input_dir
…
18:03:04.927 INFO  Initialized in 78 milliseconds, waiting for new data...

It’s possible now to send a new file to the input directory (e.g. from another terminal session):

$ cp donut.zip input_dir/

The event log then periodically displays information about the processed data:
…
18:05:44.432 INFO  Event finished successfully in 4292 milliseconds, waiting for new data...
# statistics
	startup:  4107 ms
	load:  9416 ms
	parse:  6855 ms
	write:  2297 ms
	xpath stats:  436 ms
	doc stats:  4744 ms
	
	xpaths:  19           | map:0.0%/0  new:100.0%/19
	documents:  1         | suc:100.0%/1  part:0.0%/0  fail:0.0%/0

The processed data is automatically stored in the Target Path.