Imagine you could record every SQL query your entire company has ever run against the data warehouse. Thousands of queries—the ugly ones with SELECT * that bring the cluster to its knees, the elegant ones that join six tables in exactly the right way, the ones written at 2 AM that somehow work but nobody can explain why. Now imagine feeding all of those queries to a program that could read them, not just as text, but as structured knowledge. A program that could say: "Hey, this join pattern shows up in 1,247 queries. Maybe it should be a reusable model."
That's what the text mining system inside dbt-agent does. And over the course of building it, I learned something surprising: the hardest part isn't parsing SQL. It's deciding what's worth remembering.
This article is a deep dive into two Python systems that together span about 10,600 lines of code across 23 files. One reads SQL queries. The other reads AI agent conversations. Both do the same fundamental thing: they turn unstructured text into structured knowledge that makes the next session smarter than the last one.
Before we dive into individual functions, let's see the whole car. There are two mining systems, and they work on completely different inputs but share a design philosophy: extract structured knowledge from unstructured text, and make it available for future decisions.
Both systems share the same fundamental pattern: parse → extract → classify → store. And both face the same fundamental challenge: real-world text is messy, inconsistent, and sometimes broken. The question isn't "can we parse it?" It's "what do we do when we can't?"
Let's zoom all the way in. Every SQL query in the system passes through a single function: parse(), inside a file called parser.py. This is the catalytic converter of the whole operation. If parsing breaks, nothing downstream works.
Here's the actual function. I want you to read it slowly, because there's a design decision hiding in plain sight:
```python
# parser.py
def parse(self, sql: str) -> ParsedQuery:
    """Parse with AST, fall back to regex if parsing fails."""
    result = ParsedQuery(original_sql=sql, is_valid=False)

    # Quick pattern detection (works even if parsing fails)
    self._detect_anti_patterns(sql, result)

    try:
        # Full AST parsing with sqlglot
        ast = sqlglot.parse(sql, dialect=self.dialect)[0]
        result.is_valid = True

        # Precise extraction from AST
        self._extract_tables(ast, result)
        self._extract_joins(ast, result)
        self._extract_aggregations(ast, result)
        # ... 8 extraction methods total

    except ParseError as e:
        result.error_message = str(e)
        # Fallback to regex extraction (lossy but resilient)
        self._extract_with_regex(sql, result)

    return result
```
Did you spot it? The anti-pattern detection runs before the try block. It uses simple regex, not the AST parser. So even if the SQL is so mangled that sqlglot throws its hands up and refuses to parse it, we still get useful information out of it. The function never returns empty-handed.
In practice, this dual-strategy approach achieves 100% coverage: 85% of queries get the full AST treatment, and the remaining 15% get the regex fallback. Zero queries are lost entirely.
If you haven't worked with parsers before, the term AST might sound intimidating. It shouldn't be. An AST is just a tree that represents the structure of code. When you write:
```sql
SELECT customer_name, SUM(amount)
FROM orders
JOIN customers ON orders.customer_id = customers.id
GROUP BY customer_name
```
The AST parser sees this not as a string of characters, but as a tree: a SELECT node whose children are the selected columns, the orders table, the join to customers with its ON condition, and the GROUP BY clause.
Walking that tree is how the parser extracts joins, tables, aggregations, and everything else. The sqlglot library does the heavy lifting of building the tree. Our code just walks it.
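To make that concrete, here's a minimal, standalone sketch of walking a sqlglot tree for the query above. It isn't the project's extraction code, just the library calls the extractors build on:

```python
import sqlglot
from sqlglot import exp

ast = sqlglot.parse_one(
    "SELECT customer_name, SUM(amount) FROM orders "
    "JOIN customers ON orders.customer_id = customers.id "
    "GROUP BY customer_name"
)

# Every table node in the tree
print(sorted(t.name for t in ast.find_all(exp.Table)))   # ['customers', 'orders']

# Every JOIN node, with its ON condition rendered back to SQL
print([j.args["on"].sql() for j in ast.find_all(exp.Join)])
# ['orders.customer_id = customers.id']

# Every SUM() aggregation
print([f.sql() for f in ast.find_all(exp.Sum)])           # ['SUM(amount)']
```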
But regex? Regex is the brute-force approach. Instead of understanding the tree, it scans the raw text for patterns. "Does this string contain the word JOIN followed eventually by ON?" It works. It's just less precise.
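For contrast, here's what the brute-force version of "find the joined tables" can look like. This is a simplified sketch in the spirit of the fallback, not the project's actual _extract_with_regex:

```python
import re

# Scan the raw text for table names that follow the keyword JOIN.
JOIN_TABLE_RE = re.compile(r"\bJOIN\s+([\w.]+)", re.IGNORECASE)

def joined_tables(sql: str) -> list[str]:
    """Lossy: no aliases resolved, no subqueries understood, no ON conditions."""
    return JOIN_TABLE_RE.findall(sql)

print(joined_tables(
    "SELECT * FROM orders o LEFT JOIN customers c ON o.customer_id = c.id"
))
# ['customers']
```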
Now zoom out from the parser. Once every query has been parsed, the system hands them off to four parallel extractors. The most immediately useful one is the Anti-Pattern Analyzer. It answers a simple question: which SQL patterns are making queries slow?
The core algorithm is surprisingly simple. Here it is:
```python
# anti_pattern_analyzer.py
def compute_impact(self, pattern: str):
    # Split queries into two groups
    with_pattern = [q for q in self.queries if self._has_pattern(q, pattern)]
    without_pattern = [q for q in self.queries if not self._has_pattern(q, pattern)]

    # Execution times for each group (attribute name shown for clarity)
    with_times = [q.execution_time for q in with_pattern]
    without_times = [q.execution_time for q in without_pattern]

    # Compare medians (not means!)
    with_median = statistics.median(with_times)
    without_median = statistics.median(without_times)

    # Impact multiplier
    slowdown = with_median / without_median
```
Two groups. Two medians. One division. That's it. But the choice to use median instead of mean is critical. Here's why:
Imagine you have 100 queries. 99 of them take 2 seconds. One rogue query takes 600 seconds (someone ran a full table scan during their lunch break). The mean execution time is 7.98 seconds, which describes none of the queries accurately. The median is 2 seconds, which describes 99% of them perfectly. When you're trying to measure the impact of an anti-pattern on typical queries, the median gives you truth where the mean gives you noise.
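Three lines of Python make the point. The numbers come from that thought experiment, not from the production data:

```python
import statistics

# 99 routine queries at 2 s, plus one runaway full table scan at 600 s
exec_times = [2.0] * 99 + [600.0]

print(statistics.mean(exec_times))    # 7.98 -> describes none of the queries
print(statistics.median(exec_times))  # 2.0  -> describes the typical query
```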
The impact numbers the analyzer reports are real, measured against this workload. NOT IN (subquery) makes the typical query 4.18 times slower than clean SQL. The fix? Replace it with NOT EXISTS: semantically identical, but the query planner handles it completely differently.
What makes this system powerful isn't just finding anti-patterns; plenty of linting tools do that. It's quantifying their impact against your actual workload. It's the difference between "this is bad practice" and "this pattern costs your team 234 queries × 14 extra seconds each, roughly 55 minutes of wasted compute per day."
Now for the subtlest extractor. While the anti-pattern analyzer finds problems, the Join Extractor finds opportunities.
The insight is beautifully simple: if 1,247 queries join fct_transactions to dim_merchant on merchant_id, that join probably deserves its own reusable model. Frequency is a signal for canonicality.
```python
# join_extractor.py
# Count all join pairs across every query
join_counts = Counter()
for query in all_queries:
    for join in query.joins:
        key = (join.source_table, join.source_column,
               join.target_table, join.target_column)
        join_counts[key] += 1

# Canonical = high-frequency joins
for join_key, count in join_counts.most_common():
    if count >= self.canonical_threshold:  # e.g., 50+
        mark_canonical(join_key)
```
Let's break this down for the Python newcomer. Counter() is from Python's standard library—it's a dictionary that counts things. Every time you call counter[key] += 1, it increments the count for that key. If the key doesn't exist yet, it starts at zero. It's the Python equivalent of tally marks on a whiteboard.
The key is a tuple of four strings: source table, source column, target table, target column. This uniquely identifies a join relationship. When we call .most_common(), we get them back sorted from highest count to lowest.
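If that's still abstract, here's the same mechanic in miniature. The third join in the list is invented purely for contrast:

```python
from collections import Counter

join_counts = Counter()

observed = [
    ("edw.fct_transactions", "merchant_id", "edw.dim_merchant", "merchant_id"),
    ("edw.fct_transactions", "merchant_id", "edw.dim_merchant", "merchant_id"),
    ("edw.fct_transactions", "customer_id", "edw.dim_customer", "customer_id"),  # hypothetical
]
for key in observed:
    join_counts[key] += 1   # a missing key starts at 0 automatically

print(join_counts.most_common(1))
# [(('edw.fct_transactions', 'merchant_id', 'edw.dim_merchant', 'merchant_id'), 2)]
```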
The output is a YAML file called join-registry.yml that looks like this:
```yaml
canonical_joins:
  - tables: [edw.fct_transactions, edw.dim_merchant]
    join_on:
      source_column: merchant_id
      target_column: merchant_id
    frequency: 1247
    avg_exec_time: 2.3s
```
This file becomes an input to future model development. When an AI agent (or a human analyst) needs to join those two tables, it doesn't have to figure out the join key from scratch. The registry already knows.
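A consumer of that file needs nothing fancier than a YAML load and a lookup. This is a hypothetical reader, assuming PyYAML, not part of the project:

```python
from typing import Optional
import yaml  # PyYAML

with open("join-registry.yml") as f:
    registry = yaml.safe_load(f)

def lookup_join(left: str, right: str) -> Optional[dict]:
    """Return the canonical join columns for a pair of tables, if registered."""
    for entry in registry["canonical_joins"]:
        if {left, right} == set(entry["tables"]):
            return entry["join_on"]
    return None

print(lookup_join("edw.fct_transactions", "edw.dim_merchant"))
# {'source_column': 'merchant_id', 'target_column': 'merchant_id'}
```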
Let's pull back to see how all four extractors work together inside main.py, the orchestrator. The data flow, measured from production runs, is straightforward: raw queries go through the parser, the parsed results land in a Parquet cache, and the four extractors read from that cache and write out their outputs.
There's a detail here that matters: the parse step saves its results to a Parquet cache file. So the first time you run the pipeline, parsing takes 5 seconds. Every subsequent run loads from cache in under a second. The system uses polars instead of pandas for this—Polars is a newer DataFrame library written in Rust that's roughly 5-6x faster on large files.
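The cache itself is nothing exotic. Here's the shape of the trick as a sketch; the path and the columns are assumptions, not the project's actual schema:

```python
import os
import polars as pl

CACHE_PATH = "parsed_queries.parquet"   # hypothetical location

def load_or_build(raw_queries: list[str]) -> pl.DataFrame:
    """Parse once, then serve every later run from the Parquet cache."""
    if os.path.exists(CACHE_PATH):
        return pl.read_parquet(CACHE_PATH)             # sub-second on reruns

    # Stand-in for the real parse step: one row of extracted facts per query
    rows = [{"sql": sql, "char_count": len(sql)} for sql in raw_queries]
    df = pl.DataFrame(rows)
    df.write_parquet(CACHE_PATH)                       # the ~5 s cost, paid once
    return df
```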
Let me show you the import pattern, because it illustrates a nice defensive coding technique:
```python
# main.py
import sys

# Check for required dependencies
MISSING_DEPS = []

try:
    import polars as pl
except ImportError:
    MISSING_DEPS.append("polars")

try:
    import sqlglot
except ImportError:
    MISSING_DEPS.append("sqlglot")

if MISSING_DEPS:
    print(f"Missing: {', '.join(MISSING_DEPS)}")
    print(f"Install: pip install {' '.join(MISSING_DEPS)}")
    sys.exit(1)
```
Instead of crashing with a confusing ImportError stack trace, the code checks every dependency, collects what's missing, and prints a single helpful message: "Install these." This is a pattern worth stealing for any Python project.
Now for the second system—and the one with the more surprising story.
ChatOps Analytics doesn't read SQL. It reads the conversations between an AI agent and a human developer. Every time someone uses the dbt-agent (which is a Claude-powered coding assistant), the session gets saved as a Markdown transcript. These transcripts contain: user requests, tool invocations, code that was written, errors that were hit, and—crucially—what actually worked.
The question was: can we automatically extract useful knowledge from these sessions?
The first attempt used micro-pattern extraction. We wrote detectors for specific things: "Find cases where a tool failed and then succeeded on retry." "Find question-answer pairs where the user confirmed the answer." "Find decision points where options were presented."
It worked. Kind of. The extraction yield was 7%—meaning 93% of the useful information in sessions went uncaptured.
| Approach | Strategy | Result |
|---|---|---|
| Micro-pattern extraction | Fishing for specific patterns | High precision, but misses 93% of value |
| Task-unit extraction | Capturing work units | 9x more knowledge extracted |
The breakthrough came from a reframing. Instead of searching for patterns, we started searching for tasks.
We were fishing for patterns when we should have been capturing the work itself. A "task unit" is: user directive → agent actions → outcome. That maps to how work actually happens. It's not an exotic pattern—it's the fundamental unit of work.
The task_extractor.py file is the crown jewel. Let me walk through it piece by piece, because it does something elegant with very simple logic.
First, it needs to understand what kind of message a user is sending. Is it a work request? A confirmation? A correction? Here's the classifier:
```python
# task_extractor.py
def classify_user_message(content: str) -> str:
    content_lower = content.lower().strip()

    # Short confirmations
    if len(content) < 30:
        if any(w in content_lower for w in ["yes", "ok", "sure", "great", "perfect", "thanks"]):
            return "confirmation"

    # Questions
    if "?" in content and len(content) < 300:
        return "question"

    # Feedback/correction
    if any(w in content_lower for w in ["that's wrong", "not quite", "actually,", "hold on"]):
        return "feedback"

    # Work requests (directives)
    if any(w in content_lower for w in ["please", "can you", "let's", "fix", "add", "create", "run"]):
        return "directive"

    # Substantial text defaults to directive
    if len(content) > 50:
        return "directive"

    return "other"
```
This is not machine learning. There are no embeddings, no neural networks, no models to train. It's keyword matching with length heuristics. And it works remarkably well. Why? Because human-AI conversations follow predictable patterns. Short messages with "yes" or "thanks" are confirmations. Messages with question marks are questions. Messages with action words ("fix", "add", "create") are directives. The classifier doesn't need to be perfect—it needs to be right enough that the task boundaries land in roughly the right places.
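A few sample messages show how far that gets you:

```python
print(classify_user_message("yes, perfect, thanks!"))
# 'confirmation'  (short, contains "yes"/"thanks")

print(classify_user_message("why is this model built incrementally?"))
# 'question'      (has a "?", under 300 characters)

print(classify_user_message("actually, hold on - that join is wrong"))
# 'feedback'      (matches "actually," and "hold on")

print(classify_user_message("please add a staging model for payments and run the tests"))
# 'directive'     (matches "please", "add", "run")
```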
Now the main algorithm. This is where task units get assembled from the classified messages:
```python
# task_extractor.py: the core loop
for turn_idx, (event_idx, turn) in enumerate(turns):
    role = turn.payload.get("role", "")
    content = turn.payload.get("content", "")

    if role == "user":
        msg_type = classify_user_message(content)

        if msg_type in ["directive", "question"]:
            # New task starts! Close the old one.
            if current_task:
                current_task.outcome = "redirected"
                tasks.append(current_task)
            current_task = TaskUnit(
                directive=content[:500],
                directive_type=msg_type,
            )
        elif msg_type == "confirmation":
            # Task succeeded!
            current_task.outcome = "confirmed"
            tasks.append(current_task)
            current_task = None
        elif msg_type == "feedback":
            # User is course-correcting, don't close
            current_task.user_feedback = content
```
Read that carefully. It's a state machine. There's a current_task variable that's either None (no active task) or a TaskUnit (work in progress). User messages drive the transitions: a directive or question closes any open task as "redirected" and opens a new one, a confirmation closes the current task as "confirmed", and feedback keeps the task open but records the correction.
When the session ends, any remaining open task gets marked "abandoned." Simple. No ML. No training data. Just a state machine that models the natural rhythm of human-AI collaboration.
Let's zoom into the data structure itself, because if you're learning Python, this is a perfect example of how dataclasses work:
```python
# task_extractor.py
from dataclasses import dataclass, field
from typing import Optional, Literal

@dataclass
class TaskUnit:
    """A unit of work: directive → actions → outcome."""
    task_id: str
    session_id: str

    # The request
    directive: str
    directive_type: str

    # What happened
    tools_used: list[str] = field(default_factory=list)
    tool_count: int = 0
    agent_proposals: list[str] = field(default_factory=list)

    # How it ended
    outcome: Literal["confirmed", "redirected", "abandoned", "ongoing"] = "ongoing"
    user_feedback: Optional[str] = None

    # Derived value
    tags: list[str] = field(default_factory=list)
    complexity: Literal["simple", "moderate", "complex"] = "simple"
```
If you're new to Python, three things to notice:
1. @dataclass is magic. Without it, you'd have to write an __init__ method that assigns all 11 fields manually: self.task_id = task_id, self.session_id = session_id, and so on. The decorator auto-generates that constructor, plus a readable __repr__ for debugging. One line of decorator saves 20+ lines of boilerplate.
2. field(default_factory=list) avoids a famous Python trap. A bare mutable default like tools_used: list[str] = [] would mean one list object shared by every instance, so adding a tool to one task would magically add it to all tasks. Dataclasses know this is almost never what you want, so they refuse to build the class and raise a ValueError telling you to use default_factory; in a plain class or function signature the same mistake is silent. field(default_factory=list) says "create a new empty list for each instance." It's one of the most common Python gotchas, and using field() is the standard fix (both versions of the trap are demonstrated in the snippet after this list).
3. Literal constrains values at the type level. outcome: Literal["confirmed", "redirected", "abandoned", "ongoing"] tells both humans and type checkers that outcome can only be one of those four strings. It's like an enum but lighter. If someone tries to set outcome = "maybe", their IDE will flag it.
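Here are both failure modes in a dozen lines: the dataclass refusing the bare mutable default, and the silent shared-list version you get without dataclasses. TaskUnit itself only ever uses the field(default_factory=list) form:

```python
from dataclasses import dataclass, field

# A dataclass refuses a bare mutable default outright:
try:
    @dataclass
    class Bad:
        tools_used: list[str] = []
except ValueError as e:
    print(e)   # "mutable default <class 'list'> ... use default_factory"

# Without dataclasses, the same mistake is silent -- and shared:
class SilentlyBad:
    def __init__(self, tools_used=[]):   # one list object, reused by every call
        self.tools_used = tools_used

a, b = SilentlyBad(), SilentlyBad()
a.tools_used.append("dbt run")
print(b.tools_used)                      # ['dbt run'] -- b never touched it

# The fix used in TaskUnit: a fresh list per instance
@dataclass
class Good:
    tools_used: list[str] = field(default_factory=list)

print(Good().tools_used is Good().tools_used)   # False: separate lists
```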
The most ambitious part of the system is the trigger suggester. This is where the learning loop actually closes.
Here's the problem it solves. The dbt-agent has about 36 skills—specialized prompts for different tasks like "migrate a SQL pipeline" or "run QA validation." Each skill has trigger phrases: keywords that, when detected in a user message, cause that skill to activate. But users don't always say the right magic words. Someone might type "convert this SQL to dbt" when the migration skill is listening for "migrate" or "legacy script."
The trigger suggester finds these missed connections automatically.
The core technique is n-gram extraction. Let me show you the function, because it's a beautiful example of how a simple algorithm can be surprisingly powerful:
```python
# trigger_suggester.py
def extract_ngrams(text: str, n_range=(2, 4)) -> list[str]:
    # Clean: lowercase, remove punctuation
    text = text.lower()
    text = re.sub(r'[^\w\s]', ' ', text)
    words = text.split()

    ngrams = []
    for n in range(n_range[0], n_range[1] + 1):
        for i in range(len(words) - n + 1):
            ngram = ' '.join(words[i:i+n])
            if len(ngram) > 5 and not _is_stopword_ngram(ngram):
                ngrams.append(ngram)

    return ngrams
```
Let's trace through this with a real example. Given the message: "Can you convert this legacy SQL into a dbt model?"
After cleaning: "can you convert this legacy sql into a dbt model"
The function generates:
| Size | N-grams generated |
|---|---|
| 2-word | "can you", "you convert", "convert this", "this legacy", **"legacy sql"**, "sql into", **"dbt model"**, … |
| 3-word | "can you convert", "you convert this", **"convert this legacy"**, "this legacy sql", "legacy sql into", "a dbt model", … |
| 4-word | "can you convert this", "convert this legacy sql", "this legacy sql into", "into a dbt model", … |
The ones in bold are the valuable ones. "legacy sql", "dbt model", "convert this legacy"—these are phrases that, if they show up frequently in messages where the migration skill should have fired but didn't, become candidates for new trigger phrases.
But raw n-grams are noisy. The system applies two filters:
Stopword filtering removes n-grams that are mostly meaningless words. The function _is_stopword_ngram checks if 60% or more of the words in the n-gram are common words like "the", "is", "a". This kills phrases like "can you the" or "is it a" before they waste anyone's time.
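Here's a sketch of that check, under the assumption that it's a simple ratio against a stopword set; the set below is a small illustrative subset, not the project's actual list:

```python
# Illustrative stopword subset; the real list is longer
STOPWORDS = {"the", "is", "a", "an", "it", "this", "that", "can", "you", "to", "of"}

def _is_stopword_ngram(ngram: str) -> bool:
    """Reject n-grams where 60% or more of the words are filler."""
    words = ngram.split()
    filler = sum(1 for w in words if w in STOPWORDS)
    return filler / len(words) >= 0.6

print(_is_stopword_ngram("is it a"))     # True  (3/3 filler)
print(_is_stopword_ngram("legacy sql"))  # False (0/2 filler)
```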
Domain boosting is the clever part. Each n-gram gets a confidence score, and phrases containing domain-relevant terms get a 1.5x multiplier:
```python
# trigger_suggester.py
# Base confidence from frequency
base = min(1.0, count / 10)

# Boost domain-relevant phrases
if _is_domain_relevant(phrase):
    confidence = min(1.0, base * 1.5)
else:
    confidence = base * 0.5  # Penalize non-domain
```
Domain relevance is checked against a set of ~70 terms specific to dbt and SQL: "model", "staging", "incremental", "join", "metric", "pipeline", and so on. If "legacy sql" appears in 5 missed invocations and contains the domain term "sql", its confidence is min(1.0, 0.5 * 1.5) = 0.75. The non-domain phrase "can you convert" with the same frequency gets only 0.5 * 0.5 = 0.25, which falls below the threshold and is discarded.
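The relevance check can be as simple as a set intersection. The vocabulary below is a small illustrative subset of the ~70 terms, and the function bodies are a sketch of the described behavior (folding the scoring from the previous snippet into a helper), not the project's code:

```python
# Illustrative subset of the dbt/SQL vocabulary (the real list has ~70 terms)
DOMAIN_TERMS = {"model", "staging", "incremental", "join", "metric", "pipeline", "sql", "dbt"}

def _is_domain_relevant(phrase: str) -> bool:
    """True if any word in the phrase is a known dbt/SQL term."""
    return any(word in DOMAIN_TERMS for word in phrase.split())

def confidence(phrase: str, count: int) -> float:
    base = min(1.0, count / 10)
    return min(1.0, base * 1.5) if _is_domain_relevant(phrase) else base * 0.5

print(confidence("legacy sql", 5))       # 0.75 -> kept as a trigger candidate
print(confidence("can you convert", 5))  # 0.25 -> falls below the threshold
```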
The full loop: find user messages where a skill should have fired but didn't, extract n-grams from those messages, score them with the frequency and domain rules above, and surface the survivors as suggested trigger phrases for a human to review and add to the skill.
It's a system that gets smarter by watching itself fail.
One more piece worth examining. The extraction_config.py file defines three preset configurations, TIGHT, MEDIUM, and LOOSE, that control how aggressively the system extracts patterns.
The "window" parameter controls how far ahead the extractor looks when trying to connect related events. With TIGHT, if a tool failure doesn't have a matching retry within 5 events, we assume they're unrelated. With LOOSE, we look up to 10 events ahead—which catches more retries but also produces more false positives (connecting events that aren't actually related).
The pattern is: use LOOSE during research and discovery, use TIGHT in production. LOOSE casts a wide net and shows you what's out there. TIGHT gives you only the patterns you'd bet money on.
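For flavor, here's what such a preset can look like as a frozen dataclass. Only the TIGHT and LOOSE window sizes come from the text above; the MEDIUM value and the field layout are assumptions:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ExtractionConfig:
    name: str
    window: int   # how many events ahead to look when linking related events

TIGHT  = ExtractionConfig("tight", window=5)     # production: only sure-thing links
MEDIUM = ExtractionConfig("medium", window=7)    # assumed midpoint, for illustration
LOOSE  = ExtractionConfig("loose", window=10)    # research: wide net, more false positives

def pick_config(stage: str) -> ExtractionConfig:
    """Use LOOSE while exploring, TIGHT once the patterns ship."""
    return LOOSE if stage == "research" else TIGHT
```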
Let's zoom all the way out one final time. What these 10,600 lines of Python accomplish together is two compounding loops. The SQL mining system turns query patterns into canonical models, which produce better queries, which feed back into the system. The ChatOps system extracts learnings from sessions, which improve the agent, which produces better sessions to learn from.
The design principles that make it work:
Graceful degradation everywhere. AST fails? Use regex. Event stream is messy? Classify approximately. Missing dependencies? Print a helpful message. The system never stops and says "I can't." It always says "Here's what I could figure out."
Simple algorithms applied well. There's no deep learning in this codebase. Counters, medians, keyword matching, n-grams, state machines. Each one is individually trivial. Together, they're powerful. The 63% extraction yield comes from a 50-line state machine, not a billion-parameter model.
Configurable sensitivity. TIGHT, MEDIUM, LOOSE. Not one size fits all. Research and production have different needs, and the system adapts.
Human in the loop. The trigger suggester doesn't auto-update skills. It suggests. The join registry doesn't auto-create models. It recommends. The system augments human judgment; it doesn't replace it.
If you're learning Python and want to build something meaningful with it: this is the kind of project where simple techniques compound into genuine value. You don't need machine learning. You need Counters, dataclasses, a few regexes, and the discipline to make every failure path return something useful.
The code is there to read, all 10,600 lines of it. And now you have the map.