Merging and Converting Files

This section describes how Flexter addresses the Hadoop small files problem: its Merge2er tool merges many small output files into fewer, larger ones.

When Flexter runs periodically, loading new data and appending to the same output tables, every run creates new files. This is common behavior in Hadoop- and Spark-based systems. The purpose of the Merge2er tool is to solve this so-called small files problem by merging large numbers of small files into a small number of larger files. Users can also use the tool to convert output data from one format to another.
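
To judge whether a table would benefit from merging, it can help to count how many files fall below a size threshold. The sketch below uses only the standard find utility; the helper name count_small_files and the 1024 KiB default threshold are illustrative assumptions and are not part of Merge2er.

```shell
# Count regular files under a directory that are smaller than a threshold.
# Hypothetical helper: the name and the default threshold are assumptions.
count_small_files() {
    dir=$1
    threshold_kb=${2:-1024}
    # -size -Nk matches files whose size, rounded up to whole KiB, is below N.
    # Names starting with "." or "_" are skipped as temporary or marker files.
    find "$dir" -type f ! -name '.*' ! -name '_*' -size "-${threshold_kb}k" \
        | wc -l | tr -d ' '
}

# Usage: count_small_files /tmp/test 1024
```

A high count relative to the total data volume suggests that the table is a candidate for merging.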

Supported file systems

1. Local file system (file://)
2. Remote web file systems as input, e.g. http:// and ftp://
3. Hadoop HDFS supported file systems, e.g. hdfs://
4. Hadoop supported cloud storage, e.g. AWS S3, Azure Blob Storage and Data Lake, Google Cloud Storage and others.

Supported data formats

1. Parquet, orc, json, csv, tsv, psv, avro...
2. Other formats supported by Spark; the associated JAR files need to be included in the SPARK_HOME/jars folder

Common Parameters

Usage: merge2er [Options] [INPUT]

INPUT                           Files or directories input location path
 
Options:

-h, --help

--version


# schema
                                
-j, --job ID                    Job ID
                                

# spark-submit

--master MASTER_URL             spark://host:port, mesos://host:port, yarn, or local.
                                
--name NAME                     The name of your application.
                                
--jars JARS                     Comma-separated list of local jars to include on the driver

--files FILES                   Comma-separated list of local files to include on the driver

--conf PROP=VALUE               Arbitrary Spark configuration property.

-b, --blocksize BLOCKSIZE       Block Size, eg. 1024kb, 64mb, 128mb...
                                
-D, --warehouse-dir DIRECTORY   Spark SQL warehouse directory
                                
--metrics DURATION              Enabling metrics printing in defined interval: 1h, 1m, 1s...


# input data

--manifest PATH                 Path to manifest file which contains 3 possible CSV formats:
                                * 1 column => <FilePath>
                                * 2 columns => <Bucket>,<FilePath>
                                * 3 columns => <Bucket>,<FilePath>,<Version>
                                The <FilePath> and <Bucket> are related to the manifest path filesystem and location

-U, --in-user USER              Input user

-P, --in-password PASSWORD      Input password

--in-token TOKEN                Session token of input S3 bucket

-F, --in-format FORMAT          Input format.
                                - jdbc, parquet, orc, json, csv, tsv, psv, avro...

--in-schema SCHEMA              Input schema related to hive, jdbc or spark connector sources

--schema-pattern PATTERN        Input schema pattern related to hive, jdbc or spark connector sources: Ex "*word*"

--table-pattern PATTERN         Input table pattern related to hive, jdbc or spark connector sources: Ex "*word*"

--file-table, -q                Enables loading files as tables, instead of their directories as tables
-Q                              Disables loading files as tables.

-T, --table TABLE|QUERY         Name of the table (or specified SQL)

-I, --id-column <NAME|EXPR>[:DATATYPE] [EXTRA] [AS ALIAS]
                                Name of identifier column to contain document's ID values for tabular
                                files, jdbc or hive tables

-<MIN>, --id-min MIN            ID value specifying lower bound data range to be processed

+<MAX>, --id-max MAX            ID value specifying upper bound data range to be processed

--where WHERE                   Where clause for filtering tabular files, jdbc or hive tables

--limit LIMIT                   Limit clause for filtering the number of records/documents

--has-header                    If the input csv/psv/tsv file has a header
                                default: true

--in-opt PROP=VALUE,...         Extra options for Spark dataFrame reader, ex: prop1=value1,prop2=value2
	

# metadata

-m, --meta URL                  JDBC location of metadata store
                                default: jdbc:postgresql:x2er

-M, --meta-user USER            Metadata user
                                default: flex2er

-w, --meta-password PASSWORD    Metadata password

--store-credentials             Enable storing credentials at metadata store


# output data
 
-o, --out OUTPUT                Output location path

-u, --out-user USER             Output user

-p, --out-password PASSWORD     Output password

--out-token TOKEN               Session token of output S3 bucket

-B, --batchsize BATCHSIZE       Batch size to write into databases
                                default: 1000

-f, --out-format FORMAT         Output format:
                                - jdbc, parquet, orc, json, csv, tsv, psv, avro...

-z, --compression COMPRESSION   Compression mode for filesystem output
                                - none, snappy, gzip, bzip2, deflate, lzo, lz4, zlib, xz...
                                default: snappy

-S, --savemode SAVEMODE         Save Mode when target table, directory or file exists
                                ex: [e]rror, [a]ppend, [o]verwrite, [i]gnore, [p]rintschema, [w]riteschema
                                default: append

-Y, --out-part PARTITIONS       Number of partitions for writing data

-V, --hive-create               Enable creating hive tables

-E, --out-schema SCHEMA         Creating hive or jdbc tables into schema

-r, --out-prefix PREFIX         Add a prefix to each output table name

-y, --out-suffix SUFFIX         Append a suffix to each output table name

--constraints                   Enables copying the primary and foreign key constraints

-R, --extra-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
                                Extra column(s) to be included in all output tables

-i, --partition-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
                                Partition column(s) to be included in all output tables

--rename [IDX=|NAME=]NEW_NAME,* Rename a table or multiple tables by index (1,2,3...) or by its original name

--legacy-dialect-version VER    Changing the version of Flexter output dialect compatibility to keep older tables
                                compatible with newer Flexter version
                                default: 2

--namemode MODE                 Change the table/column original names to:
                                - "": Keep original names
                                - [l]ower: to lower case.
                                - [u]pper: to upper case.
                                - [c]amelcase: Remove separators and keep words separated by case.

--name-max-len SIZE             Maximum column name length
                                default: 30

--out-opt PROP=VALUE,...        Extra options for Spark dataFrame writer, ex: prop1=value1,prop2=value2


# actions

-c, --commands                  Show SQL commands

-s, --skip                      Skip writing results
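
The --manifest option listed above accepts a CSV file with one, two or three columns. As a rough sketch, the one-column form (<FilePath> per line) could be generated with standard tools. The helper name make_manifest is hypothetical, and writing paths relative to the directory that holds the manifest is an assumption based on the option's description; check it against your installation.

```shell
# Print a one-column manifest (<FilePath> per line) for all files under DIR
# whose names match PATTERN. Paths are printed relative to DIR (assumption).
make_manifest() {
    dir=$1
    pattern=${2:-*.csv}
    ( cd "$dir" && find . -type f -name "$pattern" | sed 's|^\./||' | sort )
}

# Usage: make_manifest /tmp/test '*.csv' > /tmp/test/files.manifest
```

The output is redirected to a file whose name does not match the pattern, so the manifest does not list itself. The resulting file can then be passed with --manifest.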

The input data source must conform to the following specifications:

  1. Each table must be contained in a folder. The folder name must not contain an equals sign or begin with an underscore or a dot.

  2. Users can process multiple tables in one go by pointing the input path to the parent folder of those tables. Folders and files (except temporary files whose names start with a dot or an underscore) must not appear at the same level.
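
The two rules above can be checked before a run with a small script. This is a minimal sketch that only encodes the rules as stated here; the helper names is_valid_table_dir and has_consistent_level are hypothetical, and the -mindepth/-maxdepth options assume GNU or BSD find.

```shell
# Rule 1: a table folder name must not contain "=" or start with "_" or ".".
is_valid_table_dir() {
    name=$(basename "$1")
    case $name in
        *=*|_*|.*) return 1 ;;
        *)         return 0 ;;
    esac
}

# Rule 2: a directory must not mix sub-folders with data files.
# Files whose names start with "." or "_" are ignored as temporary files.
has_consistent_level() {
    dirs=$(find "$1" -mindepth 1 -maxdepth 1 -type d | wc -l | tr -d ' ')
    files=$(find "$1" -mindepth 1 -maxdepth 1 -type f \
        ! -name '.*' ! -name '_*' | wc -l | tr -d ' ')
    [ "$dirs" -eq 0 ] || [ "$files" -eq 0 ]
}

# Usage:
#   is_valid_table_dir /tmp/test/items && echo "name ok"
#   has_consistent_level /tmp/test     && echo "layout ok"
```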

Basic command:

# Template
$ merge2er [-s] [-F <input_format>] [--has-header true] [-f <output_format>] [-Y <num of partitions>] [-z <compression>] [-o <output_path>] <input_path>

If the user does not specify the input format, merge2er tries to detect it from the file extension. If the user does not specify the output format, merge2er uses the input format as the output format.
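
The idea of extension-based detection can be illustrated as follows. This sketch is not Merge2er's actual detection logic, which is not described here; the helper name detect_format is hypothetical, and file names with an extra compression suffix are not handled.

```shell
# Map a file name to a data format by its extension (illustration only).
detect_format() {
    case $1 in
        *.parquet) echo parquet ;;
        *.orc)     echo orc ;;
        *.json)    echo json ;;
        *.csv)     echo csv ;;
        *.tsv)     echo tsv ;;
        *.psv)     echo psv ;;
        *.avro)    echo avro ;;
        *)         echo unknown; return 1 ;;
    esac
}

detect_format part-00001-5fb9f925-c673-4d3b-a7d1-90ff8de16405-c000.csv   # prints csv
```

When detection is not possible, for example because the files carry no extension, the format can be given explicitly with -F.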

If the user does not specify an output path, merge2er processes the data in ‘in-place’ mode: it generates the new table in a temporary folder and then replaces the original table.
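
The general write-then-swap pattern behind such an in-place replacement can be sketched in a few lines. This is a generic illustration only: how Merge2er performs the swap internally is not described here, and the helper name swap_table_dir and the backup naming are assumptions.

```shell
# Generic write-then-swap pattern (illustration only, not Merge2er's code).
# $1: existing table folder, $2: folder holding the fully written replacement.
swap_table_dir() {
    table_dir=$1
    new_dir=$2
    backup="${table_dir}.bak.$$"
    # Move the original aside first so it can be restored if the swap fails.
    mv "$table_dir" "$backup" || return 1
    if mv "$new_dir" "$table_dir"; then
        rm -rf "$backup"
    else
        mv "$backup" "$table_dir"
        return 1
    fi
}

# Usage: swap_table_dir /tmp/test/items /tmp/test/items.tmp
```

Because the original table is replaced, taking a copy of important data before an in-place run is a reasonable precaution.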

The user can specify a top level directory containing multiple table folders inside:

$ merge2er /topLevel/databaseFolder

but can also filter tables by path expression:

$ merge2er  /topLevel/databaseFolder/tab*

# Testing merging dataset at /tmp/test

$ merge2er /tmp/test -Y 2
 
INFO Check Input/Output Paths
INFO Input is using LocalFileSystem
INFO Output is using the same file system as Input
INFO The output directory is the same as the input directory, path: /tmp/test
INFO No data format specified. Trying to detect data format by filename extension
INFO Detect Input Data Format
INFO Found [csv] format file at file:/tmp/test/items/part-00001-5fb9f925-c673-4d3b-a7d1-90ff8de16405-c000.csv, using [csv] as input data format!
INFO Input data format: csv
INFO No output data format specified, use input data format as output data format: csv
INFO Start Processing
INFO Replacing table: items at /tmp/test/items
INFO Replacing table: fillings at /tmp/test/fillings
INFO Replacing table: size at /tmp/test/size
INFO Replacing table: topping at /tmp/test/two/topping
INFO Replacing table: batters at /tmp/test/two/batters
INFO Process finished
INFO Registering success of job 9315

Convert Data Format

Merge2er can also be used to convert the data format simply by specifying a different output data format. In this case, we recommend that the user specify an output path that is different from the input path.

# Convert data format
$ merge2er /tmp/test -o /tmp/output -F csv -f json [-Y 2]

Import tables from one database to another

When targeting a hive database or a jdbc connection, Merge2er can load an entire catalog, multiple schemas, multiple tables or even everything from the connection (if the user does not specify any table):

$ merge2er --in-schema=<> --schema-pattern=<> --table-pattern=<>  [jdbc:...]

For instance, you can export a table from a source database directly to another one by specifying the connection details for both (URL, user and password) and listing the tables to be copied with the -T/--table parameter.

The example below copies a table from a source Postgres database to a target Snowflake database:

$ merge2er -T items jdbc:postgresql://HOSTNAME:PORT/DB -U inuser -P inpass -o "jdbc:snowflake://LOCATION.snowflakecomputing.com/?warehouse=WAREHOUSE&db=DB&schema=SCHEMA" -u outuser -p outpass

13:14:12.191 INFO  Initialized in 3142 milliseconds
13:14:12.198 INFO  Check Input/Output Source
13:14:12.198 INFO  Using jdbc input source
13:14:12.198 INFO  Using jdbc output source
13:14:12.198 INFO  Input source: jdbc:postgresql://HOSTNAME:PORT/DB
13:14:12.198 INFO  Output source: jdbc:snowflake://LOCATION.snowflakecomputing.com/?warehouse=WAREHOUSE&db=DB&schema=SCHEMA
13:14:12.198 INFO  Input data format: jdbc
13:14:12.199 INFO  Output data format: jdbc
13:14:12.199 INFO  Start Processing
13:14:14.257 INFO  table items: overwriting
13:14:18.954 INFO  Process finished
13:14:18.957 INFO  Registering success of job 16270
13:14:19.281 INFO  Finished successfully in 10229 milliseconds

# schema
      job:  16270

# statistics
        startup:  3142 ms

Process finished with exit code 0

Checking the result:

OUTUSER#[email protected]> show tables;
+------------+---------+---------------+-------------+-------+---------+------------+------+-------+----------+----------------+
| created_on | name    | database_name | schema_name | kind  | comment | cluster_by | rows | bytes | owner    | retention_time |
|------------+---------+---------------+-------------+-------+---------+------------+------+-------+----------+----------------|
| 2018-11-07 | CITY    | TESTING       | PUBLIC      | TABLE |         |            |    7 |  1024 | SYSADMIN | 1              |
| 2018-11-07 | COUNTRY | TESTING       | PUBLIC      | TABLE |         |            |    4 |  1024 | SYSADMIN | 1              |
| 2019-07-22 | ITEMS   | TESTING       | PUBLIC      | TABLE |         |            |    1 |  1024 | SYSADMIN | 1              |
+------------+---------+---------------+-------------+-------+---------+------------+------+-------+----------+----------------+

Note that the above command also works with SQL queries (including filtering and joining against other tables or views). For example, you might run:

$ merge2er -T "(SELECT * FROM items) items_select" jdbc:postgresql://HOSTNAME:PORT/DB -U inuser -P inpass -o jdbc:postgresql://HOSTNAME:PORT/DB -u outuser -p outpass

Loading large datasets can slow the process down.

In the section “JDBC as a Target Connection”, we outlined the possible benefits of using the -B/--batchsize parameter.

In such situations, increasing the batch size (1,000 by default) can speed up the process. The batch size is the maximum number of write operations executed in a single batch, so a larger value reduces the number of packets sent across the connection.

In this example we copied a Snowflake table (7.3 million rows, 130.6 MB in total) to a local Postgres database.

$ merge2er -T S_BIKERIDE "jdbc:snowflake://LOCATION.snowflakecomputing.com/?warehouse=WAREHOUSE&db=DB&schema=SCHEMA" -U inuser -P inpass -o jdbc:postgresql://HOSTNAME:PORT/DB -u outuser -p outpass

The process took 213 seconds to complete:

14:22:35.567 INFO  Finished successfully in 213020 milliseconds

We then increased the batch size:

$ merge2er -T S_BIKERIDE -B 100000 "jdbc:snowflake://LOCATION.snowflakecomputing.com/?warehouse=WAREHOUSE&db=DB&schema=SCHEMA" -U inuser -P inpass -o jdbc:postgresql://HOSTNAME:PORT/DB -u outuser -p outpass

In this case the run took 179 seconds:

14:22:38.128  INFO  Finished successfully in 179078 milliseconds
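
The effect of the batch size on this run can be worked through with plain shell arithmetic. The sketch below takes the row count as roughly 7,300,000 (the rounded figure quoted above) and uses the two run times from the logs; the helper names batches and speedup_pct are hypothetical.

```shell
# Number of batches needed to write ROWS rows with a given batch size
# (ceiling division).
batches() {
    rows=$1
    batchsize=$2
    echo $(( (rows + batchsize - 1) / batchsize ))
}

# Percentage reduction in run time, rounded down to a whole percent.
speedup_pct() {
    before_ms=$1
    after_ms=$2
    echo $(( (before_ms - after_ms) * 100 / before_ms ))
}

batches 7300000 1000        # prints 7300
batches 7300000 100000      # prints 73
speedup_pct 213020 179078   # prints 15
```

In this run, a hundredfold reduction in the number of batches shortened the run time by about 16%, so the gain from larger batches depends on how much of the total time is spent on the write side.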

Load text files to database

Merge2er can also be used to import tsv, csv or psv files into a target database. For instance:

$ merge2er --has-header false sample.csv -o jdbc:postgresql://HOSTNAME:PORT/DB -u outuser -p outpass

# input
      path:       sample.csv
      format:     csv
      has-header: false

# output
      path:       jdbc:postgresql://HOSTNAME:PORT/DB
      user:       outuser
      password:   ***
      format:     jdbc
      savemode:   overwrite

# schema
      job:  16037

16:38:48.623 INFO Initialized in 3431 milliseconds
16:38:48.629 INFO Check Input/Output Source
16:38:48.934 INFO Input is using LocalFileSystem
16:38:48.934 INFO Using jdbc output source
16:38:48.934 INFO Input source: sample.csv
16:38:48.935 INFO Output source: jdbc:postgresql://HOSTNAME:PORT/DB
16:38:48.935 INFO Input data format: csv
16:38:48.936 INFO Output data format: jdbc
16:38:48.938 INFO Start Processing
16:38:53.190 INFO table sample_csv: overwriting
16:38:53.725 INFO Process finished
16:38:53.728 INFO Registering success of job 16037
16:38:54.055 INFO Finished successfully in 8860 milliseconds

# schema
      job:  16037

# statistics
        startup:  3431 ms

Process finished with exit code 0

Checking the output:

~$ psql -U outuser -d DB
Password for user outuser:
psql (10.9 (Ubuntu 10.9-0ubuntu0.18.04.1))
Type "help" for help.

DB=# \dt
        List of relations
 Schema |    Name    | Type  |  Owner
--------+------------+-------+---------
 public | sample_csv | table | outuser

Merge2er can also process parquet, avro and orc file formats. The output below comes from loading a Parquet file, userdata.parquet, into a Postgres database:

# input
      path:      userdata.parquet
      format:    parquet

# output
      path:      jdbc:postgresql://HOSTNAME:PORT/DB
      user:      outuser
      password:  ***
      format:    jdbc
      savemode:  overwrite

# schema
       job:  16290

13:40:55.515 INFO  Initialized in 2927 milliseconds
13:40:55.524 INFO  Check Input/Output Source
13:40:55.700 INFO  Input is using LocalFileSystem
13:40:55.700 INFO  Using jdbc output source
13:40:55.700 INFO  Input source: userdata.parquet
13:40:55.701 INFO  Output source: jdbc:postgresql://HOSTNAME:PORT/DB
13:40:55.701 INFO  Input data format: parquet
13:40:55.701 INFO  Output data format: jdbc
13:40:55.702 INFO  Start Processing
13:40:58.664 INFO  table userdata_parquet: overwriting
13:40:59.851 INFO  Process finished
13:40:59.855 INFO  Registering success of job 16290
13:41:00.225 INFO  Finished successfully in 7634 milliseconds

# schema
      job:  16290

# statistics
        startup:  2927 ms

Process finished with exit code 0