Merging and Converting Files

This section describes how Flexter addresses Hadoop's small files problem by offering merging features through the merge2er tool.

When Flexter runs periodically, loading new data and appending to the same output tables, it keeps creating new files. This is common behavior in Hadoop and Spark based systems. The purpose of the Merge2er tool is to solve the so-called small files problem by merging large numbers of small files into a small number of larger files. The tool can also be used to convert output data from one format to another.

Supported file systems

1. Local file system (file://)
2. Remote web file systems as input, e.g. http:// and ftp://
3. Hadoop HDFS-supported file systems, e.g. hdfs://
4. Hadoop-supported cloud storage, e.g. AWS S3, Azure Blob Storage and Data Lake, Google Cloud Storage, and others.

Supported data formats

1. Parquet, orc, json, csv, tsv, psv, avro...
2. Other formats supported by Spark; the associated JAR files need to be placed in the SPARK_HOME/jars folder

Common Parameters

Usage: merge2er [Options] [INPUT]
 
  Options:
  --in-schema SCHEMA          Input schema related to hive or jdbc source
  
  --schema-pattern PATTERN    Input schema pattern related to hive or jdbc source: Ex "*word*"

  --table-pattern PATTERN     Input table pattern related to hive or jdbc source: Ex "*word*"
  
  -T, --table TABLE|QUERY     Name of the table (or a SQL query)
 
  --where WHERE               Where clause for filtering tabular files, jdbc or hive tables

  -U, --in-user USER          Input user
                                  
  -P, --in-password PASSWORD  Input password 

  -F, --in-format FORMAT      Input format.
                              - jdbc, parquet, orc, json, csv, tsv, psv, avro...
	
  --has-header                If the input csv/psv/tsv file has a header
                              default: true
	
  # output data
 
  -o, --out OUTPUT             Output location path

  -u, --out-user USER          Output user
                                  
  -p, --out-password PASSWORD  Output password
                              	
  -f, --out-format FORMAT      Output format:
                               - jdbc, parquet, orc, json, csv, tsv, psv, avro...

  -z, --compression COMPRESSION  Compression mode for filesystem output
                                - none, snappy, gzip, bzip2, deflate, lzo, lz4, zlib, xz...
                                default: snappy

  -V, --hive-create            Enable creating hive tables
                                  
  -E, --out-schema SCHEMA      Create hive or jdbc tables in the given schema

  -r, --out-prefix PREFIX      Prepend a prefix to each output table name

  -y, --out-suffix SUFFIX      Append a suffix to each output table name


  --rename [IDX=|NAME=]NEW_NAME,* Rename one or more tables by index (1,2,3...) or by original name

  --constraints                Enables copying the primary and foreign key constraints

  -R, --extra-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
                               Extra column(s) to be included in all output tables

  -i, --partition-column <NAME|EXPR>[:DATATYPE] [AUTO|RANDOM|NOW|ID|FILENAME|FILEPATH|=VALUE] [AS ALIAS]
                               Partition column(s) to be included in all output tables

                              
  -S, --savemode SAVEMODE      Save Mode when target table, directory or file exists
                               ex: [e]rror, [a]ppend, [o]verwrite, [i]gnore, [p]rintschema, [w]riteschema
                               default: append
 
  -Y, --out-part PARTITIONS   Number of partitions for writing data

  -B, --batchsize BATCHSIZE   Batch size to write into databases
                              default: 1000
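
As an illustration, several of these options can be combined. The following is a sketch (paths, names and option values are placeholders; check the exact value syntax against the descriptions above) that merges a dataset into gzip-compressed parquet, prefixes the output table names and renames one table:

$ merge2er /tmp/test -o /tmp/merged -f parquet -z gzip -r stg_ --rename items=menu_items -S overwrite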

The input data source must conform to the following specifications:

  1. Each table must be contained in its own folder. The folder name must not contain an equals sign or begin with an underscore or a dot.

  2. Users can process multiple tables in one go by pointing the input path to the parent folder of those tables. Folders and files (except temporary files whose names start with a dot or an underscore) are not allowed to appear at the same level; see the example layout below.
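
A hypothetical layout satisfying both rules (paths and file names are placeholders): each table has its own folder, and data files appear only inside those folders:

/topLevel/databaseFolder/items/part-00000.csv
/topLevel/databaseFolder/items/part-00001.csv
/topLevel/databaseFolder/fillings/part-00000.csv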

Basic command:

# Template
$ merge2er [-s] [-F <input_format>] [--has-header true] [-f <output_format>] [-Y <num of partitions>] [-z <compression>] [-o <output_path>] <input_path>

If the user does not specify the format of the input data, merge2er will try to detect it by inspecting the file's extension. If the user does not specify the format of the output data, merge2er will use the input data format as the output data format.

If the user does not specify an output path, merge2er will process data in 'in-place' mode: it will generate the new table in a temporary folder and then replace the original table.

The user can specify a top-level directory containing multiple table folders:

$ merge2er /topLevel/databaseFolder

but can also filter tables by path expression:

$ merge2er  /topLevel/databaseFolder/tab*

# Testing merging dataset at /tmp/test

$ merge2er /tmp/test -Y 2
 
INFO Check Input/Output Paths
INFO Input is using LocalFileSystem
INFO Output is using the same file system as Input
INFO The output directory is the same as the input directory, path: /tmp/test
INFO No data format specified. Trying to detect data format by filename extension
INFO Detect Input Data Format
INFO Found [csv] format file at file:/tmp/test/items/part-00001-5fb9f925-c673-4d3b-a7d1-90ff8de16405-c000.csv, using [csv] as input data format!
INFO Input data format: csv
INFO No output data format specified, use input data format as output data format: csv
INFO Start Processing
INFO Replacing table: items at /tmp/test/items
INFO Replacing table: fillings at /tmp/test/fillings
INFO Replacing table: size at /tmp/test/size
INFO Replacing table: topping at /tmp/test/two/topping
INFO Replacing table: batters at /tmp/test/two/batters
INFO Process finished
INFO Registering success of job 9315

Convert Data Format

Merge2er can also be used to convert the data format simply by specifying a different output data format. In this case, we recommend that the user specify an output path that is different from the input path.

# Convert data format
$ merge2er /tmp/test -o /tmp/output -F csv -f json [-Y 2]

Import tables from one database to another

When the source is a hive database or a jdbc connection, Merge2er can load an entire catalog, multiple schemas, multiple tables, or even everything from the connection (when the user does not specify any table):

$ merge2er --in-schema=<> --schema-pattern=<> --table-pattern=<>  [jdbc:...]
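
For example, a sketch that copies all tables matching a name pattern from one schema of a Postgres source into parquet files (the schema name, pattern and output path are placeholders):

$ merge2er --in-schema=public --table-pattern="item*" -U inuser -P inpass -o /tmp/output -f parquet jdbc:postgresql://HOSTNAME:PORT/DB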

For instance, you can export a table from a source database directly to another one by simply specifying the connection details (URL, user and password) for both and listing the tables to be copied with the -T/--table parameter.

In the example below, we copy a table from a source Postgres database to a target Snowflake database:

$ merge2er -T items jdbc:postgresql://HOSTNAME:PORT/DB -U inuser -P inpass -o "jdbc:snowflake://LOCATION.snowflakecomputing.com/?warehouse=WAREHOUSE&db=DB&schema=SCHEMA" -u outuser -p outpass

13:14:12.191 INFO  Initialized in 3142 milliseconds
13:14:12.198 INFO  Check Input/Output Source
13:14:12.198 INFO  Using jdbc input source
13:14:12.198 INFO  Using jdbc output source
13:14:12.198 INFO  Input source: jdbc:postgresql://HOSTNAME:PORT/DB
13:14:12.198 INFO  Output source: jdbc:snowflake://LOCATION.snowflakecomputing.com/?warehouse=WAREHOUSE&db=DB&schema=SCHEMA
13:14:12.198 INFO  Input data format: jdbc
13:14:12.199 INFO  Output data format: jdbc
13:14:12.199 INFO  Start Processing
13:14:14.257 INFO  table items: overwriting
13:14:18.954 INFO  Process finished
13:14:18.957 INFO  Registering success of job 16270
13:14:19.281 INFO  Finished successfully in 10229 milliseconds

# schema
      job:  16270

# statistics
        startup:  3142 ms

Process finished with exit code 0

Checking the result:

OUTUSER#WAREHOUSE@DB.SCHEMA> show tables;
+------------+---------+---------------+-------------+-------+---------+------------+------+-------+----------+----------------+
| created_on | name    | database_name | schema_name | kind  | comment | cluster_by | rows | bytes | owner    | retention_time |
|------------+---------+---------------+-------------+-------+---------+------------+------+-------+----------+----------------|
| 2018-11-07 | CITY    | TESTING       | PUBLIC      | TABLE |         |            |    7 |  1024 | SYSADMIN | 1              |
| 2018-11-07 | COUNTRY | TESTING       | PUBLIC      | TABLE |         |            |    4 |  1024 | SYSADMIN | 1              |
| 2019-07-22 | ITEMS   | TESTING       | PUBLIC      | TABLE |         |            |    1 |  1024 | SYSADMIN | 1              |
+------------+---------+---------------+-------------+-------+---------+------------+------+-------+----------+----------------+

Please note that the above command works even with SQL queries (including filtering and joining against other tables or views). For example, you might try to run:

$ merge2er -T "(SELECT * FROM items) items_select" jdbc:postgresql://HOSTNAME:PORT/DB -U inuser -P inpass -o jdbc:postgresql://HOSTNAME:PORT/DB -u outuser -p outpass 

Loading large datasets can slow the process down.

In the section "JDBC as a Target Connection", we outlined the possible benefits of using the -B/--batchsize parameter.

In such situations, increasing the batch size (1000 by default) is a valid option to speed up the process. The batch size defines the maximum number of write operations executed in a single batch, so a larger batch reduces the number of packets sent across the connection.

In this example we copied a Snowflake table (7.3M rows, 130.6 MB in total) to a local Postgres database.

$ merge2er -T S_BIKERIDE "jdbc:snowflake://LOCATION.snowflakecomputing.com/?warehouse=WAREHOUSE&db=DB&schema=SCHEMA" -U inuser -P inpass -o jdbc:postgresql://HOSTNAME:PORT/DB -u outuser -p outpass

The process took 213 seconds to complete:

14:22:35.567 INFO  Finished successfully in 213020 milliseconds

We then increased the batch size:

$ merge2er -T S_BIKERIDE -B 100000 "jdbc:snowflake://LOCATION.snowflakecomputing.com/?warehouse=WAREHOUSE&db=DB&schema=SCHEMA" -U inuser -P inpass -o jdbc:postgresql://HOSTNAME:PORT/DB -u outuser -p outpass

In this case the run took 179 seconds:

14:22:38.128  INFO  Finished successfully in 179078 milliseconds

Load text files into a database

Merge2er can also be used to import tsv, csv or psv files into a target database. For instance:

$ merge2er --has-header false sample.csv  -o jdbc:postgresql://HOSTNAME:PORT/DB -u outuser -p outpass

# input
      path:       sample.csv
      format:     csv
      has-header: false

# output
      path:       jdbc:postgresql://HOSTNAME:PORT/DB
      user:       outuser
      password:   ***
      format:     jdbc
      savemode:   overwrite

# schema
      job:  16037

16:38:48.623 INFO Initialized in 3431 milliseconds
16:38:48.629 INFO Check Input/Output Source
16:38:48.934 INFO Input is using LocalFileSystem
16:38:48.934 INFO Using jdbc output source
16:38:48.934 INFO Input source: sample.csv
16:38:48.935 INFO Output source: jdbc:postgresql://HOSTNAME:PORT/DB
16:38:48.935 INFO Input data format: csv
16:38:48.936 INFO Output data format: jdbc
16:38:48.938 INFO Start Processing
16:38:53.190 INFO table sample_csv: overwriting
16:38:53.725 INFO Process finished
16:38:53.728 INFO Registering success of job 16037
16:38:54.055 INFO Finished successfully in 8860 milliseconds

# schema
      job:  16037

# statistics
        startup:  3431 ms

Process finished with exit code 0

Checking the output:

~$ psql -U outuser -d DB
Password for user outuser:
psql (10.9 (Ubuntu 10.9-0ubuntu0.18.04.1))
Type "help" for help.

DB=# \dt
        List of relations
 Schema |    Name    | Type  |  Owner
--------+------------+-------+---------
 public | sample_csv | table | outuser

Merge2er can also process parquet, avro and orc file formats.
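
The command producing the report below might look roughly like this (the file name and connection details are placeholders):

$ merge2er userdata.parquet -o jdbc:postgresql://HOSTNAME:PORT/DB -u outuser -p outpass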

# input
      path:      userdata.parquet
      format:    parquet

# output
      path:      jdbc:postgresql://HOSTNAME:PORT/DB
      user:      outuser
      password:  ***
      format:    jdbc
      savemode:  overwrite

# schema
       job:  16290

13:40:55.515 INFO  Initialized in 2927 milliseconds
13:40:55.524 INFO  Check Input/Output Source
13:40:55.700 INFO  Input is using LocalFileSystem
13:40:55.700 INFO  Using jdbc output source
13:40:55.700 INFO  Input source: userdata.parquet
13:40:55.701 INFO  Output source: jdbc:postgresql://HOSTNAME:PORT/DB
13:40:55.701 INFO  Input data format: parquet
13:40:55.701 INFO  Output data format: jdbc
13:40:55.702 INFO  Start Processing
13:40:58.664 INFO  table userdata_parquet: overwriting
13:40:59.851 INFO  Process finished
13:40:59.855 INFO  Registering success of job 16290
13:41:00.225 INFO  Finished successfully in 7634 milliseconds

# schema
      job:  16290

# statistics
        startup:  2927 ms

Process finished with exit code 0