Detailed step-by-step walkthrough of the skill, mapped to its cognitive phases:
1. SENSE: Map sources and destination
Identify the source APIs, databases, and files
Check credentials and rate limits for each source
Estimate data volume (GB/day) to choose the loading strategy
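The volume estimate drives the loading strategy. Below is a minimal sketch of that decision. It assumes you already have a row count and an average row size measured from a sample extract. The 1 GB/day cut-off, the source names, and the figures are hypothetical placeholders, not values from this skill:

```python
from dataclasses import dataclass


@dataclass
class SourceEstimate:
    name: str
    rows_per_day: int
    avg_row_bytes: int  # e.g. measured from a sample extract

    @property
    def gb_per_day(self) -> float:
        # Decimal gigabytes (1 GB = 1e9 bytes)
        return self.rows_per_day * self.avg_row_bytes / 1e9


# Hypothetical cut-off: below it, reloading the whole day is assumed cheap enough.
FULL_RELOAD_MAX_GB_PER_DAY = 1.0


def choose_strategy(source: SourceEstimate) -> str:
    """Pick a load strategy from the estimated daily volume."""
    if source.gb_per_day <= FULL_RELOAD_MAX_GB_PER_DAY:
        return "full_reload"
    return "incremental"


# Illustrative inputs only
sources = [
    SourceEstimate("revenue_api", rows_per_day=200_000, avg_row_bytes=500),
    SourceEstimate("events_db", rows_per_day=50_000_000, avg_row_bytes=300),
]
for s in sources:
    print(f"{s.name}: {s.gb_per_day:.2f} GB/day -> {choose_strategy(s)}")
# revenue_api: 0.10 GB/day -> full_reload
# events_db: 15.00 GB/day -> incremental
```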
2. RECOMMEND: Airflow DAG (ELT pattern)
```python
# dags/revenue_pipeline.py
import subprocess
from datetime import timedelta

import pendulum
from airflow.decorators import dag, task
from airflow.models import Variable

DBT_PROFILES_DIR = "/opt/airflow/dbt"


@dag(
    schedule="0 3 * * *",  # daily at 03:00 UTC; `schedule` replaced `schedule_interval` in Airflow 2.4
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),  # fixed placeholder date; avoid dynamic days_ago()
    catchup=False,
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
    tags=["revenue", "daily"],
)
def revenue_pipeline():
    @task
    def extract_api(ds=None):
        # Airflow injects the logical date (YYYY-MM-DD) into `ds` from the task context
        import json

        import requests

        response = requests.get(
            f"https://api.example.com/revenue?date={ds}",
            headers={"Authorization": f"Bearer {Variable.get('API_TOKEN')}"},
            timeout=30,
        )
        response.raise_for_status()  # HTTP errors fail the attempt and trigger a retry
        data = response.json()  # assumed to be a JSON list of records
        # Save to the GCS raw zone; save_to_gcs is a project helper (not shown)
        save_to_gcs(f"raw/revenue/{ds}.json", json.dumps(data))
        return len(data)

    @task
    def validate(rows_extracted: int):
        if rows_extracted == 0:
            raise ValueError("No data extracted: check API availability")

    @task
    def run_dbt():
        result = subprocess.run(
            ["dbt", "run", "--select", "tag:revenue", "--profiles-dir", DBT_PROFILES_DIR],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            # dbt writes most of its log to stdout, so include both streams
            raise RuntimeError(f"dbt run failed:\n{result.stdout}\n{result.stderr}")

    @task
    def quality_check():
        result = subprocess.run(
            ["dbt", "test", "--select", "tag:revenue", "--profiles-dir", DBT_PROFILES_DIR],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            # notify_slack is a project helper (not shown)
            notify_slack(channel="#data-alerts", message="Quality check failed!")
            raise RuntimeError(f"dbt test failed:\n{result.stdout}")

    rows = extract_api()
    validate(rows) >> run_dbt() >> quality_check()


revenue_pipeline()
```
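The `retries` and `retry_delay` defaults also bound how long a failing source can hold up a run before the task is marked failed. That bound is worth checking against the 30-minute SLA alert in step 5. Below is a minimal sketch of that arithmetic. It assumes a constant delay between attempts (no exponential backoff) and uses 30 seconds as the length of one slow attempt. Note that the `timeout=30` passed to `requests` is a connect/read timeout, not a hard cap on total request time, so treat the result as an estimate:

```python
from datetime import timedelta


def worst_case_task_duration(
    retries: int, retry_delay: timedelta, attempt_duration: timedelta
) -> timedelta:
    """Approximate wall time before a task is finally marked failed.

    Assumes every attempt runs for `attempt_duration` and the scheduler
    waits a constant `retry_delay` between attempts.
    """
    attempts = retries + 1  # the first try plus each retry
    return attempts * attempt_duration + retries * retry_delay


# Values from default_args above; 30 s stands in for one slow API attempt.
total = worst_case_task_duration(
    retries=3,
    retry_delay=timedelta(minutes=5),
    attempt_duration=timedelta(seconds=30),
)
print(total)  # 0:17:00
```

At 17 minutes, a flaky source alone consumes more than half of a 30-minute SLA before dbt even starts.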
3. RECOMMEND: dbt models (staging → mart)
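```sql
-- models/staging/stg_revenue.sql
{{ config(materialized='incremental', unique_key='transaction_id', tags=['revenue']) }}

SELECT
    transaction_id,
    PARSE_DATE('%Y-%m-%d', date_str) AS transaction_date,
    CAST(amount_str AS NUMERIC) AS amount,  -- NUMERIC avoids float rounding on money
    UPPER(TRIM(currency)) AS currency,
    CURRENT_TIMESTAMP() AS loaded_at
FROM {{ source('raw', 'revenue') }}
{% if is_incremental() %}
-- loaded_at is computed by this model, so filter on a source-derived column instead;
-- >= re-reads the latest day, and unique_key turns those rows into updates, not duplicates
WHERE PARSE_DATE('%Y-%m-%d', date_str) >= (SELECT MAX(transaction_date) FROM {{ this }})
{% endif %}
```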
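With `unique_key='transaction_id'`, dbt's incremental materialization upserts rows instead of appending them. On BigQuery, the default `merge` strategy issues a `MERGE`. As a result, re-reading an overlapping window updates existing rows rather than duplicating them. The toy model below is a plain-Python simplification of those semantics, not dbt's implementation. `merge_batch` and the sample rows are hypothetical:

```python
from __future__ import annotations


def merge_batch(table: dict[str, dict], batch: list[dict], unique_key: str) -> dict[str, dict]:
    """Return a new table with `batch` upserted by `unique_key`."""
    merged = dict(table)
    for row in batch:
        merged[row[unique_key]] = row  # insert new keys, overwrite existing ones
    return merged


table = {}
day1 = [
    {"transaction_id": "t1", "amount": "10.00"},
    {"transaction_id": "t2", "amount": "5.50"},
]
table = merge_batch(table, day1, "transaction_id")

# Reprocessing an overlapping window (t2 again, plus a new t3) does not duplicate t2.
day2 = [
    {"transaction_id": "t2", "amount": "5.50"},
    {"transaction_id": "t3", "amount": "7.25"},
]
table = merge_batch(table, day2, "transaction_id")
print(len(table))  # 3
```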
```yaml
# models/staging/stg_revenue.yml
version: 2

models:
  - name: stg_revenue
    columns:
      - name: transaction_id
        tests:
          - not_null
          - unique
      - name: amount
        tests:
          - not_null
      - name: currency
        tests:
          - not_null
          - accepted_values:
              values: ['BRL', 'USD', 'EUR']
```
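dbt compiles each generic test into a query that selects the offending records. The test passes when that query returns no rows. The functions below mirror that rule in plain Python for the three tests declared above, as an illustration only. As in the SQL versions, `unique` and `accepted_values` ignore NULLs, which is why `not_null` is declared separately. The function names and sample rows are hypothetical:

```python
from __future__ import annotations

from collections import Counter


def not_null_failures(rows: list[dict], column: str) -> list[dict]:
    """Rows where the column is missing or None."""
    return [r for r in rows if r.get(column) is None]


def unique_failures(rows: list[dict], column: str) -> list:
    """Non-null values that appear more than once."""
    counts = Counter(r[column] for r in rows if r.get(column) is not None)
    return [value for value, n in counts.items() if n > 1]


def accepted_values_failures(rows: list[dict], column: str, values: list) -> list:
    """Non-null values outside the allowed set."""
    allowed = set(values)
    return [r[column] for r in rows if r.get(column) is not None and r[column] not in allowed]


rows = [
    {"transaction_id": "t1", "amount": 10.0, "currency": "BRL"},
    {"transaction_id": "t1", "amount": None, "currency": "GBP"},
]
print(not_null_failures(rows, "amount"))
print(unique_failures(rows, "transaction_id"))  # ['t1']
print(accepted_values_failures(rows, "currency", ["BRL", "USD", "EUR"]))  # ['GBP']
```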
4. EVALUATE: Run tests and verify idempotency
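```bash
dbt run --select tag:revenue --full-refresh   # rebuild the incremental models from scratch
dbt run --select tag:revenue                  # incremental rerun: must not duplicate rows
dbt test --select tag:revenue                 # quality checks (not_null, unique, accepted_values)
```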
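One way to check idempotency is to snapshot `stg_revenue` after the full refresh and again after a second, incremental `dbt run`. You then compare a row count plus an order-independent content hash. Below is a minimal sketch. It assumes the rows have already been fetched as dictionaries. `loaded_at` is excluded because the model sets it with `CURRENT_TIMESTAMP()`, so it changes on every run. The helper and the sample rows are hypothetical:

```python
from __future__ import annotations

import hashlib
import json


def table_fingerprint(rows: list[dict], ignore: tuple[str, ...] = ("loaded_at",)) -> tuple[int, str]:
    """Row count plus an order-independent hash of the row contents."""
    digests = sorted(
        hashlib.sha256(
            # default=str lets dates and Decimals serialize; ignored columns are dropped
            json.dumps({k: v for k, v in r.items() if k not in ignore}, sort_keys=True, default=str).encode()
        ).hexdigest()
        for r in rows
    )
    return len(rows), hashlib.sha256("".join(digests).encode()).hexdigest()


after_full_refresh = [
    {"transaction_id": "t1", "amount": "10.00", "loaded_at": "2024-01-02T03:05:00"},
    {"transaction_id": "t2", "amount": "5.50", "loaded_at": "2024-01-02T03:05:00"},
]
after_incremental_rerun = [
    {"transaction_id": "t2", "amount": "5.50", "loaded_at": "2024-01-02T03:20:00"},
    {"transaction_id": "t1", "amount": "10.00", "loaded_at": "2024-01-02T03:05:00"},
]
print("idempotent:", table_fingerprint(after_full_refresh) == table_fingerprint(after_incremental_rerun))
```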
5. REFLECT: Document and configure SLAs
```bash
dbt docs generate && dbt docs serve  # build the docs site and serve its interactive lineage graph
```
Configure an Airflow alert for SLA misses longer than 30 minutes
Report telemetry via mcp-skillschain
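In Airflow 2.x, a task-level SLA is set with the operator's `sla` argument, a `timedelta` such as `'sla': timedelta(minutes=30)` in `default_args`. Misses are reported to the DAG's `sla_miss_callback`. This mechanism is not available in Airflow 3, so check which version you run. The deadline is counted from when the scheduled run is due, which for the daily schedule is the 03:00 UTC trigger time. The sketch below models only that rule in plain Python, so it can be unit-tested without Airflow. The function name and the dates are hypothetical:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

SLA = timedelta(minutes=30)


def sla_missed(
    scheduled_at: datetime,
    finished_at: Optional[datetime],
    now: datetime,
    sla: timedelta = SLA,
) -> bool:
    """True if the run finished (or is still running) past scheduled_at + sla."""
    deadline = scheduled_at + sla
    if finished_at is None:  # still running: missed once the clock passes the deadline
        return now > deadline
    return finished_at > deadline


due = datetime(2024, 1, 2, 3, 0, tzinfo=timezone.utc)  # 03:00 UTC trigger
print(sla_missed(due, due + timedelta(minutes=25), now=due + timedelta(minutes=25)))  # False
print(sla_missed(due, due + timedelta(minutes=40), now=due + timedelta(minutes=40)))  # True
print(sla_missed(due, None, now=due + timedelta(minutes=45)))  # True
```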