Detect anti-patterns in your SQL

You can use the SDK functions to find bad practices in your SQL:

from flowhigh.model import AntiPattern
from flowhigh.utils import FlowHighSubmissionClass


def print_anti_pattern(anti_pattern: AntiPattern):
    """
    Utility to format and print the anti-pattern's properties
    :param anti_pattern:  the Anti-pattern to be printed out to the console
    :return:
    """
    print(f"Anti pattern: {anti_pattern.name}")
    severity = "Notice" if anti_pattern.severity == 3 else ("Caution" if anti_pattern.severity == 2 else "Warning")
    print(f"Severity: {severity}")
    print(f"Position: {anti_pattern.pos}")
    print(f"Link: {anti_pattern.link}")
    print(f"Performance: {anti_pattern.performance if anti_pattern.performance != ' ' else 'N'}")
    print(f"Readability: {anti_pattern.readability if anti_pattern.readability != ' ' else 'N'}")
    print(f"Correctness: {anti_pattern.correctness if anti_pattern.correctness != ' ' else 'N'}")


# The SQL query to be parsed
sql = """%sql%"""

# Initializing the SDK
fh = FlowHighSubmissionClass.from_sql(sql)

# Get all antipatterns and print them
for ap in fh.get_all_antipatterns():
    print_anti_pattern(ap)

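If you only need the most serious findings, you can filter on the severity attribute before printing. A minimal sketch, reusing the fh object from the snippet above and the same mapping (severity 1 corresponds to "Warning"):

# Keep only the most severe findings (severity 1 maps to "Warning" above)
warnings = [ap for ap in fh.get_all_antipatterns() if ap.severity == 1]
for ap in warnings:
    print_anti_pattern(ap)
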
The following script connects to Snowflake and scans the query history, skipping queries that have already been processed or logged as errors. Each query text is then parsed and checked for anti-patterns, with any processing errors caught along the way. Finally, the results are written back to the relevant tables.

import os
import requests
import pandas as pd
import time
import snowflake.connector
from json.decoder import JSONDecodeError
from snowflake.connector.pandas_tools import write_pandas
from flowhigh.utils.converter import FlowHighSubmissionClass

# setting connection parameters
con = snowflake.connector.connect(
    user=os.getenv('SNOWFLAKE_USER'),
    password=os.getenv('SNOWFLAKE_PASSWORD'),
    account=os.getenv('SNOWFLAKE_ACCOUNT'),
    database='DB_AP',
    schema='SCH_AP',
    warehouse='COMPUTE_WH'
)
# creating a cursor to execute queries
cursor = con.cursor()
# fetching queries from the query history that have not yet been processed or logged as errors
select_query = """
    SELECT qh.QUERY_ID, qh.QUERY_TEXT 
      FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh 
 LEFT JOIN DB_AP.SCH_AP.FH_RESPONSE fr 
        ON qh.QUERY_ID = fr.ID 
 LEFT JOIN DB_AP.SCH_AP.ERROR_LOG elt 
        ON qh.QUERY_ID = elt.ID 
     WHERE fr.ID IS NULL AND elt.ID IS NULL
"""
cursor.execute(select_query)
results = cursor.fetchall()
# Creating empty lists to hold the processed query, anti-pattern and error data
data = []
ap_data = []
error_log = []
# Looping through all the selected rows
for query in results:
    query_id, query_txt = query[0], query[1]
    # try block to get response from FH SDK
    try:
        fh = FlowHighSubmissionClass.from_sql(query_txt)
        fh_response = fh.json_message
        data.append({'ID': query_id, 'QUERY_TEXT': query_txt, 'JSON_FILE': fh_response, 'TIME_STAMP': time.time()})
        # collect anti-pattern related info (if any); each pos entry is an "offset-length" pair used to slice the flagged text out of the query
        ap_data.extend([{'ID': query_id,
                         'Anti_pattern': anti_pattern.name,
                         'Link': anti_pattern.link,
                         'Position': anti_pattern.pos,
                         'AP_Text': [query_txt[int(pos.split("-")[0]): int(pos.split("-")[0]) + int(pos.split("-")[1])] for pos in anti_pattern.pos],
                         'Severity': 'Notice' if anti_pattern.severity == 3 else 'Caution' if anti_pattern.severity == 2 else 'Warning',
                         'Performance': anti_pattern.performance if anti_pattern.performance != ' ' else 'N',
                         'Readability': anti_pattern.readability if anti_pattern.readability != ' ' else 'N',
                         'Correctness': anti_pattern.correctness if anti_pattern.correctness != ' ' else 'N'
                         } for anti_pattern in fh.get_all_antipatterns()])
    # handle HTTP and JSON errors raised while the query is being processed
    except (requests.exceptions.HTTPError, JSONDecodeError, requests.exceptions.RequestException) as err:
        # store the error message as a string so it can be written out with write_pandas
        error_entry = {'ID': query_id, 'QUERY_TEXT': query_txt, 'ERROR': str(err)}
        error_log.append(error_entry)
# Create the DataFrames
data_df = pd.DataFrame(data)
ap_df = pd.DataFrame(ap_data)
error_df = pd.DataFrame(error_log)
# skip the insertion when a DataFrame is empty
if not ap_df.empty:
    # loading the details of the detected anti-patterns
    write_pandas(con, ap_df, 'FH_ANTIPATTERNS', auto_create_table=True)
if not data_df.empty:
    # loading result to a temporary table
    write_pandas(con, data_df, 'TEMP_LOAD', overwrite=True)
    # query to insert the results into the main table, parsing the raw JSON
    insert_query = "insert into fh_response " \
                   "select id, query_text, parse_json(json_file), time_stamp from temp_load"
    cursor.execute(insert_query)
if not error_df.empty:
    # loading details of failed queries
    write_pandas(con, error_df, 'ERROR_LOG')
# closing the connection once all the results have been written
con.close()
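
The script above expects the FH_RESPONSE, TEMP_LOAD and ERROR_LOG tables to exist in DB_AP.SCH_AP (FH_ANTIPATTERNS is created on the fly through auto_create_table). A minimal one-off setup sketch that can be run beforehand; the column types are assumptions and may need adjusting to your environment:

import os
import snowflake.connector

# One-off setup, run before the main script: create the tables it reads from and writes to.
# Column names follow the DataFrames built above; the types are assumptions.
setup_con = snowflake.connector.connect(
    user=os.getenv('SNOWFLAKE_USER'),
    password=os.getenv('SNOWFLAKE_PASSWORD'),
    account=os.getenv('SNOWFLAKE_ACCOUNT'),
    database='DB_AP',
    schema='SCH_AP',
    warehouse='COMPUTE_WH'
)
setup_cursor = setup_con.cursor()
setup_cursor.execute(
    "CREATE TABLE IF NOT EXISTS FH_RESPONSE "
    "(ID VARCHAR, QUERY_TEXT VARCHAR, JSON_FILE VARIANT, TIME_STAMP FLOAT)")
setup_cursor.execute(
    "CREATE TABLE IF NOT EXISTS TEMP_LOAD "
    "(ID VARCHAR, QUERY_TEXT VARCHAR, JSON_FILE VARCHAR, TIME_STAMP FLOAT)")
setup_cursor.execute(
    "CREATE TABLE IF NOT EXISTS ERROR_LOG "
    "(ID VARCHAR, QUERY_TEXT VARCHAR, ERROR VARCHAR)")
setup_con.close()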