Skip to main content

SDK & Integration Examples

Comprehensive integration examples for popular platforms and use cases. Choose your preferred language or platform to get started quickly.

Quick Start Examples

JavaScript/Node.js

const axios = require('axios');

/**
 * Minimal client for the Fluence Insights CRM analysis API.
 *
 * All requests go through one axios instance configured with the JSON
 * content type, the `X-API-Key` header and a 30 second timeout.
 */
class FluenceAPI {
  /**
   * @param {string} apiKey  Value sent in the `X-API-Key` header.
   * @param {string} [baseUrl]  API root URL.
   */
  constructor(apiKey, baseUrl = 'https://api.fluenceinsights.com') {
    this.apiKey = apiKey;
    this.baseUrl = baseUrl;
    this.client = axios.create({
      baseURL: baseUrl,
      headers: {
        'Content-Type': 'application/json',
        'X-API-Key': apiKey
      },
      timeout: 30000
    });
  }

  /**
   * POST `data` to `path` and return the parsed response body.
   *
   * Failures are rethrown as `Error('API Error: …')`. The HTTP status (when
   * the server answered) and the underlying axios error are kept on the
   * thrown error as `status` and `cause`, so callers can tell a 401 from a
   * timeout without parsing the message.
   *
   * @private
   */
  async _post(path, data) {
    try {
      const response = await this.client.post(path, data);
      return response.data;
    } catch (error) {
      const wrapped = new Error(
        `API Error: ${error.response?.data?.error?.message || error.message}`
      );
      wrapped.status = error.response?.status;
      wrapped.cause = error;
      throw wrapped;
    }
  }

  /**
   * Run the basic analysis (`/crm_basico`).
   * @param {object} data  Request payload.
   * @returns {Promise<object>} Parsed response body.
   */
  async analyzeCRM(data) {
    return this._post('/crm_basico', data);
  }

  /**
   * Run the complete analysis (`/crm`).
   * @param {object} data  Request payload.
   * @returns {Promise<object>} Parsed response body.
   */
  async analyzeComplete(data) {
    return this._post('/crm', data);
  }
}

// Usage
// `await` is only allowed at the top level of ES modules. This example uses
// CommonJS (`require`), so the call is wrapped in an async function.
async function main() {
  const api = new FluenceAPI(process.env.FLUENCE_API_KEY);
  const result = await api.analyzeCRM({
    primeiro_nome: 'João',
    ultimo_nome: 'Silva',
    empresa: 'Microsoft Brasil'
  });
  return result;
}

main().catch(console.error);

Python

import requests
import os
from typing import Dict, Optional

class FluenceAPI:
    """Thin client for the Fluence Insights CRM analysis API.

    Every request goes through one ``requests.Session`` that carries the
    JSON ``Content-Type`` and ``X-API-Key`` headers.
    """

    def __init__(self, api_key: str, base_url: str = 'https://api.fluenceinsights.com',
                 timeout: float = 30):
        """Create the client.

        Args:
            api_key: Value sent in the ``X-API-Key`` header.
            base_url: API root URL.
            timeout: Seconds to wait for each request. ``requests`` waits
                indefinitely when no timeout is given, so one is always set.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'X-API-Key': api_key
        })

    def _post(self, path: str, data: Dict) -> Optional[Dict]:
        """POST ``data`` as JSON to ``path`` and return the decoded body.

        Raises:
            Exception: ``"API Error: ..."`` wrapping any ``requests`` failure;
                the original error is chained as ``__cause__``.
        """
        try:
            response = self.session.post(
                f"{self.base_url}{path}", json=data, timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            raise Exception(f"API Error: {e}") from e

    def analyze_crm(self, data: Dict) -> Optional[Dict]:
        """Analyze CRM data using the basic endpoint (``/crm_basico``)."""
        return self._post('/crm_basico', data)

    def analyze_complete(self, data: Dict) -> Optional[Dict]:
        """Analyze CRM data using the complete endpoint (``/crm``)."""
        return self._post('/crm', data)

# Usage: take the API key from the environment and analyze one contact
# through the basic endpoint.
api = FluenceAPI(os.environ.get('FLUENCE_API_KEY'))
result = api.analyze_crm(
    {'primeiro_nome': 'Maria', 'ultimo_nome': 'Santos', 'empresa': 'Google Brasil'}
)

cURL Examples

# Basic CRM analysis: POST a contact to /crm_basico
curl --request POST "https://api.fluenceinsights.com/crm_basico" \
  --header "Content-Type: application/json" \
  --header "X-API-Key: your-api-key-here" \
  --data '{
"primeiro_nome": "João",
"ultimo_nome": "Silva",
"empresa": "Microsoft Brasil"
}'

# Complete CRM analysis: same payload shape, sent to /crm
curl --request POST "https://api.fluenceinsights.com/crm" \
  --header "Content-Type: application/json" \
  --header "X-API-Key: your-api-key-here" \
  --data '{
"primeiro_nome": "Maria",
"ultimo_nome": "Santos",
"empresa": "Google Brasil"
}'

# Health check: plain GET against /health
curl "https://api.fluenceinsights.com/health" \
  --header "X-API-Key: your-api-key-here"

CRM System Integrations

Salesforce Integration

// Salesforce Apex Class
/**
 * Enriches Contact records with the Fluence basic CRM analysis.
 */
public class FluenceIntegration {
    // NOTE(review): placeholder key kept in source for the example; consider
    // storing the real key outside the code (e.g. a Named Credential).
    private static final String API_KEY = 'your-api-key';
    private static final String BASE_URL = 'https://api.fluenceinsights.com';

    /**
     * Calls /crm_basico for one contact and writes the analysis back to the
     * Fluence_* custom fields. Runs asynchronously because it performs a
     * callout. Does nothing when the contact no longer exists, when the API
     * does not answer 200, or when the response has no 'analise' object.
     *
     * @param contactId Id of the Contact to enrich.
     */
    @future(callout=true)
    public static void enrichContact(String contactId) {
        // Query into a list: assigning an empty result to a single sObject throws.
        List<Contact> matches = [SELECT FirstName, LastName, Account.Name FROM Contact WHERE Id = :contactId LIMIT 1];
        if (matches.isEmpty()) {
            return;
        }
        Contact contact = matches[0];

        // Prepare request data
        Map<String, Object> requestData = new Map<String, Object>{
            'primeiro_nome' => contact.FirstName,
            'ultimo_nome' => contact.LastName,
            'empresa' => contact.Account.Name
        };

        // Make API call
        Http http = new Http();
        HttpRequest request = new HttpRequest();
        request.setEndpoint(BASE_URL + '/crm_basico');
        request.setMethod('POST');
        request.setHeader('Content-Type', 'application/json');
        request.setHeader('X-API-Key', API_KEY);
        request.setTimeout(30000);
        request.setBody(JSON.serialize(requestData));

        HttpResponse response = http.send(request);
        if (response.getStatusCode() != 200) {
            return;
        }

        Map<String, Object> result = (Map<String, Object>) JSON.deserializeUntyped(response.getBody());
        Map<String, Object> analysis = (Map<String, Object>) result.get('analise');
        if (analysis == null) {
            return;
        }

        // deserializeUntyped yields List<Object>; casting that to List<String>
        // fails at runtime, so the items are copied one by one.
        List<String> interests = new List<String>();
        Object rawInterests = analysis.get('interesses_profissionais');
        if (rawInterests instanceof List<Object>) {
            for (Object item : (List<Object>) rawInterests) {
                if (item != null) {
                    interests.add(String.valueOf(item));
                }
            }
        }

        // Update contact with analysis data
        contact.Fluence_Stakeholder_Type__c = (String) analysis.get('tipo_stakeholder');
        contact.Fluence_Location__c = (String) analysis.get('localizacao_detectada');
        contact.Fluence_Professional_Interests__c = String.join(interests, '; ');

        update contact;
    }
}

// Trigger to automatically enrich contacts
/**
 * Queues Fluence enrichment for contacts that have an account but no
 * stakeholder type yet.
 */
trigger ContactTrigger on Contact (after insert, after update) {
    // The enrichment itself updates the contact from a @future method, which
    // re-fires this trigger. Future methods cannot be called from future or
    // batch context, so skip in that case.
    if (System.isFuture() || System.isBatch()) {
        return;
    }

    for (Contact contact : Trigger.new) {
        if (contact.AccountId == null || contact.Fluence_Stakeholder_Type__c != null) {
            continue;
        }
        // Stop before exceeding the per-transaction future-call limit on bulk
        // inserts/updates; contacts left over stay un-enriched and are picked
        // up on their next update.
        if (Limits.getFutureCalls() >= Limits.getLimitFutureCalls()) {
            break;
        }
        FluenceIntegration.enrichContact(contact.Id);
    }
}

HubSpot Integration

import requests
import json
from typing import Dict, Optional

class HubSpotFluenceIntegration:
    """Copy Fluence stakeholder analysis onto HubSpot contacts."""

    # Seconds to wait on each HTTP call; requests has no default timeout.
    REQUEST_TIMEOUT = 30

    # Contact properties needed to build the Fluence request. They are asked
    # for explicitly because ``company`` is not among the properties HubSpot
    # returns by default.
    CONTACT_PROPERTIES = ('firstname', 'lastname', 'company')

    def __init__(self, fluence_api_key: str, hubspot_api_key: str):
        """Store both API credentials and the two service base URLs."""
        self.fluence_api_key = fluence_api_key
        self.hubspot_api_key = hubspot_api_key
        self.fluence_base_url = 'https://api.fluenceinsights.com'
        self.hubspot_base_url = 'https://api.hubapi.com'

    def enrich_contact(self, contact_id: int) -> Optional[Dict]:
        """Enrich a HubSpot contact with Fluence analysis.

        Fetches the contact, sends name and company to ``/crm_basico`` and
        writes the result to the ``fluence_*`` contact properties.

        Returns:
            The HubSpot update response, or ``None`` when any of the three
            steps does not return HTTP 200.
        """
        hubspot_contact = self._get_hubspot_contact(contact_id)
        if not hubspot_contact:
            return None

        # Properties may be present with a null value; normalise to ''.
        properties = hubspot_contact.get('properties') or {}
        fluence_data = {
            'primeiro_nome': properties.get('firstname') or '',
            'ultimo_nome': properties.get('lastname') or '',
            'empresa': properties.get('company') or ''
        }

        fluence_result = self._get_fluence_analysis(fluence_data)
        if not fluence_result:
            return None

        analysis = fluence_result.get('analise') or {}
        metadata = fluence_result.get('metadata') or {}
        interests = analysis.get('interesses_profissionais') or []
        update_data = {
            'properties': {
                'fluence_stakeholder_type': analysis.get('tipo_stakeholder') or '',
                'fluence_location': analysis.get('localizacao_detectada') or '',
                'fluence_professional_interests': '; '.join(str(item) for item in interests),
                'fluence_analysis_date': metadata.get('timestamp') or ''
            }
        }

        return self._update_hubspot_contact(contact_id, update_data)

    def _get_hubspot_contact(self, contact_id: int) -> Optional[Dict]:
        """Fetch a contact from HubSpot; ``None`` unless the status is 200."""
        url = f"{self.hubspot_base_url}/crm/v3/objects/contacts/{contact_id}"
        headers = {'Authorization': f'Bearer {self.hubspot_api_key}'}
        params = {'properties': ','.join(self.CONTACT_PROPERTIES)}

        response = requests.get(url, headers=headers, params=params,
                                timeout=self.REQUEST_TIMEOUT)
        if response.status_code == 200:
            return response.json()
        return None

    def _get_fluence_analysis(self, data: Dict) -> Optional[Dict]:
        """POST ``data`` to ``/crm_basico``; ``None`` unless the status is 200."""
        url = f"{self.fluence_base_url}/crm_basico"
        headers = {
            'Content-Type': 'application/json',
            'X-API-Key': self.fluence_api_key
        }

        response = requests.post(url, headers=headers, json=data,
                                 timeout=self.REQUEST_TIMEOUT)
        if response.status_code == 200:
            return response.json()
        return None

    def _update_hubspot_contact(self, contact_id: int, data: Dict) -> Optional[Dict]:
        """PATCH the contact with ``data``; ``None`` unless the status is 200."""
        url = f"{self.hubspot_base_url}/crm/v3/objects/contacts/{contact_id}"
        headers = {
            'Authorization': f'Bearer {self.hubspot_api_key}',
            'Content-Type': 'application/json'
        }

        response = requests.patch(url, headers=headers, json=data,
                                  timeout=self.REQUEST_TIMEOUT)
        if response.status_code == 200:
            return response.json()
        return None

# Usage: create the integration with both API keys, then enrich one
# contact identified by its HubSpot ID.
integration = HubSpotFluenceIntegration(fluence_api_key='your-fluence-key', hubspot_api_key='your-hubspot-key')
result = integration.enrich_contact(12345)

Pipedrive Integration

const axios = require('axios');

/**
 * Copies Fluence stakeholder analysis onto Pipedrive persons.
 */
class PipedriveFluenceIntegration {
  /**
   * @param {string} fluenceApiKey  Sent as `X-API-Key` to the Fluence API.
   * @param {string} pipedriveApiKey  Sent as the `api_token` query parameter.
   */
  constructor(fluenceApiKey, pipedriveApiKey) {
    this.fluenceApiKey = fluenceApiKey;
    this.pipedriveApiKey = pipedriveApiKey;
    this.fluenceBaseUrl = 'https://api.fluenceinsights.com';
    this.pipedriveBaseUrl = 'https://api.pipedrive.com/v1';
    // Milliseconds to wait on each HTTP call; axios has no timeout by default.
    this.timeout = 30000;
  }

  /**
   * Fetch a person, analyze it with Fluence and write the result back.
   *
   * @param {number|string} personId  Pipedrive person ID.
   * @returns {Promise<object|null>} The Pipedrive update response, or `null`
   *   when the person or the analysis is missing or any request fails
   *   (failures are logged, not thrown).
   */
  async enrichPerson(personId) {
    try {
      // Get person from Pipedrive
      const person = await this.getPipedrivePerson(personId);
      if (!person) return null;

      // Get Fluence analysis
      const fluenceData = {
        primeiro_nome: person.first_name || '',
        ultimo_nome: person.last_name || '',
        empresa: person.org_name || ''
      };

      const fluenceResult = await this.getFluenceAnalysis(fluenceData);
      if (!fluenceResult || !fluenceResult.analise) return null;

      // A missing interests list or metadata block no longer aborts the
      // whole enrichment; the remaining fields are still written.
      const analysis = fluenceResult.analise;
      const interests = analysis.interesses_profissionais || [];
      // NOTE(review): these look like custom person fields; confirm the keys
      // Pipedrive expects for custom fields in your account.
      const updateData = {
        fluence_stakeholder_type: analysis.tipo_stakeholder,
        fluence_location: analysis.localizacao_detectada,
        fluence_interests: interests.join('; '),
        fluence_analysis_date: fluenceResult.metadata?.timestamp
      };

      return await this.updatePipedrivePerson(personId, updateData);
    } catch (error) {
      console.error('Error enriching person:', error);
      return null;
    }
  }

  /** GET the person; resolves to the `data` member of the response body. */
  async getPipedrivePerson(personId) {
    const response = await axios.get(
      `${this.pipedriveBaseUrl}/persons/${personId}`,
      {
        params: { api_token: this.pipedriveApiKey },
        timeout: this.timeout
      }
    );
    return response.data.data;
  }

  /** POST `data` to `/crm_basico`; resolves to the response body. */
  async getFluenceAnalysis(data) {
    const response = await axios.post(
      `${this.fluenceBaseUrl}/crm_basico`,
      data,
      {
        headers: {
          'Content-Type': 'application/json',
          'X-API-Key': this.fluenceApiKey
        },
        timeout: this.timeout
      }
    );
    return response.data;
  }

  /** PUT `data` onto the person; resolves to the response body. */
  async updatePipedrivePerson(personId, data) {
    const response = await axios.put(
      `${this.pipedriveBaseUrl}/persons/${personId}`,
      data,
      {
        params: { api_token: this.pipedriveApiKey },
        timeout: this.timeout
      }
    );
    return response.data;
  }
}

// Usage
const integration = new PipedriveFluenceIntegration('your-fluence-key', 'your-pipedrive-key');

// Enrich one person and print whatever enrichPerson resolves to
// (the update response, or null on failure).
(async () => {
  const result = await integration.enrichPerson(12345);
  console.log('Person enriched:', result);
})();

Webhook Implementations

Webhook Receiver (Node.js)

const express = require('express');
const crypto = require('crypto');

const app = express();
app.use(express.json());

/**
 * Processes "contact created" webhooks: analyzes the contact with Fluence,
 * then hands the result to the CRM update and notification hooks.
 */
class WebhookHandler {
  /** @param {string} fluenceApiKey  Sent as `X-API-Key` to the Fluence API. */
  constructor(fluenceApiKey) {
    this.fluenceApiKey = fluenceApiKey;
    this.fluenceBaseUrl = 'https://api.fluenceinsights.com';
  }

  /**
   * Run the full pipeline for one contact.
   *
   * @param {object} contactData  Reads `id`, `firstName`, `lastName`, `company`.
   * @returns {Promise<{success: boolean, analysis?: object, error?: string}>}
   *   Never rejects; failures are logged and reported via `success: false`.
   */
  async handleNewContact(contactData) {
    try {
      // Analyze contact with Fluence
      const analysis = await this.analyzeContact(contactData);

      // Update CRM with analysis results
      await this.updateCRM(contactData.id, analysis);

      // Send notification
      await this.sendNotification(contactData, analysis);

      return { success: true, analysis };
    } catch (error) {
      console.error('Webhook processing error:', error);
      return { success: false, error: error.message };
    }
  }

  /**
   * POST the contact to `/crm_basico` and return the parsed body.
   *
   * @throws {Error} When the API answers with a non-2xx status, so an error
   *   body is never mistaken for an analysis.
   */
  async analyzeContact(contactData) {
    const fluenceData = {
      primeiro_nome: contactData.firstName,
      ultimo_nome: contactData.lastName,
      empresa: contactData.company
    };

    const response = await fetch(`${this.fluenceBaseUrl}/crm_basico`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'X-API-Key': this.fluenceApiKey
      },
      body: JSON.stringify(fluenceData)
    });

    if (!response.ok) {
      throw new Error(`Fluence API request failed with status ${response.status}`);
    }

    return response.json();
  }

  /** Placeholder: replace with a real CRM update. Currently only logs. */
  async updateCRM(contactId, analysis) {
    // Update your CRM system with analysis results
    console.log(`Updating contact ${contactId} with analysis:`, analysis);
  }

  /** Placeholder: replace with a real notification. Currently only logs. */
  async sendNotification(contactData, analysis) {
    // Send notification to sales team
    console.log(`New contact analyzed: ${contactData.firstName} ${contactData.lastName}`);
    console.log(`Stakeholder type: ${analysis?.analise?.tipo_stakeholder}`);
  }
}

// Webhook endpoint
// NOTE(review): requests are not authenticated here; if the webhook sender
// signs its payloads, verify the signature before processing.
app.post('/webhook/contact-created', async (req, res) => {
  // Reject anything that is not a JSON object before calling the API.
  if (!req.body || typeof req.body !== 'object' || Array.isArray(req.body)) {
    res.status(400).json({ error: 'Request body must be a JSON object' });
    return;
  }

  const handler = new WebhookHandler(process.env.FLUENCE_API_KEY);

  try {
    const result = await handler.handleNewContact(req.body);
    // handleNewContact reports failures in the result instead of throwing,
    // so map them to a non-2xx status to let the sender see the failure.
    res.status(result.success ? 200 : 502).json(result);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Listen on PORT when it is set to a valid number, otherwise on 3000.
const port = Number(process.env.PORT) || 3000;
app.listen(port, () => {
  console.log(`Webhook server running on port ${port}`);
});

Webhook Receiver (Python Flask)

from flask import Flask, request, jsonify
import requests
import os
from typing import Dict, Optional

app = Flask(__name__)

class WebhookHandler:
    """Process "contact created" webhooks using the Fluence basic analysis."""

    def __init__(self, fluence_api_key: str):
        """Store the API key sent as ``X-API-Key`` and the API base URL."""
        self.fluence_api_key = fluence_api_key
        self.fluence_base_url = 'https://api.fluenceinsights.com'

    def handle_new_contact(self, contact_data: Dict) -> Dict:
        """Analyze a new contact, then update the CRM and notify.

        Args:
            contact_data: Webhook payload; ``id`` is required, ``firstName``,
                ``lastName`` and ``company`` are optional.

        Returns:
            ``{'success': True, 'analysis': ...}`` on success, otherwise
            ``{'success': False, 'error': ...}``. Never raises.
        """
        try:
            analysis = self.analyze_contact(contact_data)
            if analysis is None:
                # Report the real cause instead of letting None reach the
                # notification step and fail there with an AttributeError.
                message = 'Fluence analysis unavailable'
                print(f'Webhook processing error: {message}')
                return {'success': False, 'error': message}

            # Update CRM with analysis results
            self.update_crm(contact_data['id'], analysis)

            # Send notification
            self.send_notification(contact_data, analysis)

            return {'success': True, 'analysis': analysis}
        except Exception as e:
            print(f'Webhook processing error: {e}')
            return {'success': False, 'error': str(e)}

    def analyze_contact(self, contact_data: Dict) -> Optional[Dict]:
        """POST the contact to ``/crm_basico``; ``None`` unless the status is 200."""
        fluence_data = {
            'primeiro_nome': contact_data.get('firstName', ''),
            'ultimo_nome': contact_data.get('lastName', ''),
            'empresa': contact_data.get('company', '')
        }

        response = requests.post(
            f"{self.fluence_base_url}/crm_basico",
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': self.fluence_api_key
            },
            json=fluence_data,
            timeout=30  # requests waits indefinitely without an explicit timeout
        )

        if response.status_code == 200:
            return response.json()
        return None

    def update_crm(self, contact_id: str, analysis: Dict):
        """Placeholder for the CRM update; currently only prints."""
        print(f"Updating contact {contact_id} with analysis: {analysis}")

    def send_notification(self, contact_data: Dict, analysis: Dict):
        """Placeholder for the sales-team notification; currently only prints."""
        print(f"New contact analyzed: {contact_data.get('firstName')} {contact_data.get('lastName')}")
        print(f"Stakeholder type: {(analysis.get('analise') or {}).get('tipo_stakeholder')}")

# Webhook endpoint
@app.route('/webhook/contact-created', methods=['POST'])
def contact_created():
    """Handle the contact-created webhook.

    Responds 400 when the body is not a JSON object, 200 with the handler
    result on success, 502 when the handler reports a failure, and 500 on an
    unexpected error.
    """
    # silent=True yields None for a missing or malformed body instead of
    # raising, so bad input is answered with 400 rather than 500.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    handler = WebhookHandler(os.getenv('FLUENCE_API_KEY'))

    try:
        result = handler.handle_new_contact(payload)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

    # The handler reports failures in its result rather than raising.
    status = 200 if result.get('success') else 502
    return jsonify(result), status

if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which must not be
    # reachable on a publicly exposed webhook receiver. It is therefore off
    # unless FLASK_DEBUG=1 is set explicitly.
    app.run(debug=os.getenv('FLASK_DEBUG') == '1', port=3000)

Batch Processing Examples

Batch Contact Analysis (Python)

import asyncio
import aiohttp
import json
from typing import List, Dict
from datetime import datetime

class BatchFluenceProcessor:
    """Analyze many contacts concurrently through ``/crm_basico``."""

    def __init__(self, api_key: str, max_concurrent: int = 5):
        """Store credentials and create the concurrency limiter.

        Args:
            api_key: Value sent in the ``X-API-Key`` header.
            max_concurrent: Maximum number of requests in flight at once.
        """
        self.api_key = api_key
        self.base_url = 'https://api.fluenceinsights.com'
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def process_contacts(self, contacts: List[Dict]) -> List[Dict]:
        """Process multiple contacts in parallel.

        One HTTP session is shared by the whole batch. The returned list has
        one result dict per contact, in input order. Anything ``gather``
        hands back as an exception is converted to a failure dict so the
        list can always be passed to ``save_results``.
        """
        if not contacts:
            return []

        async with aiohttp.ClientSession() as session:
            outcomes = await asyncio.gather(
                *(self.process_single_contact(contact, session) for contact in contacts),
                return_exceptions=True
            )

        return [
            self._failure(contact, str(outcome)) if isinstance(outcome, BaseException) else outcome
            for contact, outcome in zip(contacts, outcomes)
        ]

    async def process_single_contact(self, contact: Dict, session=None) -> Dict:
        """Process a single contact with rate limiting.

        Args:
            contact: Reads ``id``, ``firstName``, ``lastName``, ``company``.
            session: Optional ``aiohttp.ClientSession`` to reuse. When
                omitted, a session is created for this call only.

        Returns:
            A dict with ``contact_id``, ``success``, ``timestamp`` and either
            ``analysis`` or ``error``. Errors are reported, not raised.
        """
        async with self.semaphore:
            try:
                if session is not None:
                    return await self._analyze(session, contact)
                async with aiohttp.ClientSession() as own_session:
                    return await self._analyze(own_session, contact)
            except Exception as e:
                return self._failure(contact, str(e))

    async def _analyze(self, session, contact: Dict) -> Dict:
        """Send one contact to ``/crm_basico`` and build its result dict."""
        request_data = {
            'primeiro_nome': contact.get('firstName', ''),
            'ultimo_nome': contact.get('lastName', ''),
            'empresa': contact.get('company', '')
        }

        async with session.post(
            f"{self.base_url}/crm_basico",
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': self.api_key
            },
            json=request_data
        ) as response:
            if response.status == 200:
                result = await response.json()
                return {
                    'contact_id': contact.get('id'),
                    'success': True,
                    'analysis': result,
                    'timestamp': datetime.now().isoformat()
                }

            error_data = await response.json()
            return self._failure(
                contact,
                error_data.get('error', {}).get('message', 'Unknown error')
            )

    @staticmethod
    def _failure(contact: Dict, message: str) -> Dict:
        """Build the failure result dict for ``contact``."""
        return {
            'contact_id': contact.get('id'),
            'success': False,
            'error': message,
            'timestamp': datetime.now().isoformat()
        }

    def save_results(self, results: List[Dict], filename: str = None):
        """Save batch processing results to a JSON file and print a summary.

        Args:
            results: Result dicts as returned by ``process_contacts``.
            filename: Output path. Defaults to a timestamped
                ``batch_results_<YYYYmmdd_HHMMSS>.json`` in the working
                directory.
        """
        if not filename:
            filename = f"batch_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

        # Explicit encoding so the output does not depend on the platform default.
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)

        # Print summary
        successful = sum(1 for r in results if r.get('success'))
        failed = len(results) - successful

        print("Batch processing complete:")
        print(f" Total contacts: {len(results)}")
        print(f" Successful: {successful}")
        print(f" Failed: {failed}")
        print(f" Results saved to: {filename}")

# Usage example
async def main():
    """Run a sample batch of three contacts and save the results."""
    # Sample contact data as (id, first name, last name, company) rows
    sample_rows = [
        ('1', 'João', 'Silva', 'Microsoft Brasil'),
        ('2', 'Maria', 'Santos', 'Google Brasil'),
        ('3', 'Carlos', 'Oliveira', 'Nubank'),
        # Add more contacts...
    ]
    contacts = [
        {'id': contact_id, 'firstName': first, 'lastName': last, 'company': company}
        for contact_id, first, last, company in sample_rows
    ]

    processor = BatchFluenceProcessor(api_key='your-api-key', max_concurrent=3)
    processor.save_results(await processor.process_contacts(contacts))


# Start the batch only when this file is executed as a script
if __name__ == "__main__":
    asyncio.run(main())

Batch Processing with Progress Tracking (JavaScript)

const axios = require('axios');

/**
 * Sends contacts to `/crm_basico` in fixed-size batches, logging progress
 * and pausing one second between batches.
 */
class BatchFluenceProcessor {
  /**
   * @param {string} apiKey  Sent as `X-API-Key`.
   * @param {number} [maxConcurrent]  Number of contacts sent per batch.
   */
  constructor(apiKey, maxConcurrent = 5) {
    this.apiKey = apiKey;
    this.baseUrl = 'https://api.fluenceinsights.com';
    this.maxConcurrent = maxConcurrent;
    this.activeRequests = 0;
    this.queue = [];
    this.results = [];
  }

  /**
   * Process all contacts batch by batch.
   *
   * @param {object[]} contacts  Each reads `id`, `firstName`, `lastName`, `company`.
   * @returns {Promise<object[]>} One result per contact, in input order.
   *   `this.results` holds the same array, for the most recent run only.
   */
  async processContacts(contacts) {
    console.log(`Starting batch processing of ${contacts.length} contacts...`);

    // Start every run from an empty list so results and progress counts
    // from a previous call do not leak into this one.
    const results = [];
    this.results = results;

    const batches = this.createBatches(contacts, this.maxConcurrent);

    for (let i = 0; i < batches.length; i++) {
      const batch = batches[i];
      console.log(`Processing batch ${i + 1}/${batches.length} (${batch.length} contacts)`);

      const batchPromises = batch.map(contact => this.processSingleContact(contact));
      const batchResults = await Promise.allSettled(batchPromises);

      // Add results
      batchResults.forEach((result, index) => {
        if (result.status === 'fulfilled') {
          results.push(result.value);
        } else {
          results.push({
            contact_id: batch[index]?.id,
            success: false,
            error: result.reason?.message ?? String(result.reason),
            timestamp: new Date().toISOString()
          });
        }
      });

      // Progress update
      const completed = results.length;
      const successful = results.filter(r => r.success).length;
      console.log(`Progress: ${completed}/${contacts.length} completed (${successful} successful)`);

      // Rate limiting delay
      if (i < batches.length - 1) {
        await this.delay(1000); // 1 second delay between batches
      }
    }

    return results;
  }

  /**
   * Split `contacts` into consecutive chunks of at most `batchSize` items.
   * A size below 1 (or not a number) is treated as 1; a zero step would
   * otherwise never advance the loop.
   */
  createBatches(contacts, batchSize) {
    const size = Math.max(1, Math.floor(batchSize) || 1);
    const batches = [];
    for (let i = 0; i < contacts.length; i += size) {
      batches.push(contacts.slice(i, i + size));
    }
    return batches;
  }

  /**
   * Analyze one contact. Never rejects: failures are returned as
   * `{ success: false, error }` results.
   */
  async processSingleContact(contact) {
    try {
      const requestData = {
        primeiro_nome: contact.firstName || '',
        ultimo_nome: contact.lastName || '',
        empresa: contact.company || ''
      };

      const response = await axios.post(
        `${this.baseUrl}/crm_basico`,
        requestData,
        {
          headers: {
            'Content-Type': 'application/json',
            'X-API-Key': this.apiKey
          },
          timeout: 30000
        }
      );

      return {
        contact_id: contact.id,
        success: true,
        analysis: response.data,
        timestamp: new Date().toISOString()
      };
    } catch (error) {
      return {
        contact_id: contact.id,
        success: false,
        error: error.response?.data?.error?.message || error.message,
        timestamp: new Date().toISOString()
      };
    }
  }

  /** Resolve after `ms` milliseconds. */
  delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Write `results` as pretty-printed JSON and log a summary.
   *
   * @param {object[]} results  Results from `processContacts`.
   * @param {string|null} [filename]  Output path; defaults to a timestamped
   *   `batch_results_<ISO timestamp>.json` in the working directory.
   */
  saveResults(results, filename = null) {
    if (!filename) {
      const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
      filename = `batch_results_${timestamp}.json`;
    }

    const fs = require('fs');
    fs.writeFileSync(filename, JSON.stringify(results, null, 2));

    // Print summary
    const successful = results.filter(r => r.success).length;
    const failed = results.length - successful;

    console.log('\nBatch processing complete:');
    console.log(` Total contacts: ${results.length}`);
    console.log(` Successful: ${successful}`);
    console.log(` Failed: ${failed}`);
    console.log(` Results saved to: ${filename}`);
  }
}

// Usage example
/** Run a sample batch of three contacts and save the results to disk. */
async function main() {
  const processor = new BatchFluenceProcessor('your-api-key', 3);

  const contacts = [
    { id: '1', firstName: 'João', lastName: 'Silva', company: 'Microsoft Brasil' },
    { id: '2', firstName: 'Maria', lastName: 'Santos', company: 'Google Brasil' },
    { id: '3', firstName: 'Carlos', lastName: 'Oliveira', company: 'Nubank' },
    // Add more contacts...
  ];

  processor.saveResults(await processor.processContacts(contacts));
}

// Report any unhandled failure on stderr
main().catch(err => console.error(err));

Error Handling & Retry Logic

Robust API Client with Retry Logic

import requests
import time
import random
from typing import Dict, Optional, Any
from functools import wraps

class FluenceAPIClient:
    """Fluence API client that retries transient request failures."""

    def __init__(self, api_key: str, base_url: str = 'https://api.fluenceinsights.com'):
        """Create a session carrying the JSON content type and ``X-API-Key``."""
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'X-API-Key': api_key
        })

    def retry_on_failure(max_retries=3, backoff_factor=2):
        """Decorator factory adding retry with exponential backoff.

        Only failures that can plausibly succeed on a later attempt are
        retried: errors without an HTTP response (timeouts, connection
        errors), HTTP 408 and 429, and 5xx responses. Other 4xx responses,
        such as 401 or 402, are re-raised immediately, because repeating the
        same request cannot fix them.

        Args:
            max_retries: Retries after the first attempt (negative counts as 0).
            backoff_factor: Base of the exponential delay; attempt ``n``
                (0-based) waits ``backoff_factor ** n`` seconds plus up to
                one second of random jitter.
        """
        retries = max(0, max_retries)

        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                for attempt in range(retries + 1):
                    try:
                        return func(*args, **kwargs)
                    except requests.exceptions.RequestException as e:
                        response = getattr(e, 'response', None)
                        status = getattr(response, 'status_code', None)
                        retryable = status is None or status in (408, 429) or status >= 500

                        if not retryable:
                            raise
                        if attempt >= retries:
                            print(f"Max retries ({retries}) exceeded. Last error: {e}")
                            raise

                        # Calculate delay with jitter
                        delay = (backoff_factor ** attempt) + random.uniform(0, 1)
                        print(f"Request failed (attempt {attempt + 1}/{retries + 1}). Retrying in {delay:.1f}s...")
                        time.sleep(delay)
            return wrapper
        return decorator

    @retry_on_failure(max_retries=3)
    def analyze_crm(self, data: Dict[str, Any]) -> Optional[Dict]:
        """POST ``data`` to ``/crm_basico`` with automatic retry.

        Raises:
            requests.exceptions.RequestException: When the request still
                fails after the retries, or fails with a non-retryable status.
        """
        response = self.session.post(f"{self.base_url}/crm_basico", json=data, timeout=30)
        response.raise_for_status()
        return response.json()

    @retry_on_failure(max_retries=3)
    def analyze_complete(self, data: Dict[str, Any]) -> Optional[Dict]:
        """POST ``data`` to ``/crm`` (complete analysis) with automatic retry.

        Raises:
            requests.exceptions.RequestException: When the request still
                fails after the retries, or fails with a non-retryable status.
        """
        response = self.session.post(f"{self.base_url}/crm", json=data, timeout=30)
        response.raise_for_status()
        return response.json()

    def health_check(self) -> bool:
        """Return True when ``/health`` answers 200, False on any request error."""
        try:
            response = self.session.get(f"{self.base_url}/health", timeout=10)
            return response.status_code == 200
        except requests.exceptions.RequestException:
            # Only request failures mean "unhealthy"; a bare except would
            # also swallow KeyboardInterrupt and programming errors.
            return False

# Usage with error handling
def safe_analyze_contact(client: FluenceAPIClient, contact_data: Dict) -> Dict:
    """Analyze a contact and report the outcome as a dict instead of raising.

    Returns ``{'success': True, 'data': ..., 'timestamp': ...}`` when the
    analysis succeeds, otherwise ``{'success': False, 'error': ...}`` with a
    message chosen from the kind of failure.
    """
    # Messages for the HTTP statuses that get a dedicated explanation.
    http_error_messages = {
        401: 'Authentication failed - check API key',
        402: 'Insufficient credits - upgrade plan',
        429: 'Rate limit exceeded - try again later',
    }

    try:
        analysis = client.analyze_crm(contact_data)
    except requests.exceptions.HTTPError as e:
        status = e.response.status_code
        return {
            'success': False,
            'error': http_error_messages.get(status, f'HTTP error {status}')
        }
    except requests.exceptions.Timeout:
        return {'success': False, 'error': 'Request timeout'}
    except requests.exceptions.ConnectionError:
        return {'success': False, 'error': 'Connection error'}
    except Exception as e:
        return {'success': False, 'error': f'Unexpected error: {str(e)}'}

    return {'success': True, 'data': analysis, 'timestamp': time.time()}

# Example usage
client = FluenceAPIClient('your-api-key')
contact_data = {'primeiro_nome': 'João', 'ultimo_nome': 'Silva', 'empresa': 'Microsoft Brasil'}

# Analyze the sample contact and print either the error or the stakeholder type
result = safe_analyze_contact(client, contact_data)
if not result['success']:
    print("Analysis failed:", result['error'])
else:
    print("Analysis successful:", result['data']['analise']['tipo_stakeholder'])

Support & Resources

Getting Help

Best Practices

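  1. Always implement retry logic for transient failures (timeouts, HTTP 429, and 5xx responses) in production applications; do not retry authentication or validation errors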
  2. Use environment variables for API keys
  3. Monitor rate limits and implement backoff strategies
  4. Validate input data before sending to API
  5. Log errors for debugging and monitoring
  6. Choose the endpoint that fits your needs: `/crm_basico` for the Basic analysis or `/crm` for the Complete analysis

Rate Limiting

  • Essentials: 5 req/sec, 500 req/month
  • Professional: 10 req/sec, 1,000 req/month
  • Enterprise: 15 req/sec, 1,000 req/month

Implement exponential backoff for retries and monitor your usage in the dashboard.