
Building Your Own Smart Elasticsearch Agent with LangGraph and Gemini

Do you want to empower users to search your Elasticsearch data with natural language?


Building an intelligent agent that understands what users mean and translates it into a precise Elasticsearch query is the answer! In this blog post, we'll guide you through the steps to create your own smart Elasticsearch agent using LangGraph for orchestration and Google's Gemini model for natural language understanding and query generation. The agent acts as an intermediary between intuitive natural language and Elasticsearch's query DSL. By translating user intent into precise DSL queries, it makes powerful search and analytical functionality available to a much broader audience.

The construction of this intelligent system relies on two pivotal technologies: LangGraph and Google Gemini. LangGraph provides a robust orchestration framework, essential for defining the agent's complex, multi-step workflow, thereby ensuring flexibility, scalability, and maintainability. Google Gemini, a powerful large language model (LLM), is tasked with the critical functions of natural language understanding and the subsequent generation of the required Elasticsearch DSL. The synergy of these technologies facilitates the creation of a highly effective and user-friendly search interface.

Why an Elasticsearch Agent?


Elasticsearch is incredibly powerful, but its query language (DSL) can be complex and intimidating. A well-built agent abstracts that complexity: users ask questions in plain English (or any other language!) and get accurate results without needing to learn Elasticsearch's intricacies. Think of it as giving your users a personal Elasticsearch expert.


LangGraph: The Orchestration Backbone for Your Elasticsearch Agent



LangGraph is designed with a graph-based architecture, where "each node in the graph represents a specific task or action like querying a database, calling an LLM, or generating a summary". The edges within this graph define how data or context flows between these nodes. This structural approach fundamentally changes how complex workflows are built, as it allows tasks to be executed in parallel, forked based on specific conditions, and rejoined according to predefined logic, all within a single, visually comprehensible graph.  

While LangChain is a flexible, modular framework primarily focused on integrating large language models (LLMs) into complex workflows, LangGraph targets more intricate, agent-based workflows with complex task dependencies. LangChain excels in "prompt chaining," which involves sequential tasks, and offers seamless integration with external data sources. Its strengths lie in scalability and flexibility, making it suitable for advanced AI applications. LangGraph, developed by the LangChain team, provides greater flexibility and control for managing complex workflows. It is particularly well-suited for developers aiming to build dynamic, scalable, agentic workflows, especially those involving multi-agent interactions. LangGraph does not replace LangChain; the two are designed to work together, and the code in this post uses LangGraph for the graph itself and a LangChain core utility (RunnableLambda) to wrap each node.


LangGraph's stateful, graph-based architecture fundamentally transforms AI agent development from simple sequential processing into adaptive, resilient, and collaborative decision-making systems. The ability to define complex paths, manage internal state, and incorporate cyclical behavior, such as reflection, means the Elasticsearch Agent can dynamically adjust its strategy based on intermediate results or failures. This design allows for re-evaluation, retries, or branching to different paths if, for instance, DSL generation encounters an issue, making the agent more robust and adaptable to unexpected inputs or system states. Furthermore, LangGraph's support for multi-agent systems enables the assignment of specialized roles to different agents (e.g., one for query normalization, another for DSL generation, and a third for summarization). This modularity, coupled with a shared state, facilitates seamless collaboration and context passing among these specialized agents, mirroring the efficiency of a team of human experts. This architecture is crucial for developing highly sophisticated AI copilots or complex decision-making systems, moving beyond a single monolithic LLM call to a distributed, intelligent system.
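To make the retry-and-branch idea concrete, here is a minimal sketch of a routing function that decides where the graph goes after the DSL-generation step. The node names match the ones used later in this post; the dsl_attempts counter and the MAX_DSL_ATTEMPTS budget are assumptions added for illustration and are not part of the agent code below.

```python
from typing import TypedDict

MAX_DSL_ATTEMPTS = 3  # hypothetical retry budget


class AgentState(TypedDict, total=False):
    normalized_query: str
    es_dsl: dict
    error: str
    dsl_attempts: int  # hypothetical counter, incremented by the DSL node


def route_after_dsl(state: AgentState) -> str:
    """Pick the name of the next node after the DSL-generation step."""
    if state.get("es_dsl") is not None and not state.get("error"):
        return "QueryES"      # DSL parsed cleanly: run the search
    if state.get("dsl_attempts", 0) < MAX_DSL_ATTEMPTS:
        return "GenerateDSL"  # loop back and try again
    return "Output"           # out of attempts: report the error


# In LangGraph, a router like this would typically be registered with:
# graph.add_conditional_edges("GenerateDSL", route_after_dsl)
```

For the loop to terminate, the DSL node would need to increment dsl_attempts and clear any stale error on each pass; that bookkeeping is left out here to keep the sketch short.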


Gemini AI: Powering Natural Language Understanding and DSL Generation



Google Gemini's primary function within this architecture is to interpret the user's natural language query and translate it into a structured, executable format: specifically, Elasticsearch DSL. This capability is analogous to Google's Gemini in BigQuery, which demonstrates the ability to generate SQL queries from natural language prompts, even allowing for refinement and summary of the generated query. This directly parallels the requirement for an Elasticsearch Agent to generate accurate DSL.

The Gemini API, while defaulting to unstructured text output, can be precisely configured for structured output, such as JSON. This feature is critical for the agent's workflow, given that Elasticsearch DSL is inherently JSON. Two main methods exist for achieving this: configuring a schema directly on the model (recommended for higher quality and determinism) or providing a schema within a text prompt (less deterministic and not recommended if a schema is already configured). Defining a responseSchema ensures that the model's output strictly adheres to a specific schema, allowing for less post-processing and direct data extraction. This is essential for reliably parsing both the generated DSL and summarized responses. Supported MIME types, including application/json and text/x.enum, are crucial for controlling the output format.
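As a rough illustration of the configured approach, the sketch below asks for a JSON response and parses it. It assumes the google-generativeai Python SDK accepts a generation_config dictionary with a response_mime_type entry on generate_content; check this against the SDK version you have installed. The helper name generate_json is made up for this example, and model is any object with a compatible generate_content method.

```python
import json

# Assumed request setting: ask for a JSON response body instead of free text.
JSON_CONFIG = {"response_mime_type": "application/json"}


def generate_json(model, prompt: str) -> dict:
    """Call a Gemini-style model and parse its reply as a JSON object."""
    response = model.generate_content(prompt, generation_config=JSON_CONFIG)
    parsed = json.loads(response.text)
    if not isinstance(parsed, dict):
        raise ValueError("expected a JSON object at the top level")
    return parsed
```

When the model is configured this way, fence-stripping of the reply may no longer be needed, although keeping a defensive parse step costs little.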

Gemini's capability for constrained, structured output, combined with precise prompt engineering, transforms LLMs from general text generators into reliable, programmable components within an automated workflow. The ability to define a responseSchema and enforce JSON output with specific keys means Gemini functions as a predictable, programmable element within the agent's workflow. This is paramount for automation; if the DSL generation step does not consistently produce valid JSON, the entire pipeline will fail. This structured output ensures that downstream components, such as the Elasticsearch client, can reliably parse and utilize the LLM's output without extensive or fragile post-processing. Furthermore, by constraining the output format and content through schemas and precise prompts, the likelihood of the LLM "hallucinating" invalid DSL or irrelevant summary structures is significantly reduced. This fosters trust in the Elasticsearch Agent's outputs, making it suitable for production environments where consistency and accuracy are paramount.


The importance of prompt engineering for effective DSL generation cannot be overstated. Prompt engineering is a technique employed to guide and shape an LLM's responses for specific tasks or queries. It involves crafting well-structured prompts or input instructions to elicit desired outputs. For an Elasticsearch Agent, prompts serve not merely as instructions but as a blueprint for Gemini's behavior, dictating how it normalizes queries, generates DSL, and summarizes results. Advanced techniques, such as augmenting prompts with the "grammar of the language in the form of Extended Backus-Naur Form (EBNF)," can further enhance DSL generation accuracy.


The Big Picture: Workflow of Our Agent


Our agent will operate in a series of well-defined steps. We'll use LangGraph to define this workflow, so we can easily modify and extend it in the future:

  1. User Input: The user provides a natural language query (e.g., "Show me recent error logs related to authentication").

  2. Query Normalization: We'll refine the user's query (e.g., correct spelling, remove unnecessary words) to make it more suitable for translation.

  3. DSL Generation: The normalized query is sent to Gemini, which translates it into a valid Elasticsearch DSL query.

  4. Elasticsearch Search: The generated DSL query is executed against your Elasticsearch cluster.

  5. Response Summarization: The raw Elasticsearch results are summarized into a concise and human-readable format.

  6. Final Output: The summarized results are presented to the user.
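Before bringing in LangGraph, it can help to see the steps above as plain functions that each take a state dictionary and return an updated one. The sketch below is a simplified stand-in, not the agent itself: the two example steps are stubs, and the message field name is a placeholder for whatever your index actually contains.

```python
from typing import Callable, Dict, List

State = Dict[str, object]
Step = Callable[[State], State]


def run_pipeline(steps: List[Step], query: str) -> State:
    """Run each step in order, passing the accumulated state along."""
    state: State = {"query": query}
    for step in steps:
        state = step(state)
        if "error" in state:  # stop early once any step reports a failure
            break
    return state


# Stand-in steps; the real ones would call Gemini and Elasticsearch.
def normalize(state: State) -> State:
    return {**state, "normalized_query": str(state["query"]).strip().lower()}


def to_dsl(state: State) -> State:
    text = state["normalized_query"]
    # "message" is a placeholder field name
    return {**state, "es_dsl": {"query": {"match": {"message": text}}}}
```

Calling run_pipeline([normalize, to_dsl], "Error Logs") returns a state that carries the original query, the normalized query, and the generated DSL side by side, which is the same shape of shared state the graph nodes pass around.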



Let's Get Coding! (Step-by-Step Guide)

Here's how to build your own agent, step by step:


1. Prerequisites:

  • Python Environment: Make sure you have Python 3.9+ installed (recent releases of the libraries below no longer support older versions).

  • Libraries: Install the necessary libraries using pip:

pip install langchain langgraph google-generativeai python-dotenv requests

  • Google Gemini API Key: Obtain an API key for the Gemini API, for example through Google AI Studio. (Free tier available with usage limits).

  • Elasticsearch Cluster: You'll need access to an Elasticsearch cluster.


2. Setting Up Your Project:


  1. Create a Project Directory: Create a new directory for your agent project (e.g., es_agent).

  2. Create a .env File: This file will store your sensitive credentials (API key and Elasticsearch URL). Create a file named .env in your project directory and add the following lines, replacing the placeholders with your actual values:

GEMINI_API_KEY="YOUR_GEMINI_API_KEY"
ES_URL="YOUR_ELASTICSEARCH_URL"  # e.g., "https://localhost:9200/_search"

  3. Create Prompt Files: Create three text files in your project directory. These files will contain prompts to the LLM for the different states in the graph: query_normalizer_prompt.txt, es_dsl_formation_prompt.txt, and generic_es_response_summariser.txt.
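Since a missing variable only surfaces later as a confusing request failure, you may want to check the configuration once at startup. The following is a small sketch of that idea; load_settings is a hypothetical helper, and it assumes the two variable names from the .env file above.

```python
import os
from typing import Mapping

REQUIRED_VARS = ("GEMINI_API_KEY", "ES_URL")


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Return the required settings, or raise if any is missing or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

In a script that uses python-dotenv, this would be called after load_dotenv() so that values from the .env file are already present in the environment.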


3. Core Code (es_agent.py):


Create a file named es_agent.py in your project directory. This file will contain the main code for our agent.


import os
import json
import logging
from typing import Any, TypedDict

import requests
from dotenv import load_dotenv
import google.generativeai as genai
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableLambda

# ─── Logging Configuration ───────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s ─ %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# ─── Load environment variables ──────────────────────────────────────────────
load_dotenv()
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
ES_ENDPOINT = os.getenv("ES_URL")

# ─── Load prompt files ───────────────────────────────────────────────────────
def load_prompt(filename):
    """Read a prompt file that sits next to this script."""
    base_dir = os.path.dirname(os.path.abspath(__file__))
    full_path = os.path.join(base_dir, filename)
    try:
        with open(full_path, "r", encoding="utf-8") as f:
            text = f.read()
        logger.info(f"Loaded prompt from '{full_path}' ({len(text)} characters).")
        return text
    except Exception as e:
        logger.error(f"Error loading prompt file '{full_path}': {e}")
        return ""

NORMALIZER_PROMPT = load_prompt("query_normalizer_prompt.txt")
DSL_PROMPT = load_prompt("es_dsl_formation_prompt.txt")
SUMMARY_PROMPT = load_prompt("generic_es_response_summariser.txt")

# ─── Configure Gemini ────────────────────────────────────────────────────────
if not GEMINI_API_KEY:
    logger.warning("GEMINI_API_KEY is not set in environment; model calls will fail.")
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel("gemini-2.0-flash")

# ─── Shared state and utility ────────────────────────────────────────────────
class AgentState(TypedDict, total=False):
    """Keys that the graph nodes read and write."""
    query: str
    normalized_query: str
    es_dsl: dict
    es_raw_response: dict
    summary: Any
    error: str

def strip_fences(text: str) -> str:
    """Remove markdown code fences (```json ... ```) around a model reply."""
    original = text
    text = text.strip()
    if text.startswith("```json"):
        text = text[7:]
    elif text.startswith("```"):
        text = text[3:]
    if text.endswith("```"):
        text = text[:-3]
    stripped = text.strip()
    if original != stripped:
        logger.debug("Stripped markdown fences from Gemini output.")
    return stripped

# ─── Step 1: Normalize Query ─────────────────────────────────────────────────
def normalize_query(state: AgentState) -> AgentState:
    nl_query = state.get("query", "")
    logger.info(f"[NormalizeQuery] Received query: '{nl_query}'")
    try:
        prompt = NORMALIZER_PROMPT.format(user_query=nl_query)
        logger.debug(f"[NormalizeQuery] Prompt sent to Gemini (length {len(prompt)}).")
        response = model.generate_content(prompt)
        normalized = strip_fences(response.text)
        logger.info(f"[NormalizeQuery] Normalized query: '{normalized}'")
        return {**state, "normalized_query": normalized}
    except Exception as e:
        logger.error(f"[NormalizeQuery] Error during normalization: {e}")
        return {**state, "error": f"Normalization failed: {e}"}

# ─── Step 2: NL to DSL ───────────────────────────────────────────────────────
def generate_es_dsl(state: AgentState) -> AgentState:
    if state.get("error"):
        # An earlier step failed; pass the state through untouched.
        return state
    normalized = state.get("normalized_query", "")
    logger.info(f"[GenerateDSL] Converting normalized query to DSL: '{normalized}'")
    dsl_string = ""
    try:
        prompt = DSL_PROMPT.format(nl_query=normalized)
        logger.debug(f"[GenerateDSL] Prompt sent to Gemini (length {len(prompt)}).")
        response = model.generate_content(prompt)
        dsl_string = strip_fences(response.text)
        logger.debug(f"[GenerateDSL] Gemini returned DSL string: {dsl_string[:100]}{'...' if len(dsl_string) > 100 else ''}")
        dsl = json.loads(dsl_string)
        logger.info("[GenerateDSL] Parsed DSL JSON successfully.")
        logger.info(dsl_string)
        return {**state, "es_dsl": dsl}
    except json.JSONDecodeError as e:
        logger.error(f"[GenerateDSL] JSON parsing error: {e}. Raw DSL: {dsl_string}")
        return {**state, "error": f"Failed to parse DSL JSON: {e}"}
    except Exception as e:
        logger.error(f"[GenerateDSL] Unexpected error: {e}")
        return {**state, "error": f"DSL generation failed: {e}"}

# ─── Step 3: Search Elasticsearch ────────────────────────────────────────────
def search_elasticsearch(state: AgentState) -> AgentState:
    if state.get("error"):
        return state
    es_dsl = state.get("es_dsl")
    if not es_dsl:
        logger.warning("[QueryES] No DSL found in state; skipping Elasticsearch call.")
        return {**state, "error": "No DSL to execute."}
    logger.info(f"[QueryES] Sending DSL to Elasticsearch endpoint: {ES_ENDPOINT}")
    payload = json.dumps(es_dsl)
    truncated_payload = payload[:100] + ("..." if len(payload) > 100 else "")
    logger.debug(f"[QueryES] DSL payload (truncated): {truncated_payload}")
    try:
        res = requests.post(
            ES_ENDPOINT,
            json=es_dsl,
            headers={"Content-Type": "application/json"},
            verify=False,  # WARNING: local testing only; see the security section
            timeout=30,
        )
        res.raise_for_status()
        es_response = res.json()
        hits = es_response.get("hits", {}).get("total", {}).get("value", "unknown")
        logger.info(f"[QueryES] Elasticsearch returned {hits} hits.")
        return {**state, "es_raw_response": es_response}
    except requests.exceptions.RequestException as e:
        logger.error(f"[QueryES] HTTP error: {e}")
        return {**state, "error": f"Elasticsearch HTTP error: {e}"}
    except json.JSONDecodeError as e:
        logger.error(f"[QueryES] JSON decode error on ES response: {e}")
        return {**state, "error": f"ES response JSON decode failed: {e}"}
    except Exception as e:
        logger.error(f"[QueryES] Unexpected error: {e}")
        return {**state, "error": f"Elasticsearch search failed: {e}"}

# ─── Step 4: Summarize ───────────────────────────────────────────────────────
def summarize_response(state: AgentState) -> AgentState:
    raw = state.get("es_raw_response")
    if raw is None:
        logger.warning("[Summarize] No ES response found; skipping summarization.")
        return state
    logger.info("[Summarize] Summarizing Elasticsearch response with Gemini.")
    try:
        prompt = SUMMARY_PROMPT.format(
            es_response=json.dumps(raw),
            nl_query=state.get("query", ""),
        )
        logger.debug(f"[Summarize] Prompt sent to Gemini (length {len(prompt)}).")
        response = model.generate_content(prompt)
        summary_text = strip_fences(response.text)
        logger.debug(f"[Summarize] Raw summary text (truncated): {summary_text[:100]}{'...' if len(summary_text) > 100 else ''}")
        try:
            summary_json = json.loads(summary_text)
            logger.info("[Summarize] Parsed summary JSON successfully.")
            return {**state, "summary": summary_json}
        except json.JSONDecodeError:
            logger.warning("[Summarize] Summary is not valid JSON; returning raw string.")
            return {**state, "summary": summary_text}
    except Exception as e:
        logger.error(f"[Summarize] Error during summarization: {e}")
        return {**state, "error": f"Summarization failed: {e}"}

# ─── Step 5: Final Output ────────────────────────────────────────────────────
def output_result(state: AgentState) -> AgentState:
    logger.info("[Output] Generating final output.")
    if state.get("error"):
        logger.error(f"[Output] Agent finished with an error: {state['error']}")
    return state

# ─── Build the graph (AgentState declares the keys nodes may read and write) ─
graph = StateGraph(AgentState)
graph.add_node("NormalizeQuery", RunnableLambda(normalize_query))
graph.add_node("GenerateDSL", RunnableLambda(generate_es_dsl))
graph.add_node("QueryES", RunnableLambda(search_elasticsearch))
graph.add_node("Summarize", RunnableLambda(summarize_response))
graph.add_node("Output", RunnableLambda(output_result))

# Define edges
graph.set_entry_point("NormalizeQuery")
graph.add_edge("NormalizeQuery", "GenerateDSL")
graph.add_edge("GenerateDSL", "QueryES")
graph.add_edge("QueryES", "Summarize")
graph.add_edge("Summarize", "Output")
graph.add_edge("Output", END)

# ─── Compile the graph ───────────────────────────────────────────────────────
graph_executor = graph.compile()

# ─── Expose a helper function to run the agent ───────────────────────────────
def es_search_agent(nl_query: str):
    """Invoke the LangGraph agent with a natural-language query string.

    Returns the summary on success, or {"error": ...} if any step failed.
    """
    logger.info(f"[run_query] Invoking agent with query: '{nl_query}'")
    result_state = graph_executor.invoke({"query": nl_query})
    if result_state.get("error"):
        return {"error": result_state["error"]}
    return result_state.get("summary")

4. Crafting Your Prompts:


This is a critical step. Your prompts guide Gemini's behavior. Spend time crafting effective prompts! Here's a starting point for your prompt files:

query_normalizer_prompt.txt


You are a query normalizer. Your task is to take a user's search query and normalize it to improve search accuracy. This includes correcting spelling mistakes, removing irrelevant words (like "please" or "thank you"), and standardizing terminology.

User Query: {user_query}

Normalized Query:


es_dsl_formation_prompt.txt


You are an expert in Elasticsearch DSL (Domain Specific Language). Your task is to translate a natural language search query into a valid Elasticsearch DSL query in JSON format. The DSL query should be as specific as possible to accurately reflect the user's intent. You must enclose the valid JSON object with markdown fences.

Natural Language Query: {nl_query}

Elasticsearch DSL Query:


generic_es_response_summariser.txt


You are a summarization expert. Your task is to summarize the results of an Elasticsearch query in a concise and informative way for a user. You must enclose the valid JSON object with markdown fences. Only answer in JSON format with the key `summary`.

Natural Language Query: {nl_query}

Elasticsearch Response: {es_response}

Summary:

Important Prompting Tips:


  • Be Specific: The more specific your instructions, the better the results.

  • Provide Examples: Consider adding example input-output pairs to your prompts (few-shot learning).

  • Iterate and Refine: Experiment with different prompts and see what works best.

  • Understand Your Data: Tailor the prompts to your specific Elasticsearch data structure.

  • Constrain Output: Ask Gemini to output JSON, and constrain the keys that can be used in the JSON.
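One way to act on the "Provide Examples" and "Constrain Output" tips is to assemble the DSL prompt in code. The sketch below is an illustration only: the example query, the level and service field names, and the build_dsl_prompt helper are all made up and should be replaced with pairs that reflect your own index mapping.

```python
import json

# Hypothetical few-shot examples; replace the field names with your own mapping.
EXAMPLES = [
    (
        "errors in the auth service",
        {"query": {"bool": {"must": [
            {"match": {"level": "error"}},
            {"match": {"service": "auth"}},
        ]}}},
    ),
]

INSTRUCTIONS = (
    "Translate the natural language query into Elasticsearch DSL. "
    "Answer with a single JSON object and nothing else."
)


def build_dsl_prompt(nl_query: str) -> str:
    """Assemble instructions, worked examples, and the new query."""
    parts = [INSTRUCTIONS, ""]
    for question, dsl in EXAMPLES:
        parts.append("Natural Language Query: " + question)
        parts.append("Elasticsearch DSL Query: " + json.dumps(dsl))
        parts.append("")
    parts.append("Natural Language Query: " + nl_query)
    parts.append("Elasticsearch DSL Query:")
    return "\n".join(parts)
```

Building the prompt by joining strings also avoids a common pitfall: when a prompt template is filled with str.format, as es_agent.py does, any literal JSON braces written directly into the prompt file must be doubled ({{ and }}) or the call will raise an error.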


5. Running Your Agent:


Now, let's test our agent! Add the following lines to the end of es_agent.py:

if __name__ == "__main__":
    query = "Show me recent error logs"
    result = es_search_agent(query)
    print(json.dumps(result, indent=2))


You should see the agent process the query, generate a DSL query, execute it against Elasticsearch, summarize the results, and print the final output.

6. Securing Your Agent (Critical!)


  • SSL/TLS: Never disable SSL certificate verification (verify=False) in a production environment! Configure SSL/TLS for your Elasticsearch cluster and ensure your agent verifies the certificate.

  • Authentication/Authorization: Implement proper authentication and authorization mechanisms to control access to your Elasticsearch cluster.

  • Input Sanitization: Sanitize user input to prevent injection attacks.
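With an LLM in the loop, sanitizing input also means checking what the model produces before it reaches the cluster. The sketch below shows one possible guard, under the assumption that the agent should only ever run read-only searches; the allow-list, the forbidden keys, and the MAX_SIZE cap are illustrative choices rather than an exhaustive policy.

```python
ALLOWED_TOP_LEVEL = {"query", "size", "from", "sort", "_source", "aggs"}
FORBIDDEN_KEYS = {"script", "script_fields", "runtime_mappings"}
MAX_SIZE = 100  # hypothetical cap on returned hits


def find_forbidden(node) -> bool:
    """Return True if any forbidden key appears anywhere in the DSL."""
    if isinstance(node, dict):
        return any(k in FORBIDDEN_KEYS or find_forbidden(v) for k, v in node.items())
    if isinstance(node, list):
        return any(find_forbidden(item) for item in node)
    return False


def check_dsl(dsl: dict) -> dict:
    """Validate a generated search body and clamp its size."""
    if not isinstance(dsl, dict):
        raise ValueError("DSL must be a JSON object")
    unexpected = set(dsl) - ALLOWED_TOP_LEVEL
    if unexpected:
        raise ValueError(f"Unexpected top-level keys: {sorted(unexpected)}")
    if find_forbidden(dsl):
        raise ValueError("Scripts and runtime fields are not allowed")
    safe = dict(dsl)
    safe["size"] = min(int(dsl.get("size", 10)), MAX_SIZE)
    return safe
```

A check like this would sit between the DSL generation step and the Elasticsearch call. It complements, and does not replace, restricting the agent's credentials to read-only access on the cluster side.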


7. Next Steps and Improvements:


  • More Sophisticated Prompts: Experiment with more complex prompts to handle a wider range of queries.

  • Context Management: Add context management to allow for conversational interactions.

  • Error Handling: Implement more robust error handling to gracefully handle failures.

  • User Interface: Build a user interface (web or command-line) for your agent.

  • Feedback Loop: Collect user feedback to improve the agent's performance over time.

  • Streaming: Stream the responses back to the end-user as they arrive.


Real-world applications for a smart Elasticsearch Agent are diverse and impactful:


  • Observability and Security: In IT operations and cybersecurity, an Elasticsearch Agent proves invaluable. It can "decode error messages, interpret stack traces and error logs to pinpoint root causes," and "identify performance bottlenecks". For security, it facilitates the "collection, storage, analysis of security data," "monitoring and analyzing endpoint security data," and streamlining "threat hunting" operations.  


  • Search Applications: Beyond fundamental search, an Elasticsearch Agent can power advanced functionalities like "semantic search" (understanding query intent), "hybrid search" (combining full-text and vector search), and Retrieval Augmented Generation (RAG) systems by serving as an efficient retrieval engine for LLMs.  


  • Business Intelligence & Analytics: The agent can significantly simplify "Analytics and Reporting" by enabling natural language queries to generate Elasticsearch SQL. This SQL can then be seamlessly integrated with Business Intelligence (BI) tools, simplifying data aggregation and visualization for large datasets.  


  • Customer Service & Support: In customer support scenarios, an agent can rapidly locate pertinent information within extensive knowledge bases by processing natural language questions about past tickets or product documentation, significantly improving response times and accuracy.


Enhancing Your Elasticsearch Agent: Advanced Considerations


Securing Your Elasticsearch Agent (Critical!)


Security is paramount for any production-ready application, especially one interacting with sensitive data. The practice of disabling SSL certificate verification (verify=False) in the provided code is a severe security risk and must never be used in a production environment. This vulnerability exposes data to potential eavesdropping and man-in-the-middle attacks.
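As a starting point for removing verify=False, the request options can be built from configuration so that certificate verification stays on by default. This is a sketch under assumptions: ES_API_KEY and ES_CA_CERT are hypothetical environment variable names, and the API key is expected to be the encoded value that Elasticsearch issues for use in an Authorization: ApiKey header.

```python
import os
from typing import Mapping


def build_request_kwargs(env: Mapping[str, str] = os.environ) -> dict:
    """Build keyword arguments for requests.post with TLS verification on."""
    headers = {"Content-Type": "application/json"}
    api_key = env.get("ES_API_KEY")  # hypothetical variable name
    if api_key:
        headers["Authorization"] = "ApiKey " + api_key
    # True uses the default trust store; a string is a path to a CA bundle.
    verify = env.get("ES_CA_CERT") or True  # hypothetical variable name
    return {"headers": headers, "verify": verify, "timeout": 30}
```

The search call would then become requests.post(ES_ENDPOINT, json=es_dsl, **build_request_kwargs()), with no hard-coded verify=False left in the code.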


Robust security measures for an Elasticsearch Agent include:


SSL/TLS Configuration: It is imperative to configure SSL/TLS for both node-to-node communication within the Elasticsearch cluster and for client connections, enabling xpack.security.transport.ssl.enabled: true and xpack.security.http.ssl.enabled: true.   


Network Security:


IP Filtering: Access to Elasticsearch nodes should be restricted by IP address using xpack.security.transport.filter.allow and xpack.security.http.filter.allow.

Firewall Configuration: Firewalls must be configured to "allow only necessary ports (9200 for HTTP, 9300 for transport)" and to "restrict access to management interfaces". Ideally, Elasticsearch should be "isolated from internet access" unless absolutely necessary.


Data Protection:


Field-Level Security: Access to sensitive fields within documents (e.g., ssn, credit_card) should be restricted.  

Document-Level Security: Documents should be filtered based on user context, ensuring that users can only view data they are authorized to access.  

Audit Logging: Enabling audit logging (xpack.security.audit.enabled: true) is essential to "track security events" such as "failed authentication attempts," "privilege escalation attempts," and "unusual access patterns".  

A systematic approach to implementing these security measures transforms the Elasticsearch Agent into a trustworthy component within an enterprise architecture. This commitment to robust security is vital for compliance with data protection regulations and for building user confidence, as data security is a non-negotiable aspect of any production system interacting with sensitive information.


Conclusion


Building a smart Elasticsearch agent is a rewarding project that can significantly improve the usability of your Elasticsearch data. By combining the power of LangGraph, Google Gemini, and well-crafted prompts, you can create an intelligent assistant that empowers users to search and analyze data with ease. Remember to prioritize security and iterate on your prompts to achieve the best possible results! Now go and build your own!
