
AI + Big Data - How to Create an AI Agent that won't Steal Your Job (Probably)

Welcome to the world of AI and Big Data, where humans and machines work together to unravel the mysteries hidden within vast amounts of information. But don’t worry, we’re not here to replace you with an all-knowing AI agent (at least, not yet).

In this blog post I’ll show you how to create a powerful data analysis AI agent using PySpark and LangChain. With PySpark, a Python-based cluster computing framework, you can easily build a scalable data processing and analysis pipeline that handles large datasets. I’ll guide you through the process of creating a LangChain agent that, powered by a large language model (we will use an OpenAI LLM), can analyze big data with PySpark based on the user’s input. Let’s get started!

What the hell is LangChain?

LangChain is a framework that facilitates the development of applications powered by language models. It focuses on enabling language models to interact with data sources and their environment. The framework consists of various modules that serve as building blocks for language-model-powered applications. These modules include Models, Prompts (for prompt management and optimization), Memory (to persist state between calls), Indexes (for working with external data), Chains (structured sequences of calls), Agents (chains with high-level directives), Callbacks (for logging and streaming intermediate steps), and more. The framework also integrates with various language models, systems, and products.

LangChain Agent

LangChain offers the possibility to create “Action Agents”: agents that decide which action to take and choose the next action one step at a time. Here is how it works:

  • Some user input is received
  • The agent decides which tool to use, and what the input to that tool should be
  • That tool is then called with that tool input, and an observation is recorded
  • That history of tool, tool input, and observation is passed back into the agent, and it decides what step to take next
  • Repeat until the agent decides it no longer needs to use a tool, and then it responds directly to the user
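The loop above can be sketched in a few lines of plain Python. This is an illustrative toy, not LangChain internals: the `decide` policy and the `lookup` tool are hypothetical stand-ins for the LLM and a real tool.

```python
# A minimal, illustrative sketch of the action-agent loop above.
# The decide() policy and the "lookup" tool are hypothetical,
# not LangChain internals.

def run_agent(user_input, tools, decide, max_steps=5):
    """Loop: decide -> call tool -> record observation -> repeat."""
    history = []  # list of (tool_name, tool_input, observation)
    for _ in range(max_steps):
        step = decide(user_input, history)
        if step["tool"] is None:  # agent decides it is done
            return step["answer"]
        observation = tools[step["tool"]](step["tool_input"])
        history.append((step["tool"], step["tool_input"], observation))
    return "Stopped: too many steps"

# Toy tool and policy: look the word up once, then answer directly.
tools = {"lookup": lambda word: f"definition of {word}"}

def decide(user_input, history):
    if not history:
        return {"tool": "lookup", "tool_input": user_input, "answer": None}
    return {"tool": None, "answer": history[-1][2]}

print(run_agent("spark", tools, decide))  # -> "definition of spark"
```

In the real framework, `decide` is the LLM reasoning over the prompt, and the history of tool calls and observations is what gets fed back on each step.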

Tools

Tools are the actions an agent can take. Which tools you give an agent depends heavily on what you want the agent to do. A tool is composed of:

  • Name: the name of the tool
  • Run Function: the function that executes the tool
  • Description: a description of the tool, which helps the agent decide when and how to use it
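These three components can be pictured as a simple record: a named, described function. The dataclass below is purely illustrative (LangChain ships its own `Tool` class with the same three fields); the `word_count` tool is a made-up example.

```python
from dataclasses import dataclass
from typing import Callable

# Illustrative only: a tool is just a named, described function.
# (LangChain provides its own Tool class with these same fields.)

@dataclass
class Tool:
    name: str
    func: Callable[[str], str]   # the run function
    description: str             # helps the agent pick this tool

word_count = Tool(
    name="word_count",
    func=lambda text: str(len(text.split())),
    description="Counts the words in the input text.",
)

print(word_count.func("how many words is this"))  # -> "5"
```

The description matters more than it looks: with a zero-shot agent it is the only signal the LLM has for choosing between tools.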

Agent

The agent is where the logic of the application lives. Agents expose an interface that takes in user input along with a list of previous steps the agent has taken, and returns either an AgentAction or AgentFinish.

AgentAction corresponds to the tool to use and the input to that tool.

AgentFinish means the agent is done, and has information around what to return to the user.

Two other important components of an agent are:

  • PromptTemplate: It is responsible for taking the user input and previous steps and constructing a prompt to send to the language model
  • Language Model (llm): this takes the prompt constructed by the PromptTemplate and returns some output
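How a prompt template stitches the user input and previous steps together can be shown with plain string formatting. The template text below is invented for illustration; it only mimics the role a LangChain PromptTemplate plays.

```python
# Sketch of what a prompt template does: merge the user input and a
# "scratchpad" of previous steps into one prompt string for the LLM.
# The template text is made up for illustration.

TEMPLATE = """Answer the question using the tools available.
Question: {input}
Previous steps:
{agent_scratchpad}"""

def build_prompt(user_input, steps):
    scratchpad = "\n".join(
        f"Action: {tool} | Input: {tool_input} | Observation: {obs}"
        for tool, tool_input, obs in steps
    )
    return TEMPLATE.format(input=user_input, agent_scratchpad=scratchpad)

prompt = build_prompt(
    "Who is the Italian prime minister?",
    [("Wikipedia", "Italian Prime Minister", "Giorgia Meloni ...")],
)
print(prompt)
```

The LLM then completes this prompt, and its completion is parsed back into the next AgentAction or the final AgentFinish.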

Let’s see Agent in action:

First, we need to install some packages and set the OpenAI token:

pip install langchain
pip install openai
pip install wikipedia
import os
os.environ["OPENAI_API_KEY"] = "<YOUR OPENAI TOKEN>"

Now we can create the agent with two tools (wikipedia for querying Wikipedia and requests for querying the web):

from langchain.agents import load_tools
from langchain.agents import initialize_agent
from langchain.agents import AgentType
from langchain.llms import OpenAI

llm = OpenAI(temperature=0)
tools = load_tools(["wikipedia","requests"], llm=llm)
agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)

And then we can query the agent:

agent.run("Who is the italian prime minister?")
> Entering new AgentExecutor chain...
 I need to find out who the current Italian Prime Minister is.
Action: Wikipedia
Action Input: Italian Prime Minister
Observation: Page: Prime Minister of Italy
Summary: The prime minister of Italy,  officially the president of the Council of Ministers (Italian: Presidente del Consiglio dei Ministri), is the head of government of the Italian Republic. The office of president of the Council of Ministers is established by articles 92–96 of the Constitution of Italy; the president of the Council of Ministers is appointed by the president of the Republic and must have the confidence of the Parliament to stay in office.
Prior to the establishment of the Italian Republic, the position was called President of the Council of Ministers of the Kingdom of Italy (Presidente del Consiglio dei ministri del Regno d'Italia). From 1925 to 1943 during the Fascist regime, the position was transformed into the dictatorial position of Head of the Government, Prime Minister Secretary of State (Capo del Governo, Primo Ministro Segretario di Stato) held by Benito Mussolini, Duce of Fascism, who officially governed on the behalf of the king of Italy. King Victor Emmanuel III removed Mussolini from office in 1943 and the position was restored with Marshal Pietro Badoglio becoming prime minister in 1943, although the original denomination of President of the Council was only restored in 1944, when Ivanoe Bonomi was appointed to the post of prime minister. Alcide De Gasperi became the first prime minister of the Italian Republic in 1946.
The prime minister is the president of the Council of Ministers which holds executive power and the position is similar to those in most other parliamentary systems. The formal Italian order of precedence lists the office as being, ceremonially, the fourth-highest Italian state office after the president and the presiding officers of the two houses of parliament.
Giorgia Meloni is the incumbent prime minister since 22 October 2022.

Page: List of prime ministers of Italy
Summary: The prime minister of Italy is the head of the Council of Ministers, which holds effective executive power in the Italian government. The first officeholder was Camillo Benso, Count of Cavour, who was sworn in on 23 March 1861 after the unification of Italy. Cavour previously served as Prime Minister of the Kingdom of Sardinia, an office from which the Italian prime minister took most of its powers and duties. During the monarchy period, prime ministers were appointed by the king of Italy, as laid down in the Albertine Statute. From 1925 until the fall of his regime in 1943, fascist dictator Benito Mussolini formally modified the office title to "Head of Government, Prime Minister and Secretary of State". From 1861 to 1946, 30 men served as prime ministers, leading 67 governments in total.After the abolition of the Kingdom of Italy in 1946 and the proclamation of the Italian Republic, the office was established by Articles 92 through 96 of the Constitution of Italy. Alcide De Gasperi is the only prime minister who has held this position both in the Kingdom of Italy and in the Republic of Italy. 
The prime minister is appointed by the President of the Republic and must receive a confidence vote by both houses of Parliament: the Chamber of Deputies and the Senate. From 1946 to 2022, in the first 76 years after the creation of the Republic, 30 men served as prime ministers. 
The current officeholder is Giorgia Meloni, who was appointed on 22 October 2022, becoming the first woman to hold this office.The longest-serving prime minister in the history of Italy was Benito Mussolini, who ruled the country from 1922 until 1943; the longest-serving prime minister of the Italian Republic is Silvio Berlusconi, who held the position for more than nine years between 1994 and 2011. The shortest-serving officeholder was Tommaso Tittoni, who served as prime minister for only 16 days in 1905, while the shortest-serving prime minister of the Italian Republic was Fernando Tambroni, who governed for 123 days in 1960.

Page: Deputy Prime Minister of Italy
Summary: The Deputy Prime Minister of Italy, officially Vice-President of the Council of Ministers of the Italian Republic (Italian: Vicepresidente del Consiglio dei ministri della Repubblica Italiana), is a senior member of the Italian Cabinet. Moreover, it is often colloquially known as Vicepremier. The office of the Deputy Prime Minister is not a permanent position, existing only at the discretion of the Prime Minister, who may appoint to other offices to give seniority to a particular Cabinet minister. The office is currently held by Matteo Salvini and Antonio Tajani under Giorgia Meloni's premiership. 
Though they will always have particular responsibilities in government, the Italian deputy prime minister, unlike analogous offices in some other nations, such as a vice-presidency, possesses no special constitutional powers as such, as their powers are solely defined by ordinary law, more specifically, by article 8 of law 400/1998 (Italian: legge 400/1998).
The Deputy Prime Minister (the oldest one in case the office is held by multiple people at once) assumes the duties and powers of the Prime Minister in the latter's absence, illness, or death. In case none was appointed Deputy Prime Minister, it's the oldest Minister who takes the role of the Prime Minister in case the in the latter's absence, illness, or death.
The Deputy Prime Minister does not automatically succeed the Prime Minister in case the latter resigns, as conventionally, in the aftermath of a resignation, the outgoing Prime Minister remains in place to handle day-to-day business until Parliament picks a successor.
In practice, the designation of someone to the role of Deputy Prime Minister may also provide additional practical status within cabinet, enabling the exercise of de facto, if not de jure, power.
In a coalition government, as Enrico Letta Grand coalition government between the Democrats and The People of Freedom, the appointment of the secretary of the smaller party (in the 2014 case, Angelino Alfano, secretary of the PdL) as Deputy Prime Minister is done to give that person more authority within the cabinet to enforce the coalition's agreed-upon agenda.
Thought: I now know the final answer
Final Answer: Giorgia Meloni is the current Prime Minister of Italy, with Matteo Salvini and Antonio Tajani serving as Deputy Prime Ministers.

> Finished chain.
Giorgia Meloni is the current Prime Minister of Italy, with Matteo Salvini and Antonio Tajani serving as Deputy Prime Ministers

Thanks to the option “verbose=True” we can see all the thoughts of the agent and every step it takes.

PySpark Agent

Let’s create our PySpark agent from an LLM and tools.

PySpark Tools

First we need to define all the tools of the agent:

from typing import Any, Dict, Optional

from pydantic import BaseModel, Extra, Field, root_validator

from langchain.base_language import BaseLanguageModel
from langchain.callbacks.manager import (
    AsyncCallbackManagerForToolRun,
    CallbackManagerForToolRun,
)
from langchain.chains.llm import LLMChain
from langchain.prompts import PromptTemplate
from langchain.tools.base import BaseTool
from pyspark.sql import SparkSession

#function for printing the dataframe in a nice way
def showdf(df) -> str:
    return df._jdf.showString(1000, 1000, False)

class PySparkBaseSQLDatabaseTool(BaseModel):

    spark: SparkSession = Field(exclude=True)

    class Config(BaseTool.Config):
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True
        extra = Extra.forbid

class PySparkQuerySQLDataBaseTool(PySparkBaseSQLDatabaseTool, BaseTool):

    name = "query_pysparksql_pyspark"
    description = """
    Input to this tool is a detailed and correct Spark SQL query, output is a result from the table.
    If the query is not correct, an error message will be returned.
    If an error is returned, rewrite the query, check the query, and try again.
    """

    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        result = ""
        try:
            result = showdf(self.spark.sql(query))
        except Exception as err:
            #Return the error to the agent
            result = f"Unexpected {err=}, {str(err)} {type(err)=}"
        return result

    async def _arun(
        self,
        query: str,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        raise NotImplementedError("Tool does not support async")

class InfoPySparkSQLDatabaseTool(PySparkBaseSQLDatabaseTool, BaseTool):

    name = "schema_pysparksql_db"
    description = """
    Input to this tool is a comma-separated list of tables, output is the schema and sample rows for those tables.
    Be sure that the tables actually exist by calling list_tables_pysparksql_db first!
    Example Input: "table1, table2, table3"
    """

    def _run(
        self,
        table_names: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Get the schema for tables in a comma-separated list."""
        tables = table_names.split(", ")
        result = ""
        for table in tables:
          result += f"Table: {table}\nSchema:\n"
          result += str(showdf(self.spark.sql(f"DESCRIBE Table {table}")))
          result += "Sample rows:\n"
          result += str(showdf(self.spark.sql(f"SELECT * FROM {table} LIMIT 10")))
          result += "\n\n"
        return result

    async def _arun(
        self,
        table_name: str,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        raise NotImplementedError("Tool does not support async")

class ListPySparkSQLDatabaseTool(PySparkBaseSQLDatabaseTool, BaseTool):

    name = "list_tables_pysparksql_db"
    description = "Input is an empty string, output is a comma separated list of tables in the database."

    def _run(
        self,
        tool_input: str = "",
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        res = [row.viewName for row in self.spark.sql("SHOW VIEWS;").select("viewName").collect()]
        return str(", ".join(res))


    async def _arun(
        self,
        tool_input: str = "",
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        raise NotImplementedError("Tool does not support async")

We defined three tools:

  • PySparkQuerySQLDataBaseTool: takes a Spark SQL query as input and returns the output of the query.
  • InfoPySparkSQLDatabaseTool: takes a comma-separated list of tables as input and returns the schema and sample rows of those tables.
  • ListPySparkSQLDatabaseTool: returns a comma-separated list of the available tables.

NOTE: Our tools take a Spark session as input to run the SQL queries.
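One detail worth noticing in PySparkQuerySQLDataBaseTool._run: errors are returned as strings instead of raised, so the agent sees the failure as an observation and can retry with a corrected query. Here is a library-free sketch of that pattern, with a made-up `fake_execute` standing in for `spark.sql`:

```python
# The query tool above returns errors as strings instead of raising,
# so the agent can read the message and try a corrected query.
# Library-free sketch of that pattern; fake_execute is hypothetical.

def run_query_safely(execute, query):
    try:
        return execute(query)
    except Exception as err:
        # Same idea as the tool's _run: feed the error back to the agent
        return f"Unexpected {err=}, {str(err)} {type(err)=}"

def fake_execute(query):
    if "bad" in query:
        raise ValueError("table not found")
    return "42 rows"

print(run_query_safely(fake_execute, "SELECT bad"))   # error string
print(run_query_safely(fake_execute, "SELECT good"))  # -> "42 rows"
```

Combined with the tool description’s instruction to “rewrite the query, check the query, and try again”, this gives the agent a simple self-correction loop.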

Then we need to define the toolkit that contains our tools.

from langchain.agents.agent_toolkits.base import BaseToolkit
from typing import List

class PySparkSQLDatabaseToolkit(BaseToolkit):

    spark: SparkSession = Field(exclude=True)

    class Config:
        arbitrary_types_allowed = True

    def get_tools(self) -> List[BaseTool]:
        """Get the tools in the toolkit."""
        return [
            ListPySparkSQLDatabaseTool(spark=self.spark),
            InfoPySparkSQLDatabaseTool(spark=self.spark),
            PySparkQuerySQLDataBaseTool(spark=self.spark),
        ]

PySpark Agent

Now we are ready to define our PySpark agent, but first we need to define the agent’s prompt template:

PYSPARKSQL_PREFIX = """You are an agent designed to interact with a Pyspark SQL database.
Given an input question, create a syntactically correct Pyspark SQL query to run, then look at the results of the query and return the answer.
Unless the user specifies a specific number of examples they wish to obtain, always limit your query to at most {top_k} results.
You can order the results by a relevant column to return the most interesting examples in the database.
Never query for all the columns from a specific table, only ask for the relevant columns given the question.
You have access to tools for interacting with the database.
Only use the below tools. Only use the information returned by the below tools to construct your final answer.
If you get an error while executing a query, rewrite the query and try again.
DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the database.
If the question does not seem related to the database, just return "I don't know" as the answer.
"""

PYSPARKSQL_SUFFIX = """Begin!
Question: {input}
Thought: I should look at the tables in the database to see what I can query.
{agent_scratchpad}"""

PYSPARKSQL_PREFIX and PYSPARKSQL_SUFFIX are the prefix and suffix of the prompt template for the ZeroShotAgent, a predefined LangChain agent that determines which tool to use based solely on each tool’s description.
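Conceptually, the final prompt is the prefix, one description line per tool, the format instructions, and the suffix, joined together. The sketch below only mirrors what ZeroShotAgent.create_prompt produces; the strings passed in are shortened examples, not the real templates.

```python
# Rough sketch of how the final prompt is assembled from the pieces:
# prefix + one line per tool + format instructions + suffix.
# This mirrors ZeroShotAgent.create_prompt conceptually; the real
# implementation lives in LangChain, and these strings are shortened.

def assemble_prompt(prefix, tools, format_instructions, suffix):
    tool_lines = "\n".join(f"{name}: {desc}" for name, desc in tools)
    return "\n\n".join([prefix, tool_lines, format_instructions, suffix])

prompt_template = assemble_prompt(
    prefix="You are an agent designed to interact with a Pyspark SQL database.",
    tools=[("list_tables_pysparksql_db",
            "Input is an empty string, output is a comma separated list of tables.")],
    format_instructions="Use the format: Thought / Action / Action Input / Observation.",
    suffix="Question: {input}\n{agent_scratchpad}",
)
print(prompt_template)
```

The `{input}` and `{agent_scratchpad}` placeholders stay in the assembled template and are filled in on every turn of the agent loop.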

Now we can define the function that will create the agent:

from typing import Any, Dict, List, Optional

from langchain.agents.agent import AgentExecutor
from langchain.agents.mrkl.base import ZeroShotAgent
from langchain.agents.mrkl.prompt import FORMAT_INSTRUCTIONS
from langchain.base_language import BaseLanguageModel
from langchain.callbacks.base import BaseCallbackManager
from langchain.chains.llm import LLMChain


def create_pysparksql_agent(
    llm: BaseLanguageModel,
    toolkit: PySparkSQLDatabaseToolkit,
    callback_manager: Optional[BaseCallbackManager] = None,
    prefix: str = PYSPARKSQL_PREFIX,
    suffix: str = PYSPARKSQL_SUFFIX,
    format_instructions: str = FORMAT_INSTRUCTIONS,
    input_variables: Optional[List[str]] = None,
    top_k: int = 10,
    max_iterations: Optional[int] = 15,
    max_execution_time: Optional[float] = None,
    early_stopping_method: str = "force",
    verbose: bool = False,
    agent_executor_kwargs: Optional[Dict[str, Any]] = None,
    **kwargs: Dict[str, Any],
) -> AgentExecutor:
    tools = toolkit.get_tools()
    prefix = prefix.format(top_k=top_k)
    prompt = ZeroShotAgent.create_prompt(
        tools,
        prefix=prefix,
        suffix=suffix,
        format_instructions=format_instructions,
        input_variables=input_variables,
    )
    llm_chain = LLMChain(
        llm=llm,
        prompt=prompt,
        callback_manager=callback_manager,
    )
    tool_names = [tool.name for tool in tools]
    agent = ZeroShotAgent(llm_chain=llm_chain, allowed_tools=tool_names, **kwargs)
    return AgentExecutor.from_agent_and_tools(
        agent=agent,
        tools=tools,
        callback_manager=callback_manager,
        verbose=verbose,
        max_iterations=max_iterations,
        max_execution_time=max_execution_time,
        early_stopping_method=early_stopping_method,
        **(agent_executor_kwargs or {}),
    )

And then we can create the agent:

toolkit = PySparkSQLDatabaseToolkit(spark=spark)

agent_executor = create_pysparksql_agent(
    llm=OpenAI(temperature=0),
    toolkit=toolkit,
    verbose=True
)

Basically, our PySpark agent extends the LangChain ZeroShotAgent with our prompt template and our PySpark tools.

Demo

Now the fun part. First, let’s load some Parquet data (regarding users and sales) as temp views:

user_path = "/user/xxx/dbacademy/dewd/source/eltwss/raw/users-30m"
spark.read.parquet(user_path).createOrReplaceTempView("users")
sales_path = "/user/xxx/dbacademy/dewd/source/eltwss/raw/sales-30m"
spark.read.parquet(sales_path).createOrReplaceTempView("sales")

Now let’s try some input:

agent_executor.run("Describe the sales table")

Output:

The sales table has columns order_id, email, transaction_timestamp, total_item_quantity, purchase_revenue_in_usd, unique_items, and items. The items column is an array of structs containing coupon, item_id, item_name, item_revenue_in_usd, price_in_usd, and quantity.

Log of the agent:


> Entering new AgentExecutor chain...
Action: list_tables_pysparksql_db
Action Input: ""
Observation: sales, users
Thought: I should look at the schema of the sales table
Action: schema_pysparksql_db
Action Input: "sales"
Observation: Table: sales
Schema:
+-----------------------+---------------------------------------------------------------------------------------------------------------------------+-------+
|               col_name|                                                                                                                  data_type|comment|
+-----------------------+---------------------------------------------------------------------------------------------------------------------------+-------+
|               order_id|                                                                                                                     bigint|   null|
|                  email|                                                                                                                     string|   null|
|  transaction_timestamp|                                                                                                                     bigint|   null|
|    total_item_quantity|                                                                                                                     bigint|   null|
|purchase_revenue_in_usd|                                                                                                                     double|   null|
|           unique_items|                                                                                                                     bigint|   null|
|                  items|array<struct<coupon:string,item_id:string,item_name:string,item_revenue_in_usd:double,price_in_usd:double,quantity:bigint>>|   null|
+-----------------------+---------------------------------------------------------------------------------------------------------------------------+-------+
Sample rows:
+--------+-------------------------+---------------------+-------------------+-----------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|order_id|                    email|transaction_timestamp|total_item_quantity|purchase_revenue_in_usd|unique_items|                                                                                                                                      items|
+--------+-------------------------+---------------------+-------------------+-----------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|  257611|     bowmanmary@casey.com|     1593879956731672|                  2|                 1047.6|           2|[{NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1}, {NEWBED10, P_DOWN_S, Standard Down Pillow, 107.10000000000001, 119.0, 1}]|
|  257585|   carneynathan@yahoo.com|     1593879844080773|                  1|                  850.5|           1|                                                                            [{NEWBED10, M_STAN_F, Standard Full Mattress, 850.5, 945.0, 1}]|
|  257859|      brent31@hotmail.com|     1593880860456070|                  1|                 1195.0|           1|                                                                              [{null, M_STAN_K, Standard King Mattress, 1195.0, 1195.0, 1}]|
|  257627|alan07@robbins-warren.com|     1593880005399706|                  1|                 1195.0|           1|                                                                              [{null, M_STAN_K, Standard King Mattress, 1195.0, 1195.0, 1}]|
|  257525|     bmorales@mcclure.com|     1593879671197489|                  2|                  754.0|           2|                           [{null, M_STAN_T, Standard Twin Mattress, 595.0, 595.0, 1}, {null, P_DOWN_K, King Down Pillow, 159.0, 159.0, 1}]|
|  257533|     sandra8492@yahoo.com|     1593879697280540|                  1|                  595.0|           1|                                                                                [{null, M_STAN_T, Standard Twin Mattress, 595.0, 595.0, 1}]|
|  257554|       cookanne@yahoo.com|     1593879767619779|                  2|                 1190.0|           1|                                                                               [{null, M_STAN_T, Standard Twin Mattress, 1190.0, 595.0, 2}]|
|  257496|         tara63@yahoo.com|     1593879555291948|                  1|                 1045.0|           1|                                                                             [{null, M_STAN_Q, Standard Queen Mattress, 1045.0, 1045.0, 1}]|
|  257690|  maryjohnson50@gmail.com|     1593880240579453|                  1|                 1525.5|           1|                                                                           [{NEWBED10, M_PREM_F, Premium Full Mattress, 1525.5, 1695.0, 1}]|
|  257540|  nataliesims@hotmail.com|     1593879738111195|                  1|                  940.5|           1|                                                                          [{NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1}]|
+--------+-------------------------+---------------------+-------------------+-----------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------+

Thought: I now know the final answer
Final Answer: The sales table has columns order_id, email, transaction_timestamp, total_item_quantity, purchase_revenue_in_usd, unique_items, and items. The items column is an array of structs containing coupon, item_id, item_name, item_revenue_in_usd, price_in_usd, and quantity.

> Finished chain.
Out[42]: 'The sales table has columns order_id, email, transaction_timestamp, total_item_quantity, purchase_revenue_in_usd, unique_items, and items. The items column is an array of structs containing coupon, item_id, item_name, item_revenue_in_usd, price_in_usd, and quantity.'

Let’s ask a simple question about the data:

agent_executor.run("Give me the best 3 users that spent the most")

Output:

danielcarrillo@smith.com, jessicamoore@harris-moore.com, nharrison@smith.org

Log of the agent:


> Entering new AgentExecutor chain...
Action: list_tables_pysparksql_db
Action Input: ""
Observation: sales, users
Thought: I should look at the schema of the sales and users tables to see what columns I can use.
Action: schema_pysparksql_db
Action Input: "sales, users"
Observation: Table: sales
Schema:
+-----------------------+---------------------------------------------------------------------------------------------------------------------------+-------+
|               col_name|                                                                                                                  data_type|comment|
+-----------------------+---------------------------------------------------------------------------------------------------------------------------+-------+
|               order_id|                                                                                                                     bigint|   null|
|                  email|                                                                                                                     string|   null|
|  transaction_timestamp|                                                                                                                     bigint|   null|
|    total_item_quantity|                                                                                                                     bigint|   null|
|purchase_revenue_in_usd|                                                                                                                     double|   null|
|           unique_items|                                                                                                                     bigint|   null|
|                  items|array<struct<coupon:string,item_id:string,item_name:string,item_revenue_in_usd:double,price_in_usd:double,quantity:bigint>>|   null|
+-----------------------+---------------------------------------------------------------------------------------------------------------------------+-------+
Sample rows:
+--------+-------------------------+---------------------+-------------------+-----------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|order_id|                    email|transaction_timestamp|total_item_quantity|purchase_revenue_in_usd|unique_items|                                                                                                                                      items|
+--------+-------------------------+---------------------+-------------------+-----------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|  257611|     bowmanmary@casey.com|     1593879956731672|                  2|                 1047.6|           2|[{NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1}, {NEWBED10, P_DOWN_S, Standard Down Pillow, 107.10000000000001, 119.0, 1}]|
|  257585|   carneynathan@yahoo.com|     1593879844080773|                  1|                  850.5|           1|                                                                            [{NEWBED10, M_STAN_F, Standard Full Mattress, 850.5, 945.0, 1}]|
|  257859|      brent31@hotmail.com|     1593880860456070|                  1|                 1195.0|           1|                                                                              [{null, M_STAN_K, Standard King Mattress, 1195.0, 1195.0, 1}]|
|  257627|alan07@robbins-warren.com|     1593880005399706|                  1|                 1195.0|           1|                                                                              [{null, M_STAN_K, Standard King Mattress, 1195.0, 1195.0, 1}]|
|  257525|     bmorales@mcclure.com|     1593879671197489|                  2|                  754.0|           2|                           [{null, M_STAN_T, Standard Twin Mattress, 595.0, 595.0, 1}, {null, P_DOWN_K, King Down Pillow, 159.0, 159.0, 1}]|
|  257533|     sandra8492@yahoo.com|     1593879697280540|                  1|                  595.0|           1|                                                                                [{null, M_STAN_T, Standard Twin Mattress, 595.0, 595.0, 1}]|
|  257554|       cookanne@yahoo.com|     1593879767619779|                  2|                 1190.0|           1|                                                                               [{null, M_STAN_T, Standard Twin Mattress, 1190.0, 595.0, 2}]|
|  257496|         tara63@yahoo.com|     1593879555291948|                  1|                 1045.0|           1|                                                                             [{null, M_STAN_Q, Standard Queen Mattress, 1045.0, 1045.0, 1}]|
|  257690|  maryjohnson50@gmail.com|     1593880240579453|                  1|                 1525.5|           1|                                                                           [{NEWBED10, M_PREM_F, Premium Full Mattress, 1525.5, 1695.0, 1}]|
|  257540|  nataliesims@hotmail.com|     1593879738111195|                  1|                  940.5|           1|                                                                          [{NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1}]|
+--------+-------------------------+---------------------+-------------------+-----------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------+


Table: users
Schema:
+--------------------------+---------+-------+
|                  col_name|data_type|comment|
+--------------------------+---------+-------+
|                   user_id|   string|   null|
|user_first_touch_timestamp|   bigint|   null|
|                     email|   string|   null|
+--------------------------+---------+-------+
Sample rows:
+-----------------+--------------------------+---------------------------+
|          user_id|user_first_touch_timestamp|                      email|
+-----------------+--------------------------+---------------------------+
|UA000000107338110|          1593874163848994|      blackburnjohn@gay.biz|
|UA000000107354520|          1593876102569545|   reedjennifer@freeman.com|
|UA000000107359655|          1593876696866893|roberthubbard55@hotmail.com|
|UA000000107362264|          1593876996344195|  amandafrazier@hotmail.com|
|UA000000107362440|          1593877015690335|         laura65@garcia.biz|
|UA000000107363722|          1593877154338249|        tmiller67@yahoo.com|
|UA000000107367850|          1593877599590013|      xmartinez27@gmail.com|
|UA000000107370290|          1593877869468334|        uflynn@anderson.com|
|UA000000107371743|          1593878036347579|   nancyellis34@hotmail.com|
|UA000000107371836|          1593878046932741|        nporter32@gmail.com|
+-----------------+--------------------------+---------------------------+



Thought: I can join the sales and users tables on the email column and get the total purchase revenue for each user.
Action: query_pysparksql_pyspark
Action Input: "SELECT u.email, SUM(s.purchase_revenue_in_usd) AS total_purchase_revenue FROM users u JOIN sales s ON u.email = s.email GROUP BY u.email ORDER BY total_purchase_revenue DESC LIMIT 10"
Observation: +-----------------------------+----------------------+
|                        email|total_purchase_revenue|
+-----------------------------+----------------------+
|     danielcarrillo@smith.com|                1695.0|
|jessicamoore@harris-moore.com|                1195.0|
|          nharrison@smith.org|                1195.0|
|       castrodavid@valdez.com|                1195.0|
|         david75@anderson.org|                 945.0|
|elliottkevin@walker-watts.com|                 945.0|
+-----------------------------+----------------------+

Thought: I now know the best 3 users that spent the most.
Final Answer: danielcarrillo@smith.com, jessicamoore@harris-moore.com, nharrison@smith.org

Not bad. Now let’s ask about the items, which are stored in a nested field.

agent_executor.run("Give top 10 items selled")

Output:

The top 10 items selled are Standard King Mattress, Standard Twin Mattress, Standard Queen Mattress, Standard Full Mattress, Premium Full Mattress, Standard Down Pillow, King Down Pillow, and Premium Twin Mattress.

Log:

Entering new AgentExecutor chain...
Action: list_tables_pysparksql_db
Action Input: ""
Observation: sales, users
Thought: I should look at the schema of the sales table to see what columns I can query.
Action: schema_pysparksql_db
Action Input: "sales"
Observation: Table: sales
Schema:
+-----------------------+---------------------------------------------------------------------------------------------------------------------------+-------+
|               col_name|                                                                                                                  data_type|comment|
+-----------------------+---------------------------------------------------------------------------------------------------------------------------+-------+
|               order_id|                                                                                                                     bigint|   null|
|                  email|                                                                                                                     string|   null|
|  transaction_timestamp|                                                                                                                     bigint|   null|
|    total_item_quantity|                                                                                                                     bigint|   null|
|purchase_revenue_in_usd|                                                                                                                     double|   null|
|           unique_items|                                                                                                                     bigint|   null|
|                  items|array<struct<coupon:string,item_id:string,item_name:string,item_revenue_in_usd:double,price_in_usd:double,quantity:bigint>>|   null|
+-----------------------+---------------------------------------------------------------------------------------------------------------------------+-------+
Sample rows:
+--------+-------------------------+---------------------+-------------------+-----------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|order_id|                    email|transaction_timestamp|total_item_quantity|purchase_revenue_in_usd|unique_items|                                                                                                                                      items|
+--------+-------------------------+---------------------+-------------------+-----------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|  257611|     bowmanmary@casey.com|     1593879956731672|                  2|                 1047.6|           2|[{NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1}, {NEWBED10, P_DOWN_S, Standard Down Pillow, 107.10000000000001, 119.0, 1}]|
|  257585|   carneynathan@yahoo.com|     1593879844080773|                  1|                  850.5|           1|                                                                            [{NEWBED10, M_STAN_F, Standard Full Mattress, 850.5, 945.0, 1}]|
|  257859|      brent31@hotmail.com|     1593880860456070|                  1|                 1195.0|           1|                                                                              [{null, M_STAN_K, Standard King Mattress, 1195.0, 1195.0, 1}]|
|  257627|alan07@robbins-warren.com|     1593880005399706|                  1|                 1195.0|           1|                                                                              [{null, M_STAN_K, Standard King Mattress, 1195.0, 1195.0, 1}]|
|  257525|     bmorales@mcclure.com|     1593879671197489|                  2|                  754.0|           2|                           [{null, M_STAN_T, Standard Twin Mattress, 595.0, 595.0, 1}, {null, P_DOWN_K, King Down Pillow, 159.0, 159.0, 1}]|
|  257533|     sandra8492@yahoo.com|     1593879697280540|                  1|                  595.0|           1|                                                                                [{null, M_STAN_T, Standard Twin Mattress, 595.0, 595.0, 1}]|
|  257554|       cookanne@yahoo.com|     1593879767619779|                  2|                 1190.0|           1|                                                                               [{null, M_STAN_T, Standard Twin Mattress, 1190.0, 595.0, 2}]|
|  257496|         tara63@yahoo.com|     1593879555291948|                  1|                 1045.0|           1|                                                                             [{null, M_STAN_Q, Standard Queen Mattress, 1045.0, 1045.0, 1}]|
|  257690|  maryjohnson50@gmail.com|     1593880240579453|                  1|                 1525.5|           1|                                                                           [{NEWBED10, M_PREM_F, Premium Full Mattress, 1525.5, 1695.0, 1}]|
|  257540|  nataliesims@hotmail.com|     1593879738111195|                  1|                  940.5|           1|                                                                          [{NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1}]|
+--------+-------------------------+---------------------+-------------------+-----------------------+------------+-------------------------------------------------------------------------------------------------------------------------------------------+



Thought: I should query the items column to get the top 10 items selled.
Action: query_pysparksql_pyspark
Action Input: "SELECT items.item_name, SUM(items.quantity) AS total_quantity FROM sales LATERAL VIEW explode(items) t AS items GROUP BY items.item_name ORDER BY total_quantity DESC LIMIT 10"
Observation: Unexpected err=AnalysisException(), Reference 'items.item_name' is ambiguous, could be: sales.items, t.items.; line 1 pos 121 type(err)=<class 'pyspark.sql.utils.AnalysisException'>
Thought: I should add the table name to the query.
Action: query_pysparksql_pyspark
Action Input: "SELECT t.items.item_name, SUM(t.items.quantity) AS total_quantity FROM sales LATERAL VIEW explode(items) t AS items GROUP BY t.items.item_name ORDER BY total_quantity DESC LIMIT 10"
Observation: +-----------------------+--------------+
|              item_name|total_quantity|
+-----------------------+--------------+
| Standard King Mattress|             8|
| Standard Twin Mattress|             7|
|Standard Queen Mattress|             6|
| Standard Full Mattress|             6|
|  Premium Full Mattress|             2|
|   Standard Down Pillow|             1|
|       King Down Pillow|             1|
|  Premium Twin Mattress|             1|
+-----------------------+--------------+

Thought: I now know the final answer.
Final Answer: The top 10 items selled are Standard King Mattress, Standard Twin Mattress, Standard Queen Mattress, Standard Full Mattress, Premium Full Mattress, Standard Down Pillow, King Down Pillow, and Premium Twin Mattress.

Looking at the agent log, you can see that the agent’s first attempt fails, and that thanks to the error message it fixes the SQL query and produces a correct one.
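This self-correction works because the query tool catches execution errors and hands the error text back to the model as the observation, instead of aborting the chain. Here is a minimal, hypothetical sketch of that pattern; the function and the fake executor below are illustrations, not LangChain’s actual API:

```python
def run_query_tool(query: str, execute) -> str:
    """Run a SQL query via `execute` and return either the result or,
    on failure, the error text so the LLM can read it and retry."""
    try:
        return str(execute(query))
    except Exception as err:
        # Mirror the error format seen in the agent log above.
        return f"Unexpected err={err!r} type(err)={type(err)}"


# Usage with a fake executor that rejects ambiguous column references.
def fake_execute(query: str):
    if "t.items" not in query:
        raise ValueError("Reference 'items.item_name' is ambiguous")
    return [("Standard King Mattress", 8)]

print(run_query_tool("SELECT items.item_name ...", fake_execute))    # error text
print(run_query_tool("SELECT t.items.item_name ...", fake_execute))  # query result
```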

Final thoughts

Will we be replaced by these new AI models?

Well, I don’t think so (for now). While experimenting with the agent, I noticed that the LLM sometimes produces wrong answers (and wrong SQL) due to hallucination. Hallucination is a type of error in language models where the model generates text that is not supported by the input or the context; in other words, the output is not factually correct or logically coherent. That said, the OpenAI LLM used here is a general-purpose model, and a model fine-tuned for SQL tasks might reduce hallucination. Still, it’s impressive that with a few lines of code we can create an AI agent that can potentially query a data lake from natural-language input.

Reference links

For more details check out these links:

LangChain

LangChain Tools

LangChain Agents

OpenAI API

Spark

Spark SQL

This post is licensed under CC BY 4.0 by the author.