Skip to main content

Basic Flow Execution

import requests

API_KEY = "YOUR_API_KEY"
FLOW_ID = "your-flow-id"

# Start a run of the flow with three input items.
response = requests.post(
    f"https://catalogapi.rastro.ai/api/public/workflows/{FLOW_ID}/execute",
    headers={"Authorization": f"Bearer {API_KEY}"},
    json={
        "input": [
            {"sku": "A1", "title": "Product A", "price": 29.99},
            {"sku": "A2", "title": "Product B", "price": 39.99},
            {"sku": "A3", "title": "Product C", "price": 49.99},
        ]
    },
    # Without a timeout, requests can block forever on an unresponsive server.
    timeout=30,
)
# Fail loudly on 4xx/5xx instead of hitting a KeyError on the error body below.
response.raise_for_status()

result = response.json()
print(f"Run ID: {result['workflow_run_id']}")

Polling for Results

import requests
import time

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://catalogapi.rastro.ai/api"
FLOW_ID = "your-flow-id"

def run_flow(items, poll_interval=5, request_timeout=30):
    """Start the flow for ``items`` and block until the run finishes.

    Args:
        items: JSON-serializable list of records sent as the ``"input"`` field.
        poll_interval: Seconds to sleep between status checks.
        request_timeout: Per-request timeout in seconds handed to ``requests``.

    Returns:
        The ``results["data"]`` value of the completed run.

    Raises:
        requests.HTTPError: If either endpoint answers with a 4xx/5xx status.
        RuntimeError: If the run reports the status ``"failed"``.
    """
    headers = {"Authorization": f"Bearer {API_KEY}"}

    # Start the flow
    response = requests.post(
        f"{BASE_URL}/public/workflows/{FLOW_ID}/execute",
        headers=headers,
        json={"input": items},
        timeout=request_timeout,
    )
    # Surface HTTP errors here rather than as a KeyError on the error body.
    response.raise_for_status()
    run_id = response.json()["workflow_run_id"]
    print(f"Flow started: {run_id}")

    # Poll for results
    status_url = f"{BASE_URL}/public/workflows/runs/{run_id}"
    while True:
        poll = requests.get(status_url, headers=headers, timeout=request_timeout)
        poll.raise_for_status()
        status = poll.json()

        if status["status"] == "completed":
            return status["results"]["data"]
        if status["status"] == "failed":
            raise RuntimeError(
                f"Flow failed: {status.get('error', 'Unknown error')}"
            )

        # `progress` may be absent or null while the run is starting up.
        progress = status.get("progress") or 0
        print(f"Progress: {progress * 100:.0f}%")
        time.sleep(poll_interval)

# Example usage: run the flow on three sample items and print each result.
items = [
    {"sku": sku, "title": title}
    for sku, title in (
        ("A1", "Product A"),
        ("A2", "Product B"),
        ("A3", "Product C"),
    )
]

results = run_flow(items)
for row in results:
    print(row)

Full Python Client

import requests
import time

class FlowsClient:
    """Small client for the public workflow endpoints: execute a flow, poll a run."""

    def __init__(self, api_key, base_url="https://catalogapi.rastro.ai/api", timeout=30):
        """Store the credentials and connection settings.

        Args:
            api_key: Key sent as a Bearer token on every request.
            base_url: API root; a trailing slash is stripped.
            timeout: Per-request timeout in seconds handed to ``requests``.
        """
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    def _headers(self):
        """Return the authorization header used by every request."""
        return {"Authorization": f"Bearer {self.api_key}"}

    def execute(self, flow_id, items):
        """Start a flow execution and return the decoded JSON response.

        Raises:
            requests.HTTPError: If the API answers with a 4xx/5xx status.
        """
        response = requests.post(
            f"{self.base_url}/public/workflows/{flow_id}/execute",
            headers=self._headers(),
            json={"input": items},
            timeout=self.timeout,
        )
        # Fail here rather than handing an error body back to the caller.
        response.raise_for_status()
        return response.json()

    def get_status(self, run_id):
        """Return the decoded JSON status of a flow run.

        Raises:
            requests.HTTPError: If the API answers with a 4xx/5xx status.
        """
        response = requests.get(
            f"{self.base_url}/public/workflows/runs/{run_id}",
            headers=self._headers(),
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def execute_and_wait(self, flow_id, items, poll_interval=5, on_progress=None):
        """Execute a flow and block until it completes or fails.

        Args:
            flow_id: Identifier of the flow to run.
            items: JSON-serializable list of input records.
            poll_interval: Seconds to sleep between status checks.
            on_progress: Optional callable invoked with the run's ``progress``
                value (``0`` when missing or null) after each unfinished poll.

        Returns:
            The ``results["data"]`` value of the completed run.

        Raises:
            RuntimeError: If the run reports the status ``"failed"``.
        """
        run_id = self.execute(flow_id, items)["workflow_run_id"]

        while True:
            status = self.get_status(run_id)
            state = status["status"]

            if state == "completed":
                return status["results"]["data"]
            if state == "failed":
                raise RuntimeError(
                    f"Flow failed: {status.get('error', 'Unknown error')}"
                )

            if on_progress is not None:
                # A null/missing progress is reported as 0 so callbacks can do math on it.
                on_progress(status.get("progress") or 0)

            time.sleep(poll_interval)


# Example usage
client = FlowsClient("YOUR_API_KEY")


def report_progress(p):
    """Print the progress fraction ``p`` as a whole-number percentage."""
    print(f"Progress: {p * 100:.0f}%")


# Execute and wait
sample_items = [
    {"sku": "A1", "title": "Product A"},
    {"sku": "A2", "title": "Product B"},
]
results = client.execute_and_wait(
    flow_id="your-flow-id",
    items=sample_items,
    on_progress=report_progress,
)

for row in results:
    print(row)

Full TypeScript Client

const API_KEY = "YOUR_API_KEY";
const BASE_URL = "https://catalogapi.rastro.ai/api";

/** Small client for the public workflow endpoints: execute a flow, poll a run. */
class FlowsClient {
  private apiKey: string;
  private baseUrl: string;

  /**
   * @param apiKey  Key sent as a Bearer token on every request.
   * @param baseUrl API root; defaults to BASE_URL.
   */
  constructor(apiKey: string, baseUrl: string = BASE_URL) {
    this.apiKey = apiKey;
    this.baseUrl = baseUrl;
  }

  /** Headers for requests that carry a JSON body. */
  private headers() {
    return {
      "Authorization": `Bearer ${this.apiKey}`,
      "Content-Type": "application/json"
    };
  }

  /**
   * Decode a JSON response, throwing on non-2xx statuses.
   * fetch() only rejects on network failure, so HTTP errors must be checked by hand.
   */
  private async parse(response: Response) {
    if (!response.ok) {
      throw new Error(`Request failed: ${response.status} ${response.statusText}`);
    }
    return response.json();
  }

  /** Start a flow execution and resolve with the decoded JSON response. */
  async execute(flowId: string, items: object[]) {
    const response = await fetch(`${this.baseUrl}/public/workflows/${flowId}/execute`, {
      method: "POST",
      headers: this.headers(),
      body: JSON.stringify({ input: items })
    });
    return this.parse(response);
  }

  /** Resolve with the decoded JSON status of a flow run. */
  async getStatus(runId: string) {
    const response = await fetch(`${this.baseUrl}/public/workflows/runs/${runId}`, {
      headers: { "Authorization": `Bearer ${this.apiKey}` }
    });
    return this.parse(response);
  }

  /**
   * Execute a flow and poll until it completes or fails.
   *
   * @param pollInterval Delay between status checks, in milliseconds.
   * @param onProgress   Optional callback given the run's `progress` (0 when missing).
   * @returns The `results.data` value of the completed run.
   * @throws Error when a request fails or the run reports status "failed".
   */
  async executeAndWait(
    flowId: string,
    items: object[],
    pollInterval = 5000,
    onProgress?: (progress: number) => void
  ) {
    const result = await this.execute(flowId, items);
    const runId = result.workflow_run_id;

    while (true) {
      const status = await this.getStatus(runId);

      if (status.status === "completed") {
        return status.results.data;
      }
      if (status.status === "failed") {
        throw new Error(`Flow failed: ${status.error || "Unknown error"}`);
      }

      if (onProgress) {
        onProgress(status.progress || 0);
      }

      await new Promise(r => setTimeout(r, pollInterval));
    }
  }
}

// Example usage
const client = new FlowsClient("YOUR_API_KEY");

const results = await client.executeAndWait(
  "your-flow-id",
  [
    { sku: "A1", title: "Product A" },
    { sku: "A2", title: "Product B" }
  ],
  5000, // poll interval in milliseconds
  // Round to a whole percent so float noise (e.g. 28.999999999999996) is not printed.
  (p) => console.log(`Progress: ${(p * 100).toFixed(0)}%`)
);

console.log(results);