Data Conversion
With a Data Flow ID (logical) you can start converting Source Data to a relational Target Format in a Conversion task.
The Conversion can happen in two modes:
Batch mode
Streaming mode (experimental)
Batch Conversion
Batch Conversion is the primary Conversion mode in Flexter.
Pros:
- Easier to test
- More interactive
- Accepts more types of Source Connections
- Processes all input data at once
- When writing files as the Target Format, fewer output files are generated
Cons:
- It doesn’t automatically process new incoming data.
- It doesn’t restart after a cluster is restarted.
- It caches all data before writing, which requires more memory for big datasets.
You use the xml2er command with a previously generated Data Flow ID to kick off a Conversion task.
Basic command:
# Template
$ xml2er|json2er -s -l<Data Flow ID> INPUTPATH
# Test reading a file and converting the data without writing results, using the -s (skip) parameter
$ xml2er -s -l3 donut.zip
Flexter supports various types of Source Connections as input paths. These are explained in the Source Connections section of this guide.
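For illustration, the input path could for example point to a local file or directory, or to a distributed filesystem or object store. The paths below are hypothetical and assume the corresponding Source Connections are configured in your environment.
# Hypothetical input paths (assuming the corresponding Source Connections are configured)
$ xml2er -s -l3 donut.zip                        # local archive
$ xml2er -s -l3 /data/incoming/                  # local directory
$ xml2er -s -l3 hdfs:///landing/donuts/          # HDFS path (assumed)
$ xml2er -s -l3 s3a://my-bucket/landing/donuts/  # S3 path (assumed)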
Commonly used switches
Here is a list of the most commonly used parameters when converting data in batch mode; a short example follows the list:
-o, --out OUTPUT Target Path for Output
-u, --out-user USER Target Path User
-p, --out-password PASSWORD Target Connection Password
--out-token TOKEN Session token of output S3 bucket
-B, --batchsize BATCHSIZE Batch size to write into databases
default: 1000
-f, --out-format FORMAT Target Format.
- jdbc, parquet, orc, json, csv, tsv, psv, avro...
default: orc
-z, --compression COMPRESSION Compression mode for filesystem output
- none, snappy, gzip, bzip2, deflate, lzo, lz4, zlib, xz...
default: snappy
-S, --savemode SAVEMODE Save Mode when target table, directory or file exists
[e]rror (Raise error if the table exists)
[a]ppend (appends to data sets in the Target)
[o]verwrite (drops and recreates datasets in Target)
[i]gnore (Ignore table if exists)
[w]riteschema (write the tables’ DDL to Hive or to the
target Jdbc database)
[p]rintschema (print out the tables’ DDL to the console)
default: append
-r, --out-prefix PREFIX Prepend a prefix to each output table name
-y, --out-suffix SUFFIX Append a suffix to each output table name
-N, --unified-fks Unified FKs in composite columns (applies to "reuse"
optimization)
--reset-pks Reset primary keys sequentially, starting from 1
--sequence-type Sequential data type defined in ANSI SQL: decimal(38,0),
bigint, varchar(36),...
-R, --extra-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
Extra column(s) to be included in all output tables
-i, --partition-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
Partition column(s) to be included in all output tables
--rename [IDX=|NAME=]NEW_NAME,* Rename one or multiple tables by index (1,2,3...) or
by their original names
--namemode MODE Change the table/column original names to:
- "": Keep original names
- [l]ower: to lower case.
- [u]pper: to upper case.
- [c]amelcase: Remove separators and keep words separated by case.
-e, --parsemode MODE Mode of parsing.
- with doc stats: [a]ll, [d]ata, [s]tats
- without doc stats: [A]ll, [D]ata, [S]tats
-c, --commands Show SQL commands
-s, --skip Skip writing results
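To illustrate how these switches combine, here is a hypothetical command; output_dir, the stg_ prefix and the CSV format are arbitrary choices for this sketch.
# Hypothetical example: CSV output in overwrite mode, lower-cased names, every table prefixed with "stg_"
$ xml2er -l3 -o output_dir -f csv -S o --namemode l -r stg_ donut.zip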
The full list of Target Connections is explained in the Target Connections section of this guide.
At a minimum you should define a Target Connection path (-o switch) and a Data Flow ID (-l logical switch).
# Template
$ xml2er -s -l<Data Flow ID (logical)> -o <Output Path> INPUTPATH
# Test reading a file and checking the output configuration, using the -s (skip) parameter
$ xml2er -s -l3 -o output_dir donut.zip
…
16:17:00.571 INFO Job configuration update
…
# output
path: output_dir
format: parquet
compression: snappy
Removing --skip or -s kicks off processing of the data. In this example we use the [o]verwrite save mode to store the Source Data donut.zip in output_dir as CSV without compression.
$ xml2er -l3 -o output_dir -f csv -z none -S o donut.zip
…
17:16:24.110 INFO Finished successfully in 17701 milliseconds
# schema
origin: 7
logical: 3
job: 8
# statistics
startup: 4107 ms
load: 9416 ms
parse: 6855 ms
write: 2297 ms
xpath stats: 436 ms
doc stats: 4744 ms
xpaths: 19 | map:0.0%/0 new:100.0%/19
documents: 1 | suc:100.0%/1 part:0.0%/0 fail:0.0%/0
Once the job has finished successfully, you can find the converted data in the Target Path.
Memory settings
By default Flexter processes all of your data in memory.
Flexter reads your Source Data into memory in parallel. Once this operation has completed, it writes to the target tables or files. Writing also happens in parallel, but one table at a time.
You can allocate memory with the --mem switch. By default, the memory allocation is 1 GB.
# Increase memory allocated to Flexter to 4 GB
$ xml2er -g1 <input_path> --mem 4G
It is highly recommended to process all of your data in memory. However, Flexter also gives you the option to spill your data to disk. Spilling to disk will significantly slow down processing of Source Data.
You can specify the processing and the caching behaviour with the -k switch.
-k, --cache CACHE Spark's storage level. Full or shortcut names:
- N, NONE
- M, MEMORY_ONLY
- M2, MEMORY_ONLY_2
- MD, MEMORY_AND_DISK
- MS, MEMORY_ONLY_SER
- MD2, MEMORY_AND_DISK_2
- MS2, MEMORY_ONLY_SER_2
- MDS, MEMORY_AND_DISK_SER
- MDS2, MEMORY_AND_DISK_SER_2
- D, DISK_ONLY
- D2, DISK_ONLY_2
- O, OFF_HEAP
- other...
default: M
Let’s look at an example where we spill to disk:
# In this example we use memory and disk in serialized form to process the Source Data
$ xml2er -g1 <input_path> -k MDS
CPU settings
You can allocate the number of vCPUs that Flexter should use.
In local mode, Flexter uses all the available vCPUs by default. You can override this behaviour by telling Flexter the number of vCPUs to use:
# In this example we tell Flexter to use 3 vCPUs
$ xml2er --master local[3] -g1 <input_path> -k MDS
When running on Hadoop or a Spark standalone cluster, you specify the number of cores and executors to use:
# Specifying the number of vCPUs on Hadoop
$ xml2er --master yarn --cores 10 --exec 4 -g1 <input_path> -k MDS
# Specifying the number of vCPUs on Spark
$ xml2er --master spark://hostname-sparkmaster --cores 10 --exec 4 -g1 <input_path> -k MDS
Number of vCPUs for reading and writing data
By default Flexter uses the number of vCPUs allocated for reading and writing data. So if you allocate 4 vCPUs to Flexter it will use 4 vCPUs for both reading and writing data.
In certain situations it makes sense to override this behaviour, e.g. when you process data on a Hadoop cluster and store the data in a JDBC RDBMS. In such a scenario, the number of vCPUs for reading the data should probably be much greater than the number of vCPUs for writing the data. Otherwise you might overwhelm the JDBC RDBMS with connection requests and starve it of resources.
When writing to files as a Target Format you may want to limit the number of vCPUs for writing to avoid the small files problem. The more vCPUs you allocate for writing, the more small files will be created. Creating many small files may result in poor performance. The relevant switches are listed below, followed by a short example.
-X, --in-part PARTITIONS Number of vCPUs for reading data
-Y, --out-part PARTITIONS Number of vCPUs for writing data
--tables-at-once <N> Defines the number N of tables being written at once
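As a sketch, the following hypothetical command caps the write parallelism; the values 4 and 2 are illustrative and should be tuned to your target system.
# Hypothetical example: read with default parallelism, but write with 4 vCPUs and 2 tables at a time
$ xml2er -l3 -o output_dir -Y 4 --tables-at-once 2 donut.zip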
Extra Options Settings
In some cases, depending on the particular data source, Spark package, or JDBC driver, extra options need to be passed directly to the Hadoop configuration or to the Spark DataFrame reader/writer options. An example follows the switches below.
--in-opt PROP=VALUE,... Extra options for Spark dataFrame reader, ex: prop1=value1,prop2=value2
--out-opt PROP=VALUE,... Extra options for Spark dataFrame writer, ex: prop1=value1,prop2=value2
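For example, when writing CSV files you could pass the Spark CSV writer option header=true through --out-opt. This is a hypothetical sketch; the options that actually apply depend on the data source or driver in use.
# Hypothetical example: pass the Spark CSV writer option "header" so column headers are written
$ xml2er -l3 -o output_dir -f csv --out-opt header=true donut.zip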
Streaming Conversion (experimental)
This approach makes it possible to run the data conversion as a long-running process (service). The process stays open, waiting for new incoming data.
Pros:
- Detects and processes new incoming data automatically
- Auto restarts and resumes reading incoming data from the queue
- Uses less memory as it only processes newly incoming data
Cons:
- It’s not interactive
- More complex to run
- It generates more output files, one set for each discrete processing step
The streaming conversion commands are similar to the batch conversion commands, except that they also have to include the --stream or -Z parameter:
-l, --schema-logical ID Data Flow ID (logical)
-Z, --stream DURATION Stream mode duration. 1h, 1m, 1s, 1ms..
The duration is based on the Spark Streaming API: it is the time to wait while accumulating enough data to run a micro-batch.
Another difference between batch and streaming mode concerns file-based input data sources: the input path must be a directory, and new files need to be placed there.
Other supported Source Connections are explained in the Source Connections section of this guide.
Basic command:
# Template
$ xml2er -s -Z <duration> -l<Data Flow ID (logical)> INPUTPATH
# Test starting the streaming process
$ xml2er -s -Z 10s -l3 input_dir
…
17:35:24.042 INFO Initialized in 84 milliseconds, waiting for new data...
From another terminal (or a file manager), copy a new file into the input directory. You should then see Flexter processing the data.
$ cp donut.zip input_dir/
You can then observe the conversion happening in the xml2er logs.
…
17:42:12.344 INFO Event finished successfully in 2225 milliseconds, waiting for new data...
# statistics
startup: 4107 ms
load: 9416 ms
parse: 6855 ms
write: 2297 ms
xpath stats: 436 ms
doc stats: 4744 ms
xpaths: 19 | map:0.0%/0 new:100.0%/19
documents: 1 | suc:100.0%/1 part:0.0%/0 fail:0.0%/0
After the test is done, the process can be stopped with CTRL+C. In other cluster setups you might need to use other commands or UIs to stop the process.
After stopping the process, the next step is to add a Target Connection. This step is similar to batch conversion and the parameters are the same. More details are explained in the Target Connections section of this guide.
For more advanced streaming options, you could also use:
-K, --checkpoint-dir DIRECTORY Spark Streaming checkpoint directory
-n, --new-since DURATION Records are considered new since X ago
The checkpoint directory makes the process restartable by storing the input queue in a folder. The new-since parameter changes the Spark default behaviour, which is to ignore files whose modification date is more than 10 seconds older than the process start.
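Here is a hypothetical sketch combining these options; checkpoint_dir and the 1h window are arbitrary values chosen for illustration.
# Hypothetical example: streaming test run with a checkpoint directory and a 1 hour "new since" window
$ xml2er -s -Z 10s -l3 -K checkpoint_dir -n 1h input_dir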
When the command is ready, removing --skip or -s launches processing in streaming mode. Note that the process remains active (it doesn’t finish):
$ xml2er -Z 10s -l3 -o output_dir -f tsv -z gzip input_dir
…
18:03:04.927 INFO Initialized in 78 milliseconds, waiting for new data...
$ cp donut.zip input_dir/
…
18:05:44.432 INFO Event finished successfully in 4292 milliseconds, waiting for new data...
# statistics
startup: 4107 ms
load: 9416 ms
parse: 6855 ms
write: 2297 ms
xpath stats: 436 ms
doc stats: 4744 ms
xpaths: 19 | map:0.0%/0 new:100.0%/19
documents: 1 | suc:100.0%/1 part:0.0%/0 fail:0.0%/0