# Get the DDL of all schemas and tables used in your SQL:
from flowhigh.model import *
from flowhigh.utils.converter import FlowHighSubmissionClass
# wrapper for identifiers
GLOBAL_WRAP = ""
def transform(_input):
    """
    Map the collected objects to produce the list of DDL commands
    :param _input: the schema/table dictionary
    :return: The list of DDL commands
    """
    # Emit schemas first, then tables, and concatenate in one pass.
    statements = []
    if "schemas" in _input:
        statements.extend(transform_single_schema(schema) for schema in _input["schemas"])
    if "tables" in _input:
        statements.extend(transform_single_table(table) for table in _input["tables"].values())
    return "".join(statements)
def transform_single_schema(_input, wrap=None):
    """
    Produce the CREATE SCHEMA command

    :param _input: the (possibly db-qualified) schema name
    :param wrap: optional identifier quote character; defaults to the
                 module-level GLOBAL_WRAP so the schema identifier is
                 wrapped consistently with table identifiers
    :return: the CREATE SCHEMA statement
    """
    if wrap is None:
        wrap = GLOBAL_WRAP
    return f"CREATE SCHEMA IF NOT EXISTS {wrap}{_input}{wrap};\n"
def transform_single_table(_input, wrap=None):
    """
    Produce the CREATE TABLE command

    :param _input: dict with "id" (qualified table name) and "columns"
                   (list of column dicts, each with "id" and "type")
    :param wrap: optional identifier quote character; defaults to the
                 module-level GLOBAL_WRAP
    :return: the CREATE TABLE statement
    """
    if wrap is None:
        wrap = GLOBAL_WRAP
    if _input['columns']:
        cols = ", ".join(transform_single_column(col) for col in _input['columns'])
    else:
        # A table with no discovered columns still needs one column
        # for the CREATE TABLE statement to be valid DDL.
        cols = "_DUMMY_COL_ int"
    return f"CREATE TABLE IF NOT EXISTS {wrap}{_input['id']}{wrap} ( {cols} );\n"
def transform_single_column(_input, wrap=None):
    """
    Return the Column name + datatype to be replaced in the main DDL template

    :param _input: dict with "id" (column name) and "type" (SQL datatype)
    :param wrap: optional identifier quote character; defaults to the
                 module-level GLOBAL_WRAP
    :return: Column name + datatype

    Note: the original implementation wrote a "wrap" key into the caller's
    dict as a side effect; this version leaves the input dict untouched.
    """
    if wrap is None:
        wrap = GLOBAL_WRAP
    return f"{wrap}{_input['id']}{wrap} {_input['type']}"
def main_function(node: DBO, curr_db: str = None, curr_schema: str = None):
    """
    Main function to extract schemas/tables and columns into ddl_input.

    :param node: the current DBO to visit
    :param curr_db: name of the current DBO's parent db
    :param curr_schema: name of the current DBO's parent schema
    :return: None
    """
    if not node:
        return
    # Leaf case: a table contributes an entry with its column list.
    if node.type_ == "TABLE":
        full_name = ".".join(filter(None, (curr_db, curr_schema, node.name)))
        # every child DBO of a table is treated as a column (type defaulted)
        columns = [{"id": child.name, "type": "varchar(255)"} for child in node.dbo]
        entry = ddl_input['tables'].get(full_name)
        if entry is None:
            # first time we see this table: store it with its columns
            ddl_input['tables'][full_name] = {"id": full_name, "columns": columns}
        else:
            # table already seen: merge the newly discovered columns
            entry['columns'].extend(columns)
        return
    if node.type_ == "SCHEMA":
        # record the (db-qualified) schema name for subsequent reuse
        curr_schema = node.name
        ddl_input['schemas'].append(".".join(filter(None, (curr_db, curr_schema))))
    else:
        # otherwise the node is a DB: remember it for qualification below
        curr_db = node.name
    # Recurse into children carrying the db/schema context downward.
    for child in node.dbo:
        main_function(child, curr_db, curr_schema)
# Accumulator shared with main_function: qualified table entries and schema names.
ddl_input = {"tables": {}, "schemas": []}
# The SQL query to be parsed (placeholder substituted before execution)
sql = """%sql%"""
# Initializing the SDK
fh = FlowHighSubmissionClass.from_sql(sql)
# Visit each root Database Object extracted from the parsed SQL
for root_dbo in fh.get_DBO_hierarchy():
    main_function(root_dbo)
# Render and print the collected DDL statements
print(transform(ddl_input))