Data · Advanced · Auto-Sync

ETL Pipeline Builder

by THIAGONOMA · v1.3.0 · updated 2026-04-12T22:48:22.338Z
Score: 86

Builds ETL/ELT pipelines with Apache Airflow and dbt: extraction from multiple sources, transformation with data quality checks, and loading into data warehouses. Includes orchestration, retries, and data quality alerts.

Tags: etl, elt, airflow, dbt, data-pipeline, data-quality, orchestration
Languages: Python, SQL, Shell
1.2K stars · 132 forks · 14.5K uses

Skill Document

SKILL.md · etl-pipeline-builder/workflow
Detailed step-by-step walkthrough of the skill, referencing the cognitive phases:
1. SENSE: Map sources and destination
- Identify the source APIs, databases, and files
- Check the credentials and rate limits of each source
- Estimate data volume (GB/day) to choose a strategy
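The sizing step above can be sketched in a few lines. This is only an illustration: the `Source` fields, the sample numbers, and the 1 GB/day threshold separating a full refresh from an incremental load are all assumptions, not values from the skill.

```python
import math
from dataclasses import dataclass


@dataclass
class Source:
    """Hypothetical description of one source; every field is an assumption."""
    name: str
    rows_per_day: int
    avg_row_bytes: int
    rate_limit_per_min: int  # maximum API requests per minute
    rows_per_request: int    # page size returned by one request


def gb_per_day(src: Source) -> float:
    """Estimated daily volume in gigabytes (decimal GB)."""
    return src.rows_per_day * src.avg_row_bytes / 1e9


def minutes_to_extract(src: Source) -> float:
    """Lower bound on extraction time imposed by the rate limit alone."""
    requests_needed = math.ceil(src.rows_per_day / src.rows_per_request)
    return requests_needed / src.rate_limit_per_min


def choose_strategy(gb: float, full_load_limit_gb: float = 1.0) -> str:
    """Pick a load strategy; the 1 GB/day cut-off is an arbitrary placeholder."""
    return "full_refresh" if gb <= full_load_limit_gb else "incremental"


if __name__ == "__main__":
    api = Source("revenue_api", rows_per_day=5_000_000, avg_row_bytes=500,
                 rate_limit_per_min=60, rows_per_request=1000)
    volume = gb_per_day(api)
    print(api.name, volume, minutes_to_extract(api), choose_strategy(volume))
```

Running the estimate per source before writing the DAG shows early whether the rate limit, rather than the data volume, is the real constraint on the schedule.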
2. RECOMMEND: Airflow DAG (ELT pattern)
```python
# dags/revenue_pipeline.py
import json
import subprocess
from datetime import datetime, timedelta

import requests
from airflow.decorators import dag, task
from airflow.models import Variable

# NOTE: save_to_gcs and notify_slack are project-specific helpers that this
# skill does not define; import them from your own utilities module.


@dag(
    schedule="0 3 * * *",  # every day at 03:00 (use schedule_interval on Airflow < 2.4)
    start_date=datetime(2024, 1, 1),  # placeholder static date; days_ago() is deprecated
    catchup=False,
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
    tags=["revenue", "daily"],
)
def revenue_pipeline():
    @task
    def extract_api(ds=None):
        # Airflow injects the logical date string (YYYY-MM-DD) as `ds`.
        response = requests.get(
            f"https://api.example.com/revenue?date={ds}",
            headers={"Authorization": f"Bearer {Variable.get('API_TOKEN')}"},
            timeout=30,
        )
        response.raise_for_status()  # fail the task on HTTP errors
        data = response.json()
        # Save to the GCS raw zone
        save_to_gcs(f"raw/revenue/{ds}.json", json.dumps(data))
        return len(data)

    @task
    def validate(rows_extracted: int):
        if rows_extracted == 0:
            raise ValueError("No data extracted; check API availability")

    @task
    def run_dbt():
        result = subprocess.run(
            ["dbt", "run", "--select", "tag:revenue", "--profiles-dir", "/opt/airflow/dbt"],
            capture_output=True, text=True,
        )
        if result.returncode != 0:
            raise RuntimeError(f"dbt failed:\n{result.stderr}")

    @task
    def quality_check():
        result = subprocess.run(
            ["dbt", "test", "--select", "tag:revenue"],
            capture_output=True, text=True,
        )
        if result.returncode != 0:
            notify_slack(channel="#data-alerts", message="Quality check failed!")
            raise RuntimeError("Quality check failed")

    # extract -> validate -> dbt run -> dbt test
    rows = extract_api()
    validate(rows) >> run_dbt() >> quality_check()


revenue_pipeline()
```
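The DAG's `default_args` set `retries` to 3 and `retry_delay` to five minutes. A small sketch of what that implies per task, assuming a fixed delay between attempts (no exponential backoff) and ignoring the run time of each attempt; the function names are illustrative only.

```python
from datetime import timedelta


def max_attempts(retries: int) -> int:
    """A task is tried once, then retried up to `retries` more times."""
    return retries + 1


def worst_case_retry_wait(retries: int, retry_delay: timedelta) -> timedelta:
    """Total time spent waiting between attempts if every attempt fails."""
    return retries * retry_delay


if __name__ == "__main__":
    wait = worst_case_retry_wait(3, timedelta(minutes=5))
    print(max_attempts(3), wait)
```

With these settings a single persistently failing task can add up to 15 minutes of waiting before the run is marked failed, which is worth keeping in mind when choosing alert thresholds.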
3. RECOMMEND: dbt models (staging → mart)
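```sql
-- models/staging/stg_revenue.sql
{{ config(
    materialized='incremental',
    unique_key='transaction_id',
    tags=['revenue']
) }}
-- The 'revenue' tag lets `--select tag:revenue` pick up this model
-- (omit it if the tag is already set in dbt_project.yml).

SELECT
    transaction_id,
    PARSE_DATE('%Y-%m-%d', date_str) AS transaction_date,
    CAST(amount_str AS FLOAT64) AS amount,
    UPPER(TRIM(currency)) AS currency,
    CURRENT_TIMESTAMP() AS loaded_at
FROM {{ source('raw', 'revenue') }}
{% if is_incremental() %}
-- Column aliases are not visible in WHERE, so this filter assumes the raw
-- table carries its own loaded_at ingestion timestamp.
WHERE loaded_at > (SELECT MAX(loaded_at) FROM {{ this }})
{% endif %}
```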
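To make the model's behaviour concrete, here is a plain-Python sketch of what the staging query and the `unique_key` setting are meant to achieve: each raw row is cleaned, and a row whose `transaction_id` already exists replaces the old version instead of being appended. This only mirrors the intent; it is not how dbt or the warehouse executes the merge, and `stage_row` and `incremental_merge` are hypothetical names.

```python
from datetime import datetime


def stage_row(raw: dict) -> dict:
    """Apply the same cleaning steps as the SELECT list in stg_revenue."""
    return {
        "transaction_id": raw["transaction_id"],
        "transaction_date": datetime.strptime(raw["date_str"], "%Y-%m-%d").date(),
        "amount": float(raw["amount_str"]),
        "currency": raw["currency"].strip().upper(),
    }


def incremental_merge(target: list, new_rows: list, unique_key: str = "transaction_id") -> list:
    """Upsert: rows with an existing key are replaced, others are added."""
    merged = {row[unique_key]: row for row in target}
    for row in new_rows:
        merged[row[unique_key]] = row
    return list(merged.values())


if __name__ == "__main__":
    existing = [stage_row({"transaction_id": "t1", "date_str": "2026-04-01",
                           "amount_str": "10.00", "currency": "BRL"})]
    batch = [
        stage_row({"transaction_id": "t1", "date_str": "2026-04-01",
                   "amount_str": "12.50", "currency": " brl "}),
        stage_row({"transaction_id": "t2", "date_str": "2026-04-02",
                   "amount_str": "7", "currency": "usd"}),
    ]
    for row in incremental_merge(existing, batch):
        print(row)
```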
```yaml
# models/staging/stg_revenue.yml
version: 2
models:
  - name: stg_revenue
    columns:
      - name: transaction_id
        tests:
          - not_null
          - unique
      - name: amount
        tests:
          - not_null
      - name: currency
        tests:
          - not_null
          - accepted_values:
              values: ['BRL', 'USD', 'EUR']
```
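The three test types named in the YAML (`not_null`, `unique`, `accepted_values`) each reduce to "return the rows that violate the rule; the test passes when none come back". A rough Python equivalent over in-memory rows, useful for reasoning about what the checks catch; the sample rows are invented for illustration.

```python
from collections import Counter


def not_null(rows, column):
    """Rows where the column is missing or None."""
    return [r for r in rows if r.get(column) is None]


def unique(rows, column):
    """Rows whose non-null value in the column appears more than once."""
    counts = Counter(r.get(column) for r in rows if r.get(column) is not None)
    return [r for r in rows if counts.get(r.get(column), 0) > 1]


def accepted_values(rows, column, values):
    """Rows whose non-null value is outside the allowed set."""
    return [r for r in rows
            if r.get(column) is not None and r.get(column) not in values]


def run_checks(rows):
    """Failure counts per check, mirroring the stg_revenue test list."""
    return {
        "not_null_transaction_id": len(not_null(rows, "transaction_id")),
        "not_null_amount": len(not_null(rows, "amount")),
        "not_null_currency": len(not_null(rows, "currency")),
        "unique_transaction_id": len(unique(rows, "transaction_id")),
        "accepted_currency": len(accepted_values(rows, "currency", {"BRL", "USD", "EUR"})),
    }


if __name__ == "__main__":
    sample = [
        {"transaction_id": "t1", "amount": 10.0, "currency": "BRL"},
        {"transaction_id": "t2", "amount": None, "currency": "USD"},
        {"transaction_id": "t2", "amount": 5.0, "currency": "GBP"},
    ]
    print(run_checks(sample))
```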
4. EVALUATE: Run tests and verify idempotency
```bash
dbt test --select tag:revenue # quality checks
dbt run --select tag:revenue --full-refresh # test a full refresh
```
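Idempotency here means that loading the same batch twice leaves the target in the same state as loading it once. One way to check that, sketched under the assumption that the target can be read back as a list of rows: fingerprint the table after the first and second load and compare. The `upsert` and `append` loaders are stand-ins for a real load step.

```python
import hashlib
import json


def fingerprint(rows):
    """Order-independent SHA-256 digest of a list of JSON-serialisable rows."""
    canonical = sorted(json.dumps(r, sort_keys=True) for r in rows)
    return hashlib.sha256("\n".join(canonical).encode()).hexdigest()


def is_idempotent(load, target, batch):
    """True if applying `load` twice with the same batch equals applying it once."""
    once = load(target, batch)
    twice = load(once, batch)
    return fingerprint(once) == fingerprint(twice)


def upsert(target, batch, key="transaction_id"):
    """Keyed merge: re-running replaces rows instead of duplicating them."""
    merged = {r[key]: r for r in target}
    merged.update({r[key]: r for r in batch})
    return list(merged.values())


def append(target, batch):
    """Blind append: re-running duplicates every row."""
    return target + batch


if __name__ == "__main__":
    batch = [{"transaction_id": "t1", "amount": 10.0}]
    print("upsert idempotent:", is_idempotent(upsert, [], batch))
    print("append idempotent:", is_idempotent(append, [], batch))
```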
5. REFLECT: Document and configure the SLA
```bash
dbt docs generate && dbt docs serve # interactive lineage graph
```
- Configure an Airflow alert for SLA misses over 30 minutes
- Report telemetry via mcp-skillschain
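The 30-minute SLA rule can be expressed as a simple time comparison. The sketch below measures from the scheduled start of the run, which is an assumption about how the SLA is defined; it is independent of Airflow and only illustrates the arithmetic behind the alert.

```python
from datetime import datetime, timedelta

SLA = timedelta(minutes=30)  # threshold taken from the step above


def sla_overrun(scheduled_start: datetime, finished_at: datetime,
                sla: timedelta = SLA) -> timedelta:
    """How far past the SLA the run finished (zero if it finished in time)."""
    elapsed = finished_at - scheduled_start
    return max(elapsed - sla, timedelta(0))


def sla_missed(scheduled_start: datetime, finished_at: datetime,
               sla: timedelta = SLA) -> bool:
    """True when the run finished later than scheduled_start + sla."""
    return sla_overrun(scheduled_start, finished_at, sla) > timedelta(0)


if __name__ == "__main__":
    start = datetime(2026, 4, 12, 3, 0)
    done = datetime(2026, 4, 12, 3, 42)
    print(sla_missed(start, done), sla_overrun(start, done))
```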

Agent Telemetry

Executions: 0 total
Success rate: 0% (last 30 days)
Average latency: 0.0s (p50)
Hallucination: 0.0% (detection)
Input tokens: 0 (avg 0/exec)
Output tokens: 0 (avg 0/exec)

Related Skills

Hebbian synapse weight: w = 0.3·α + 0.5·β + 0.2·γ

| Relation | Skill | Hebbian synapse | Composite | Score |
|---|---|---|---|---|
| Depends on | SQL Query Builder | 24% | 0.240 | 91 |
| Depends on | Database Connector | 24% | 0.240 | 88 |
| Composes with | Database Migration | 21% | 0.210 | 84 |
| Composes with | Web Scraper | 21% | 0.210 | 79 |
| Similar to ← | Pandas Data Analyzer | 15% | 0.150 | 87 |
| Co-executed | Web Scraper | 48% | 0.478 | 79 |
| Co-executed | Pandas Data Analyzer | 40% | 0.402 | 87 |
| Co-executed | ML Model Trainer | 40% | 0.400 | 85 |
| Co-executed | Sentiment Analyzer | 40% | 0.400 | 84 |
| Co-executed ← | Embedding Generator | 40% | 0.400 | 90 |
| Co-executed ← | Data Visualization | 40% | 0.400 | 80 |
| Co-executed ← | Database Connector | 26% | 0.262 | 88 |
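The synapse formula w = 0.3·α + 0.5·β + 0.2·γ is a weighted sum whose coefficients add up to 1, so w stays in [0, 1] whenever the three inputs do. The page does not say what α, β, and γ measure or what values they take, so the inputs below are made-up numbers chosen only to show the arithmetic.

```python
def synapse_weight(alpha: float, beta: float, gamma: float) -> float:
    """Weighted sum with the coefficients shown on the page (0.3, 0.5, 0.2)."""
    return 0.3 * alpha + 0.5 * beta + 0.2 * gamma


if __name__ == "__main__":
    # Hypothetical component values; not taken from the page.
    w = synapse_weight(alpha=0.2, beta=0.3, gamma=0.15)
    print(f"{w:.3f}")  # prints 0.240
```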

Skill Tree

ETL Pipeline Builder
etl-pipeline-builder
Cognitive Phases (6)
1. SENSE: Perception
2. CONTEXTUALIZE: Contextualization
3. HYPOTHESIZE: Hypothesis
4. RECOMMEND: Recommendation
5. EVALUATE: Evaluation
6. REFLECT: Reflection
Triggers (16)
create ETL pipeline, criar pipeline ETL, build data pipeline, airflow DAG, dbt pipeline, data extraction, data transformation, ELT pipeline, data warehouse loading, orchestration pipeline, data ingestion, pipeline de dados, extract transform load, data quality check, bigquery pipeline, snowflake pipeline

Score Breakdown

⭐ Human rating: 0%
🤖 Agent success: 0%
🕐 Recency: 100%
🔗 Dependency health: 100%
🕸️ Graph centrality: 0%
🛡️ Security: 50%
CompositeScore = α·Human + β·Agent + γ·Recency + δ·Deps + ε·Centrality + ζ·Security

Installation

$ synaptic mcp download etl-pipeline-builder
$ synaptic skills detail etl-pipeline-builder
$ synaptic skills live etl-pipeline-builder
