Generate DDL from SQL

Generate the CREATE statements for every schema and table referenced in your SQL query:

from flowhigh.model import *
from flowhigh.utils.converter import FlowHighSubmissionClass

# Wrapper placed on both sides of identifiers in the generated DDL
# (e.g. '"' to produce quoted identifiers); empty string disables quoting.
GLOBAL_WRAP = ""


def transform(_input):
    """
    Map the collected objects to produce the full DDL script.

    :param _input: dict with optional keys "schemas" (list of qualified
                   schema names) and "tables" (dict mapping a qualified
                   table name to its table-info dict)
    :return: the concatenated DDL commands as a single string
    """
    parts = []
    # Emit schemas first so the subsequent CREATE TABLE statements can
    # safely reference them.
    for schema in _input.get("schemas", []):
        parts.append(transform_single_schema(schema))
    for table in _input.get("tables", {}).values():
        parts.append(transform_single_table(table))
    # Single join instead of repeated string concatenation.
    return "".join(parts)


def transform_single_schema(_input):
    """
    Build the CREATE SCHEMA statement for one schema.

    :param _input: the (optionally db-qualified) schema name
    :return: the CREATE SCHEMA statement, newline-terminated
    """
    return "CREATE SCHEMA IF NOT EXISTS {};\n".format(_input)


def transform_single_table(_input):
    """
    Build the CREATE TABLE statement for one table.

    :param _input: dict with "id" (qualified table name) and "columns"
                   (list of column dicts, each with "id" and "type")
    :return: the CREATE TABLE statement, newline-terminated
    """
    if _input['columns']:
        cols = ", ".join(transform_single_column(c) for c in _input['columns'])
    else:
        # No columns were detected in the SQL: emit a placeholder column so
        # the CREATE TABLE statement stays syntactically valid.
        cols = "_DUMMY_COL_ int"
    return "CREATE TABLE IF NOT EXISTS {wrap}{id}{wrap} ( {columns} );\n".format(
        id=_input['id'],
        columns=cols,
        wrap=GLOBAL_WRAP,
    )


def transform_single_column(_input):
    """
    Render a single column definition ("name type") for the CREATE TABLE body.

    :param _input: dict with "id" (column name) and "type" (SQL datatype)
    :return: the wrapped column name followed by its datatype
    """
    # Bug fix: the previous version injected a 'wrap' key into the caller's
    # dict via format_map, mutating shared input data as a side effect.
    # Format from explicit values instead, leaving _input untouched.
    return "{wrap}{id}{wrap} {type}".format(
        wrap=GLOBAL_WRAP, id=_input['id'], type=_input['type'])


def main_function(node: DBO, curr_db: str = None, curr_schema: str = None):
    """
    Recursively walk a DBO hierarchy, accumulating the schemas and tables
    (with their columns) into the module-level ``ddl_input`` dict.

    :param node: the current DBO to visit
    :param curr_db: name of the current DBO's parent db, if any
    :param curr_schema: name of the current DBO's parent schema, if any
    :return: None — results are collected in ``ddl_input``
    """
    if not node:
        return

    if node.type_ == "TABLE":
        # Qualify the table name with whatever db/schema context is known.
        qualified = ".".join(filter(None, (curr_db, curr_schema, node.name)))
        columns = [{"id": child.name, "type": "varchar(255)"} for child in node.dbo]
        entry = ddl_input['tables'].get(qualified)
        if entry is None:
            # first sighting: register the table together with its columns
            ddl_input['tables'][qualified] = {"id": qualified, "columns": columns}
        else:
            # already known: merge the newly found columns into the entry
            entry['columns'].extend(columns)
        return

    if node.type_ == "SCHEMA":
        curr_schema = node.name
        # record the (optionally db-qualified) schema name for CREATE SCHEMA
        ddl_input['schemas'].append(".".join(filter(None, (curr_db, curr_schema))))
    else:
        # any other node type is treated as a database
        curr_db = node.name

    for child in node.dbo:
        main_function(child, curr_db, curr_schema)


# Accumulator shared with main_function: qualified table name -> table info,
# plus the list of qualified schema names encountered.
ddl_input = {"tables": {}, "schemas": []}

# The SQL query to be parsed (%sql% is substituted by the templating layer)
sql = """%sql%"""

# Initializing the SDK: parse the SQL and build the submission object
fh = FlowHighSubmissionClass.from_sql(sql)

# loop over the list of extracted Database Objects, filling ddl_input
for dbo in fh.get_DBO_hierarchy():
    main_function(dbo)

# Render and print the collected objects as DDL statements
print(transform(ddl_input))