Merging and Converting Files
When Flexter is used periodically, loading new data and incrementing the same output tables, it always creates new files. This is a common behavior in Hadoop and Spark based systems. The purpose of the Merge2er tool is to solve the so called small files problem and merge large numbers of small files into a small number of larger files. Users can also use this tool to convert output data from one format to another.
Support file systems
1. Local file system( file://) FTP file system( ftp://)
2. Distributed file system( hdfs://), webhdfs file system, swebhdfs file system, har file system, hftp file system, hsftp file system...
Supported data formats
1. Parquet, orc, json, csv, tsv, psv, libsvm...
2. Other formats supported by Spark, the associated JAR files need to be included in SPARK_HOME/jars folder
As of Flexter 1.7
1. The Merge2er module now works in skip mode and is able to import SQL query result.
2. Additional support for jdbc input/output targets
Common Parameters
Usage: merge2er [Options] [INPUT]
Options:
--in-schema SCHEMA Input schema related to hive or jdbc source
--schema-pattern PATTERN Input schema pattern related to hive or jdbc
source: Ex "*word*"
--table-pattern PATTERN Input table pattern related to hive or jdbc
source: Ex "*word*"
-T, --table TABLE|QUERY Table or SQL for Source Data
--where WHERE Where clause for filtering tabular files,
jdbc or hive tables.
-U, --in-user USER Output user
-P, --in-password PASSWORD Output password
-F, --in-format FORMAT Input format
--has-header If the input csv/psv/tsv file has a header
# output data
-o, --out OUTPUT Output location path
-u, --out-user USER Output user
-p, --out-password PASSWORD Output password
-f, --out-format FORMAT Output format
-z, --compression COMPRESSION File formats compression mode
-V, --hive-create Enable creating hive tables
-E, --out-schema SCHEMA Creating hive or jdbc tables into schema
-r, --out-prefix PREFIX Prefix for output
-y, --out-suffix SUFFIX Suffix for output
--rename [IDX=|NAME=]NEW_NAME,...
Rename a table or multiple tables by
index
(1,2,3...) or by its original name
--constraints Enables copying the primary and foreign
key
constraints
-R, --extra-column <NAME|EXPR>[:DATATYPE] [SOURCE] [AS ALIAS]
Extra column(s) to be included in all output
tables.
-i, --partition-column <NAME|EXPR>[:DATATYPE] [SOURCE] [AS ALIAS]
SOURCE: AUTO|RANDOM|NOW|FILENAME|FILEPATH|=VALUE
Partition column(s) to be included in all.
output tables.
-S, --savemode SAVEMODE Save Mode when table, directory or file
exists or when getting the output tables's
DDL
[e]rror, [a]ppend, [o]verwrite, [i]gnore, [w]riteschema, [p]rintschema
default: append
-Y, --out-part PARTITIONS Number of partitions for writing data
-B, --batchsize BATCHSIZE Batch size to write into databases
default: 1000
The input data source must conform to the following specifications:
Each table must be included in a folder. Folder name must not contain equals sign or begin with an underscore or dot.
Users can process multiple tables in one go by pointing the input path to the parent folder of those tables. Folders and files (except temporary file which name starts with a dot or underscore) are not allowed to appear at the same level.
Basic command:
# Template
$ merge2er [-s] [-F <input_format>] [--has-header true] [-f <output_format>] [-Y <num of partitions>] [-z <compression>] [-o <output_path>] <input_path>
If the user does not specify the format of the input data, merge2er will try to detect the input data format by inspecting the file’s extension. If the user does not specify the format of the output data, merge2er will use input data format as output data format.
If the user does not specify an output path, merge2er will process data under ‘in-place’ mode. It will generate the new table in a temporary folder then replace the original table.
The user can specify a top level directory containing multiple table folders inside:
$ merge2er /topLevel/databaseFolder
but can also filter tables by path expression:
$ merge2er /topLevel/databaseFolder/tab*
# Testing merging dataset at /tmp/test
$ merge2er /tmp/test -Y 2
INFO Check Input/Output Paths
WARN If you are processing small data set, we recommend that you specify a different output path.
WARN Do not stop the program during the processing!!
INFO Input is using LocalFileSystem
INFO Output is using the same file system as Input
INFO The output directory is the same as the input directory, path: /tmp/test
INFO No data format specified. Trying to detect data format by filename extension
INFO Detect Input Data Format
INFO Found [csv] format file at file:/tmp/test/items/part-00001-5fb9f925-c673-4d3b-a7d1-90ff8de16405-c000.csv, using [csv] as input data format!
INFO Input data format: csv
INFO No output data format specified, use input data format as output data format: csv
INFO Start Processing
INFO Replacing table: items at /tmp/test/items
INFO Replacing table: fillings at /tmp/test/fillings
INFO Replacing table: size at /tmp/test/size
INFO Replacing table: topping at /tmp/test/two/topping
INFO Replacing table: batters at /tmp/test/two/batters
INFO Process finished
INFO Registering success of job 9315
Convert Data Format
Merge2er can also be used to convert the data format simply by specifying a different output data format. In this case, we recommend that the user specify an output path that is different from the input path.
# Convert data format
$ merge2er /tmp/test -o /tmp/output -F csv -f json [-Y 2]
Import tables from one database to another
When targeting a hive database or a jdbc connection, Merge2er can load an entire catalog, multiple schemas, multiple tables or even everything from the connection (whether the user does not specify any table):
$ merge2er --in-schema=<> --schema-pattern=<> --table-pattern=<> [jdbc:...]
In the example below we will try to copy a table from the source Postgres database to the target in Snowflake:
$ merge2er -T items jdbc:postgresql://HOSTNAME:PORT/DB -U inuser -P inpass -o jdbc:snowflake://LOCATION.snowflakecomputing.com/?warehouse=WAREHOUSE&db=DB&schema=SCHEMA -u outuser -p outpass
13:14:12.191 INFO Initialized in 3142 milliseconds
13:14:12.198 INFO Check Input/Output Source
13:14:12.198 INFO Using jdbc input source
13:14:12.198 INFO Using jdbc output source
13:14:12.198 INFO Input source: jdbc:postgresql://HOSTNAME:PORT/DB
13:14:12.198 INFO Output source: jdbc:snowflake://LOCATION.snowflakecomputing.com/?warehouse=WAREHOUSE&db=DB&schema=SCHEMA
13:14:12.198 INFO Input data format: jdbc
13:14:12.199 INFO Output data format: jdbc
13:14:12.199 INFO Start Processing
13:14:14.257 INFO table items: overwriting
13:14:18.954 INFO Process finished
13:14:18.957 INFO Registering success of job 16270
13:14:19.281 INFO Finished successfully in 10229 milliseconds
# schema
job: 16270
# statistics
startup: 3142 ms
Process finished with exit code 0
OUTUSER#[email protected]> show tables;
+------------+----------+---------------+-------------+-------+---------+------------+------+-------+----------+----------------+
| created_on | name | database_name | schema_name | kind | comment | cluster_by | rows | bytes | owner| retention_time |
|------------+----------+---------------+-------------+-------+---------+------------+------+-------+----------+----------------|
| 2018-11-07 | CITY | TESTING | PUBLIC | TABLE | | | 7 | 1024 | SYSADMIN | 1 |
| 2018-11-07 | COUNTRY | TESTING | PUBLIC | TABLE | | | 4 | 1024 | SYSADMIN | 1 |
| 2019-07-22 | ITEMS | TESTING | PUBLIC | TABLE | | |1| 1024 | SYSADMIN | 1 |
+------------+----------+---------------+-------------+-------+---------+------------+------+-------+----------+----------------+
Please note that the above command works even with SQL queries (including filtering and joining against other tables or views). For example, you might try to run:
$ merge2er -T "(SELECT * FROM items) items_select" jdbc:postgresql://HOSTNAME:PORT/DB -U inuser -P inpass -o jdbc:postgresql://HOSTNAME:PORT/DB -u outuser -p outpass
In section - “JDBC as a Target Connection”, we outlined the possible benefits in making usage of the -B/–batchsize parameter.
In such situations, increasing the batch size (by default set to 1k) could be a valid option to speed up the process. In particular, the batch size represents the maximum amount of writing operations executed in a single batch and, consequently, reduces the number of packets sent across the connection.
In this example we copied a Snowflake table (7.3 M rows, for a total of 130.6MB) to a local postgres database.
$ merge2er -T S_BIKERIDE jdbc:snowflake://LOCATION.snowflakecomputing.com/?warehouse=WAREHOUSE&db=DB&schema=SCHEMA -U inuser -P inpass -o jdbc:postgresql://HOSTNAME:PORT/DB -u outuser -p outpass
The process took 213 seconds to complete:
14:22:35.567 INFO Finished successfully in 213020 milliseconds
We then tried to increase the Batch size:
$ merge2er -T S_BIKERIDE -B 100000 jdbc:snowflake://LOCATION.snowflakecomputing.com/?warehouse=WAREHOUSE&db=DB&schema=SCHEMA -U inuser -P inpass -o jdbc:postgresql://HOSTNAME:PORT/DB -u outuser -p outpass
In this case the run took 179 seconds:
14:22:38.128 INFO Finished successfully in 179078 milliseconds
Load text files to database
Merge2er can also be used to import tsv, csv or psv to a target database. For instance:
$ merge2er --has-header false sample.csv -o jdbc:postgresql://HOSTNAME:PORT/DB -u outuser -p outpass
16:38:45.807 INFO Creating Spark Session
16:38:47.656 INFO Spark job initialized
version: 2.4.3
applicationId: local-1563464327352
16:38:47.687 INFO Registering new job
16:38:48.615 INFO Job configuration update
# input
path: samplecsv.csv
format: csv
has-header: false
# output
path: jdbc:postgresql://HOSTNAME:PORT/DB
user: outuser
password: ***
format: jdbc
savemode: overwrite
# schema
job: 16037
16:38:48.623 INFO Initialized in 3431 milliseconds
16:38:48.629 INFO Check Input/Output Source
16:38:48.934 INFO Input is using LocalFileSystem
16:38:48.934 INFO Using jdbc output source
16:38:48.934 INFO Input source: sample.csv
16:38:48.935 INFO Output source: jdbc:postgresql://HOSTNAME:PORT/DB
16:38:48.935 INFO Input data format: csv
16:38:48.936 INFO Output data format: jdbc
16:38:48.938 INFO Start Processing
16:38:53.190 INFO table sample_csv: overwriting
16:38:53.725 INFO Process finished
16:38:53.728 INFO Registering success of job 16037
16:38:54.055 INFO Finished successfully in 8860 milliseconds
# schema
job: 16037
# statistics
startup: 3431 ms
Process finished with exit code 0
Checking the output:
~$ psql -U outuser -d DB
Password for user outser:
psql (10.9 (Ubuntu 10.9-0ubuntu0.18.04.1))
Type "help" for help.
DB=# \dt
List of relations
Schema | Name | Type | Owner
--------+-----------------+-------+---------
public | sample_csv | table | outser
Merge2er can also process parquet, avro and orc file formats.
13:40:54.635 INFO Registering new job
13:40:55.502 INFO Job configuration update
# input
path: userdata.parquet
format: parquet
# output
path: jdbc:postgresql://HOSTNAME:PORT/DB
user: outuser
password: ***
format: jdbc
savemode: overwrite
# schema
job: 16290
13:40:55.515 INFO Initialized in 2927 milliseconds
13:40:55.524 INFO Check Input/Output Source
13:40:55.700 INFO Input is using LocalFileSystem
13:40:55.700 INFO Using jdbc output source
13:40:55.700 INFO Input source: userdata.parquet
13:40:55.701 INFO Output source: jdbc:postgresql://HOSTNAME:PORT/DB
13:40:55.701 INFO Input data format: parquet
13:40:55.701 INFO Output data format: jdbc
13:40:55.702 INFO Start Processing
13:40:58.664 INFO table userdata_parquet: overwriting
13:40:59.851 INFO Process finished
13:40:59.855 INFO Registering success of job 16290
13:41:00.225 INFO Finished successfully in 7634 milliseconds
# schema
job: 16290
# statistics
startup: 2927 ms
Process finished with exit code 0