SDK & Integration Examples
Comprehensive integration examples for popular platforms and use cases. Choose your preferred language or platform to get started quickly.
Quick Start Examples
JavaScript/Node.js
const axios = require('axios');
/**
 * Minimal client for the Fluence Insights CRM analysis API.
 *
 * Every request is sent through a shared axios instance configured with the
 * API key header and a 30 second timeout.
 */
class FluenceAPI {
  /**
   * @param {string} apiKey - Value sent as the `X-API-Key` header.
   * @param {string} [baseUrl] - API root URL.
   */
  constructor(apiKey, baseUrl = 'https://api.fluenceinsights.com') {
    this.apiKey = apiKey;
    this.baseUrl = baseUrl;
    this.client = axios.create({
      baseURL: baseUrl,
      headers: {
        'Content-Type': 'application/json',
        'X-API-Key': apiKey
      },
      timeout: 30000
    });
  }

  /**
   * POST `data` to the basic endpoint (`/crm_basico`).
   * @param {object} data - Request payload.
   * @returns {Promise<object>} Parsed response body.
   * @throws {Error} Message prefixed with "API Error: ".
   */
  async analyzeCRM(data) {
    return this._post('/crm_basico', data);
  }

  /**
   * POST `data` to the complete endpoint (`/crm`).
   * @param {object} data - Request payload.
   * @returns {Promise<object>} Parsed response body.
   * @throws {Error} Message prefixed with "API Error: ".
   */
  async analyzeComplete(data) {
    return this._post('/crm', data);
  }

  /**
   * Shared request/error handling for both endpoints.
   * @private
   */
  async _post(path, data) {
    try {
      const response = await this.client.post(path, data);
      return response.data;
    } catch (error) {
      const wrapped = new Error(
        `API Error: ${error.response?.data?.error?.message || error.message}`
      );
      // Keep the HTTP status and the original error so callers can tell
      // authentication, quota and network failures apart.
      wrapped.status = error.response?.status;
      wrapped.cause = error;
      throw wrapped;
    }
  }
}
// Usage
// This snippet loads axios with `require` (CommonJS), where top-level `await`
// is a syntax error, so the call runs inside an async function instead.
(async () => {
  const api = new FluenceAPI(process.env.FLUENCE_API_KEY);
  const result = await api.analyzeCRM({
    primeiro_nome: 'João',
    ultimo_nome: 'Silva',
    empresa: 'Microsoft Brasil'
  });
  return result;
})().catch(console.error);
Python
import requests
import os
from typing import Dict, Optional
class FluenceAPI:
    """Minimal client for the Fluence Insights CRM analysis API.

    Requests go through one ``requests.Session`` that carries the JSON
    content type and the ``X-API-Key`` header.
    """

    def __init__(self, api_key: str, base_url: str = 'https://api.fluenceinsights.com',
                 timeout: float = 30.0):
        """Create the client.

        Args:
            api_key: Value sent as the ``X-API-Key`` header.
            base_url: API root URL.
            timeout: Seconds to wait for each request. Without a timeout
                ``requests`` waits indefinitely on an unresponsive server.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'X-API-Key': api_key
        })

    def analyze_crm(self, data: Dict) -> Optional[Dict]:
        """Analyze CRM data using basic endpoint"""
        return self._post('/crm_basico', data)

    def analyze_complete(self, data: Dict) -> Optional[Dict]:
        """Analyze CRM data using complete endpoint"""
        return self._post('/crm', data)

    def _post(self, path: str, data: Dict) -> Optional[Dict]:
        """POST ``data`` as JSON to ``path`` and return the decoded body.

        Raises:
            Exception: With an "API Error: " prefix for any request failure,
                including non-2xx statuses. The original ``requests``
                exception is chained as ``__cause__``.
        """
        try:
            response = self.session.post(
                f"{self.base_url}{path}", json=data, timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            raise Exception(f"API Error: {e}") from e
# Usage
# The API key is read from the FLUENCE_API_KEY environment variable.
api = FluenceAPI(os.environ.get('FLUENCE_API_KEY'))
result = api.analyze_crm(dict(
    primeiro_nome='Maria',
    ultimo_nome='Santos',
    empresa='Google Brasil',
))
cURL Examples
# Basic CRM Analysis: POST a contact to the basic endpoint
curl --request POST "https://api.fluenceinsights.com/crm_basico" \
  --header "Content-Type: application/json" \
  --header "X-API-Key: your-api-key-here" \
  --data '{
"primeiro_nome": "João",
"ultimo_nome": "Silva",
"empresa": "Microsoft Brasil"
}'
# Complete CRM Analysis: same payload shape, complete endpoint
curl --request POST "https://api.fluenceinsights.com/crm" \
  --header "Content-Type: application/json" \
  --header "X-API-Key: your-api-key-here" \
  --data '{
"primeiro_nome": "Maria",
"ultimo_nome": "Santos",
"empresa": "Google Brasil"
}'
# Health Check: plain GET with the API key header
curl "https://api.fluenceinsights.com/health" \
  --header "X-API-Key: your-api-key-here"
CRM System Integrations
Salesforce Integration
// Salesforce Apex Class
/**
 * Enriches a Contact with the analysis returned by the Fluence
 * `/crm_basico` endpoint.
 */
public class FluenceIntegration {
    // Placeholder values from the example; replace before deploying.
    private static final String API_KEY = 'your-api-key';
    private static final String BASE_URL = 'https://api.fluenceinsights.com';

    /**
     * Calls the Fluence API for one contact and stores the result on the
     * record. Runs asynchronously because it performs an HTTP callout.
     *
     * @param contactId Id of the Contact to enrich.
     */
    @future(callout=true)
    public static void enrichContact(String contactId) {
        // Query into a list: assigning a zero-row result to a single sObject
        // throws, and the record may be deleted before this job runs.
        List<Contact> matches = [
            SELECT FirstName, LastName, Account.Name
            FROM Contact
            WHERE Id = :contactId
            LIMIT 1
        ];
        if (matches.isEmpty()) {
            return;
        }
        Contact contact = matches[0];

        // Prepare request data
        Map<String, Object> requestData = new Map<String, Object>{
            'primeiro_nome' => contact.FirstName,
            'ultimo_nome' => contact.LastName,
            'empresa' => contact.Account.Name
        };

        // Make API call
        HttpRequest request = new HttpRequest();
        request.setEndpoint(BASE_URL + '/crm_basico');
        request.setMethod('POST');
        request.setHeader('Content-Type', 'application/json');
        request.setHeader('X-API-Key', API_KEY);
        request.setBody(JSON.serialize(requestData));
        HttpResponse response = new Http().send(request);
        if (response.getStatusCode() != 200) {
            return;
        }

        Map<String, Object> result =
            (Map<String, Object>) JSON.deserializeUntyped(response.getBody());
        Object rawAnalysis = result.get('analise');
        if (!(rawAnalysis instanceof Map<String, Object>)) {
            // No analysis in the response: nothing to write back.
            return;
        }
        Map<String, Object> analysis = (Map<String, Object>) rawAnalysis;

        // Update contact with analysis data
        contact.Fluence_Stakeholder_Type__c = (String) analysis.get('tipo_stakeholder');
        contact.Fluence_Location__c = (String) analysis.get('localizacao_detectada');

        // deserializeUntyped yields List<Object>; casting that to List<String>
        // fails at runtime, so the items are converted one by one.
        Object rawInterests = analysis.get('interesses_profissionais');
        if (rawInterests instanceof List<Object>) {
            List<String> interests = new List<String>();
            for (Object item : (List<Object>) rawInterests) {
                interests.add(String.valueOf(item));
            }
            contact.Fluence_Professional_Interests__c = String.join(interests, '; ');
        }
        update contact;
    }
}
// Trigger to automatically enrich contacts
trigger ContactTrigger on Contact (after insert, after update) {
    // FluenceIntegration.enrichContact is a @future method that updates the
    // Contact, which fires this trigger again. A future method cannot be
    // invoked from a future or batch context, so skip those invocations.
    if (System.isFuture() || System.isBatch()) {
        return;
    }
    for (Contact contact : Trigger.new) {
        // Only contacts linked to an account and not yet enriched.
        if (contact.AccountId == null || contact.Fluence_Stakeholder_Type__c != null) {
            continue;
        }
        // Each call consumes one future invocation; stop before the
        // per-transaction limit instead of failing the whole DML operation.
        if (Limits.getFutureCalls() >= Limits.getLimitFutureCalls()) {
            break;
        }
        FluenceIntegration.enrichContact(contact.Id);
    }
}
HubSpot Integration
import requests
import json
from typing import Dict, Optional
class HubSpotFluenceIntegration:
    """Copy Fluence analysis results onto HubSpot contact properties."""

    # Seconds to wait for each HTTP call; requests has no default timeout.
    REQUEST_TIMEOUT = 30

    def __init__(self, fluence_api_key: str, hubspot_api_key: str):
        """Store both API keys and the base URLs of the two services."""
        self.fluence_api_key = fluence_api_key
        self.hubspot_api_key = hubspot_api_key
        self.fluence_base_url = 'https://api.fluenceinsights.com'
        self.hubspot_base_url = 'https://api.hubapi.com'

    def enrich_contact(self, contact_id: int) -> Optional[Dict]:
        """Enrich HubSpot contact with Fluence analysis.

        Returns:
            The HubSpot update response, or ``None`` when the contact lookup,
            the Fluence analysis or the update did not return HTTP 200.
        """
        # Get contact from HubSpot
        hubspot_contact = self._get_hubspot_contact(contact_id)
        if not hubspot_contact:
            return None

        # Prepare Fluence request. `or` also covers properties that are
        # present but null, which `.get(key, '')` would pass through as None.
        properties = hubspot_contact.get('properties') or {}
        fluence_data = {
            'primeiro_nome': properties.get('firstname') or '',
            'ultimo_nome': properties.get('lastname') or '',
            'empresa': properties.get('company') or ''
        }

        # Get Fluence analysis
        fluence_result = self._get_fluence_analysis(fluence_data)
        if not fluence_result:
            return None

        # Update HubSpot contact with analysis
        analysis = fluence_result.get('analise') or {}
        metadata = fluence_result.get('metadata') or {}
        interests = analysis.get('interesses_profissionais') or []
        update_data = {
            'properties': {
                'fluence_stakeholder_type': analysis.get('tipo_stakeholder', ''),
                'fluence_location': analysis.get('localizacao_detectada', ''),
                'fluence_professional_interests': '; '.join(interests),
                'fluence_analysis_date': metadata.get('timestamp', '')
            }
        }
        return self._update_hubspot_contact(contact_id, update_data)

    def _get_hubspot_contact(self, contact_id: int) -> Optional[Dict]:
        """Get contact from HubSpot; ``None`` unless the response is 200."""
        url = f"{self.hubspot_base_url}/crm/v3/objects/contacts/{contact_id}"
        headers = {'Authorization': f'Bearer {self.hubspot_api_key}'}
        response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
        if response.status_code == 200:
            return response.json()
        return None

    def _get_fluence_analysis(self, data: Dict) -> Optional[Dict]:
        """Get analysis from Fluence API; ``None`` unless the response is 200."""
        url = f"{self.fluence_base_url}/crm_basico"
        headers = {
            'Content-Type': 'application/json',
            'X-API-Key': self.fluence_api_key
        }
        response = requests.post(url, headers=headers, json=data,
                                 timeout=self.REQUEST_TIMEOUT)
        if response.status_code == 200:
            return response.json()
        return None

    def _update_hubspot_contact(self, contact_id: int, data: Dict) -> Optional[Dict]:
        """Update HubSpot contact; ``None`` unless the response is 200."""
        url = f"{self.hubspot_base_url}/crm/v3/objects/contacts/{contact_id}"
        headers = {
            'Authorization': f'Bearer {self.hubspot_api_key}',
            'Content-Type': 'application/json'
        }
        response = requests.patch(url, headers=headers, json=data,
                                  timeout=self.REQUEST_TIMEOUT)
        if response.status_code == 200:
            return response.json()
        return None
# Usage
# Arguments are (fluence_api_key, hubspot_api_key), in that order.
integration = HubSpotFluenceIntegration('your-fluence-key', 'your-hubspot-key')
# Enrich a specific contact
result = integration.enrich_contact(contact_id=12345)
Pipedrive Integration
const axios = require('axios');
/**
 * Copies Fluence analysis results onto a Pipedrive person.
 */
class PipedriveFluenceIntegration {
  /**
   * @param {string} fluenceApiKey - Sent as `X-API-Key` to Fluence.
   * @param {string} pipedriveApiKey - Sent as the `api_token` query parameter.
   */
  constructor(fluenceApiKey, pipedriveApiKey) {
    this.fluenceApiKey = fluenceApiKey;
    this.pipedriveApiKey = pipedriveApiKey;
    this.fluenceBaseUrl = 'https://api.fluenceinsights.com';
    this.pipedriveBaseUrl = 'https://api.pipedrive.com/v1';
    // axios waits indefinitely by default; bound every call.
    this.timeoutMs = 30000;
  }

  /**
   * Fetch a person, analyze them with Fluence and write the result back.
   * @param {number|string} personId
   * @returns {Promise<object|null>} Pipedrive update response, or null when
   *   the person or analysis is missing or any step throws.
   */
  async enrichPerson(personId) {
    try {
      // Get person from Pipedrive
      const person = await this.getPipedrivePerson(personId);
      if (!person) return null;

      // Get Fluence analysis
      const fluenceResult = await this.getFluenceAnalysis({
        primeiro_nome: person.first_name || '',
        ultimo_nome: person.last_name || '',
        empresa: person.org_name || ''
      });
      if (!fluenceResult) return null;

      // Update Pipedrive person with analysis. Missing sections fall back
      // to empty values instead of throwing a TypeError that the catch
      // below would silently turn into `null`.
      const analysis = fluenceResult.analise || {};
      const interests = analysis.interesses_profissionais || [];
      const updateData = {
        fluence_stakeholder_type: analysis.tipo_stakeholder,
        fluence_location: analysis.localizacao_detectada,
        fluence_interests: interests.join('; '),
        fluence_analysis_date: fluenceResult.metadata?.timestamp
      };
      return await this.updatePipedrivePerson(personId, updateData);
    } catch (error) {
      console.error('Error enriching person:', error);
      return null;
    }
  }

  /** GET the person record; resolves to the `data` field of the response body. */
  async getPipedrivePerson(personId) {
    const response = await axios.get(
      `${this.pipedriveBaseUrl}/persons/${personId}`,
      {
        params: { api_token: this.pipedriveApiKey },
        timeout: this.timeoutMs
      }
    );
    return response.data.data;
  }

  /** POST the contact to the Fluence basic endpoint; resolves to the response body. */
  async getFluenceAnalysis(data) {
    const response = await axios.post(
      `${this.fluenceBaseUrl}/crm_basico`,
      data,
      {
        headers: {
          'Content-Type': 'application/json',
          'X-API-Key': this.fluenceApiKey
        },
        timeout: this.timeoutMs
      }
    );
    return response.data;
  }

  /** PUT the given fields onto the person; resolves to the response body. */
  async updatePipedrivePerson(personId, data) {
    const response = await axios.put(
      `${this.pipedriveBaseUrl}/persons/${personId}`,
      data,
      {
        params: { api_token: this.pipedriveApiKey },
        timeout: this.timeoutMs
      }
    );
    return response.data;
  }
}
// Usage
// Arguments are (fluenceApiKey, pipedriveApiKey), in that order.
const integration = new PipedriveFluenceIntegration('your-fluence-key', 'your-pipedrive-key');
const logEnriched = (result) => {
  console.log('Person enriched:', result);
};
integration.enrichPerson(12345).then(logEnriched);
Webhook Implementations
Webhook Receiver (Node.js)
const express = require('express');
const crypto = require('crypto');
// Create the Express application that hosts the webhook route.
const app = express();
// Parse JSON request bodies so handlers can read them from `req.body`.
app.use(express.json());
/**
 * Processes "contact created" webhook payloads: analyzes the contact with
 * Fluence, then updates the CRM and notifies the sales team.
 */
class WebhookHandler {
  /** @param {string} fluenceApiKey - Sent as the `X-API-Key` header. */
  constructor(fluenceApiKey) {
    this.fluenceApiKey = fluenceApiKey;
    this.fluenceBaseUrl = 'https://api.fluenceinsights.com';
  }

  /**
   * Run the full pipeline for one contact.
   * @param {object} contactData - Payload with `id`, `firstName`, `lastName`, `company`.
   * @returns {Promise<object>} `{ success: true, analysis }` or
   *   `{ success: false, error }`; this method does not reject.
   */
  async handleNewContact(contactData) {
    try {
      // Analyze contact with Fluence
      const analysis = await this.analyzeContact(contactData);
      // Update CRM with analysis results
      await this.updateCRM(contactData.id, analysis);
      // Send notification
      await this.sendNotification(contactData, analysis);
      return { success: true, analysis };
    } catch (error) {
      console.error('Webhook processing error:', error);
      return { success: false, error: error.message };
    }
  }

  /**
   * POST the contact to the Fluence basic endpoint.
   * @returns {Promise<object>} Parsed analysis response.
   * @throws {Error} When the API answers with a non-2xx status.
   */
  async analyzeContact(contactData) {
    const fluenceData = {
      primeiro_nome: contactData.firstName,
      ultimo_nome: contactData.lastName,
      empresa: contactData.company
    };
    const response = await fetch(`${this.fluenceBaseUrl}/crm_basico`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'X-API-Key': this.fluenceApiKey
      },
      body: JSON.stringify(fluenceData)
    });
    // fetch resolves on HTTP error statuses too; without this check an error
    // payload would be written to the CRM as if it were an analysis.
    if (!response.ok) {
      throw new Error(`Fluence API request failed with status ${response.status}`);
    }
    return response.json();
  }

  /** Placeholder: only logs the contact id and analysis. */
  async updateCRM(contactId, analysis) {
    // Update your CRM system with analysis results
    console.log(`Updating contact ${contactId} with analysis:`, analysis);
  }

  /** Placeholder: only logs the contact name and stakeholder type. */
  async sendNotification(contactData, analysis) {
    // Send notification to sales team
    console.log(`New contact analyzed: ${contactData.firstName} ${contactData.lastName}`);
    console.log(`Stakeholder type: ${analysis?.analise?.tipo_stakeholder}`);
  }
}
// Webhook endpoint
// Passes the JSON body to a WebhookHandler and replies with its result;
// an unexpected rejection is reported as HTTP 500.
app.post('/webhook/contact-created', (req, res) => {
  const handler = new WebhookHandler(process.env.FLUENCE_API_KEY);
  return handler
    .handleNewContact(req.body)
    .then((result) => res.json(result))
    .catch((error) => res.status(500).json({ error: error.message }));
});

function onListening() {
  console.log('Webhook server running on port 3000');
}
app.listen(3000, onListening);
Webhook Receiver (Python Flask)
from flask import Flask, request, jsonify
import requests
import os
from typing import Dict, Optional
# Flask application object; the webhook route is registered on it.
app = Flask(__name__)
class WebhookHandler:
    """Process "contact created" webhook payloads with the Fluence API."""

    def __init__(self, fluence_api_key: str):
        """Store the API key and the Fluence base URL."""
        self.fluence_api_key = fluence_api_key
        self.fluence_base_url = 'https://api.fluenceinsights.com'

    def handle_new_contact(self, contact_data: Dict) -> Dict:
        """Handle new contact webhook.

        Returns:
            ``{'success': True, 'analysis': ...}`` on success, otherwise
            ``{'success': False, 'error': <message>}``. Never raises.
        """
        try:
            # Validate before calling the API so a malformed payload does
            # not trigger an analysis request it can never use.
            if not isinstance(contact_data, dict):
                return {'success': False, 'error': 'Contact payload must be a JSON object'}
            if 'id' not in contact_data:
                return {'success': False, 'error': 'Missing required field: id'}

            # Analyze contact with Fluence
            analysis = self.analyze_contact(contact_data)
            if analysis is None:
                # analyze_contact returns None for non-200 responses; stop
                # here rather than writing None to the CRM.
                return {'success': False, 'error': 'Fluence analysis unavailable'}

            # Update CRM with analysis results
            self.update_crm(contact_data['id'], analysis)
            # Send notification
            self.send_notification(contact_data, analysis)
            return {'success': True, 'analysis': analysis}
        except Exception as e:
            print(f'Webhook processing error: {e}')
            return {'success': False, 'error': str(e)}

    def analyze_contact(self, contact_data: Dict) -> Optional[Dict]:
        """Analyze contact with Fluence API.

        Returns:
            The decoded response body, or ``None`` when the status is not 200.
        """
        fluence_data = {
            'primeiro_nome': contact_data.get('firstName', ''),
            'ultimo_nome': contact_data.get('lastName', ''),
            'empresa': contact_data.get('company', '')
        }
        response = requests.post(
            f"{self.fluence_base_url}/crm_basico",
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': self.fluence_api_key
            },
            json=fluence_data,
            timeout=30,  # requests waits indefinitely without a timeout
        )
        if response.status_code == 200:
            return response.json()
        return None

    def update_crm(self, contact_id: str, analysis: Dict):
        """Update CRM system with analysis results (placeholder: prints only)."""
        print(f"Updating contact {contact_id} with analysis: {analysis}")

    def send_notification(self, contact_data: Dict, analysis: Dict):
        """Send notification to sales team (placeholder: prints only)."""
        print(f"New contact analyzed: {contact_data.get('firstName')} {contact_data.get('lastName')}")
        print(f"Stakeholder type: {analysis.get('analise', {}).get('tipo_stakeholder')}")
# Webhook endpoint
@app.route('/webhook/contact-created', methods=['POST'])
def contact_created():
    """Receive a contact-created webhook and return the handler result as JSON.

    Responds 400 when the body is not a JSON object and 500 when the handler
    raises unexpectedly.
    """
    # silent=True yields None for a missing or malformed body instead of
    # raising, so bad input is reported as a client error rather than a 500.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    handler = WebhookHandler(os.getenv('FLUENCE_API_KEY'))
    try:
        result = handler.handle_new_contact(payload)
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which allows arbitrary
    # code execution; keep it off unless explicitly requested.
    debug_enabled = os.getenv('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(debug=debug_enabled, port=3000)
Batch Processing Examples
Batch Contact Analysis (Python)
import asyncio
import aiohttp
import json
from typing import List, Dict
from datetime import datetime
class BatchFluenceProcessor:
    """Analyze many contacts concurrently against the Fluence basic endpoint."""

    # Seconds allowed for each request, connection included.
    REQUEST_TIMEOUT = 30

    def __init__(self, api_key: str, max_concurrent: int = 5):
        """Store the API key and create the semaphore that caps concurrency.

        Args:
            api_key: Value sent as the ``X-API-Key`` header.
            max_concurrent: Maximum number of requests in flight at once.
        """
        self.api_key = api_key
        self.base_url = 'https://api.fluenceinsights.com'
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def process_contacts(self, contacts: List[Dict]) -> List[Dict]:
        """Process multiple contacts in parallel.

        One HTTP session is shared by the whole batch so connections are
        reused. Results are returned in the order of ``contacts``.
        """
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            tasks = [self.process_single_contact(contact, session) for contact in contacts]
            raw_results = await asyncio.gather(*tasks, return_exceptions=True)

        # return_exceptions=True can place exception objects in the list;
        # convert them so every entry is a JSON-serializable dict.
        results = []
        for contact, outcome in zip(contacts, raw_results):
            if isinstance(outcome, BaseException):
                outcome = self._failure(contact, str(outcome))
            results.append(outcome)
        return results

    async def process_single_contact(self, contact: Dict, session=None) -> Dict:
        """Process a single contact with rate limiting.

        Args:
            contact: Dict with ``id``, ``firstName``, ``lastName``, ``company``.
            session: Optional shared ``aiohttp.ClientSession``; a temporary
                one is created when omitted.

        Returns:
            A result dict with ``contact_id``, ``success``, ``timestamp`` and
            either ``analysis`` or ``error``.
        """
        async with self.semaphore:
            try:
                # Prepare request data
                request_data = {
                    'primeiro_nome': contact.get('firstName', ''),
                    'ultimo_nome': contact.get('lastName', ''),
                    'empresa': contact.get('company', '')
                }
                # Make API request
                if session is not None:
                    return await self._post_contact(session, contact, request_data)
                timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
                async with aiohttp.ClientSession(timeout=timeout) as own_session:
                    return await self._post_contact(own_session, contact, request_data)
            except Exception as e:
                return self._failure(contact, str(e))

    async def _post_contact(self, session, contact: Dict, request_data: Dict) -> Dict:
        """Send one analysis request and convert the response to a result dict."""
        async with session.post(
            f"{self.base_url}/crm_basico",
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': self.api_key
            },
            json=request_data
        ) as response:
            if response.status == 200:
                result = await response.json()
                return {
                    'contact_id': contact.get('id'),
                    'success': True,
                    'analysis': result,
                    'timestamp': datetime.now().isoformat()
                }
            try:
                error_data = await response.json()
            except (aiohttp.ContentTypeError, ValueError):
                # Error body is not JSON; report the status instead.
                return self._failure(contact, f"HTTP {response.status}")
            return self._failure(contact, self._error_message(error_data))

    @staticmethod
    def _error_message(payload) -> str:
        """Extract a readable message from a decoded error response body."""
        error = payload.get('error') if isinstance(payload, dict) else None
        if isinstance(error, dict):
            return error.get('message', 'Unknown error')
        if isinstance(error, str) and error:
            return error
        return 'Unknown error'

    @staticmethod
    def _failure(contact: Dict, message: str) -> Dict:
        """Build the result dict for a contact that could not be analyzed."""
        return {
            'contact_id': contact.get('id'),
            'success': False,
            'error': message,
            'timestamp': datetime.now().isoformat()
        }

    def save_results(self, results: List[Dict], filename: str = None):
        """Save batch processing results to file and print a summary.

        Args:
            results: Result dicts as returned by ``process_contacts``.
            filename: Output path; defaults to a timestamped
                ``batch_results_*.json`` in the working directory.
        """
        if not filename:
            filename = f"batch_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        # Explicit UTF-8 and ensure_ascii=False keep accented names readable
        # regardless of the platform's default encoding.
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        # Print summary
        successful = sum(1 for r in results if r.get('success'))
        failed = len(results) - successful
        print(f"Batch processing complete:")
        print(f" Total contacts: {len(results)}")
        print(f" Successful: {successful}")
        print(f" Failed: {failed}")
        print(f" Results saved to: {filename}")
# Usage example
async def main():
    """Analyze three sample contacts and write the results to a JSON file."""
    # Sample contact data as (id, first name, last name, company) rows
    sample_rows = (
        ('1', 'João', 'Silva', 'Microsoft Brasil'),
        ('2', 'Maria', 'Santos', 'Google Brasil'),
        ('3', 'Carlos', 'Oliveira', 'Nubank'),
        # Add more contacts...
    )
    contacts = [
        {'id': contact_id, 'firstName': first, 'lastName': last, 'company': company}
        for contact_id, first, last, company in sample_rows
    ]
    processor = BatchFluenceProcessor(api_key='your-api-key', max_concurrent=3)
    processor.save_results(await processor.process_contacts(contacts))


# Run batch processing
if __name__ == "__main__":
    asyncio.run(main())
Batch Processing with Progress Tracking (JavaScript)
const axios = require('axios');
/**
 * Analyzes contacts in fixed-size batches with progress logging and a pause
 * between batches.
 */
class BatchFluenceProcessor {
  /**
   * @param {string} apiKey - Sent as the `X-API-Key` header.
   * @param {number} [maxConcurrent=5] - Contacts sent in parallel per batch.
   */
  constructor(apiKey, maxConcurrent = 5) {
    this.apiKey = apiKey;
    this.baseUrl = 'https://api.fluenceinsights.com';
    this.maxConcurrent = maxConcurrent;
    this.activeRequests = 0;
    this.queue = [];
    this.results = [];
  }

  /**
   * Process all contacts batch by batch.
   * @param {object[]} contacts - Objects with `id`, `firstName`, `lastName`, `company`.
   * @returns {Promise<object[]>} One result object per contact.
   */
  async processContacts(contacts) {
    // Start from a clean slate so a second run on the same instance does
    // not report results and progress from the previous one.
    this.results = [];
    console.log(`Starting batch processing of ${contacts.length} contacts...`);
    const batches = this.createBatches(contacts, this.maxConcurrent);
    for (let i = 0; i < batches.length; i++) {
      const batch = batches[i];
      console.log(`Processing batch ${i + 1}/${batches.length} (${batch.length} contacts)`);
      const batchPromises = batch.map(contact => this.processSingleContact(contact));
      const batchResults = await Promise.allSettled(batchPromises);
      // Add results
      batchResults.forEach((result, index) => {
        if (result.status === 'fulfilled') {
          this.results.push(result.value);
        } else {
          this.results.push({
            // Keep the id so a failed entry can be matched to its contact.
            contact_id: batch[index].id,
            success: false,
            error: result.reason?.message ?? String(result.reason),
            timestamp: new Date().toISOString()
          });
        }
      });
      // Progress update
      const completed = this.results.length;
      const successful = this.results.filter(r => r.success).length;
      console.log(`Progress: ${completed}/${contacts.length} completed (${successful} successful)`);
      // Rate limiting delay
      if (i < batches.length - 1) {
        await this.delay(1000); // 1 second delay between batches
      }
    }
    return this.results;
  }

  /**
   * Split `contacts` into consecutive slices of at most `batchSize` items.
   * @throws {RangeError} When `batchSize` is not at least 1; a step of 0
   *   or less would otherwise never advance the loop.
   */
  createBatches(contacts, batchSize) {
    if (!(batchSize >= 1)) {
      throw new RangeError(`batchSize must be at least 1, got ${batchSize}`);
    }
    const batches = [];
    for (let i = 0; i < contacts.length; i += batchSize) {
      batches.push(contacts.slice(i, i + batchSize));
    }
    return batches;
  }

  /**
   * Analyze one contact. Resolves to a result object in both the success
   * and the failure case; request errors are captured in `error`.
   */
  async processSingleContact(contact) {
    try {
      const requestData = {
        primeiro_nome: contact.firstName || '',
        ultimo_nome: contact.lastName || '',
        empresa: contact.company || ''
      };
      const response = await axios.post(
        `${this.baseUrl}/crm_basico`,
        requestData,
        {
          headers: {
            'Content-Type': 'application/json',
            'X-API-Key': this.apiKey
          },
          timeout: 30000
        }
      );
      return {
        contact_id: contact.id,
        success: true,
        analysis: response.data,
        timestamp: new Date().toISOString()
      };
    } catch (error) {
      return {
        contact_id: contact.id,
        success: false,
        error: error.response?.data?.error?.message || error.message,
        timestamp: new Date().toISOString()
      };
    }
  }

  /** Resolve after `ms` milliseconds. */
  delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Write results as pretty-printed JSON and log a summary.
   * @param {object[]} results
   * @param {string|null} [filename] - Defaults to a timestamped name.
   */
  saveResults(results, filename = null) {
    if (!filename) {
      const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
      filename = `batch_results_${timestamp}.json`;
    }
    const fs = require('fs');
    fs.writeFileSync(filename, JSON.stringify(results, null, 2));
    // Print summary
    const successful = results.filter(r => r.success).length;
    const failed = results.length - successful;
    console.log('\nBatch processing complete:');
    console.log(` Total contacts: ${results.length}`);
    console.log(` Successful: ${successful}`);
    console.log(` Failed: ${failed}`);
    console.log(` Results saved to: ${filename}`);
  }
}
// Usage example
/** Analyze three sample contacts and write the results to a JSON file. */
async function main() {
  // Sample rows as [id, firstName, lastName, company]
  const sampleRows = [
    ['1', 'João', 'Silva', 'Microsoft Brasil'],
    ['2', 'Maria', 'Santos', 'Google Brasil'],
    ['3', 'Carlos', 'Oliveira', 'Nubank'],
    // Add more contacts...
  ];
  const contacts = sampleRows.map(([id, firstName, lastName, company]) => ({
    id,
    firstName,
    lastName,
    company
  }));
  const processor = new BatchFluenceProcessor('your-api-key', 3);
  processor.saveResults(await processor.processContacts(contacts));
}

main().catch((err) => console.error(err));
Error Handling & Retry Logic
Robust API Client with Retry Logic (Python)
import requests
import time
import random
from typing import Dict, Optional, Any
from functools import wraps
class FluenceAPIClient:
    """Fluence API client whose analysis calls retry transient failures."""

    def __init__(self, api_key: str, base_url: str = 'https://api.fluenceinsights.com'):
        """Create a session that sends the API key on every request."""
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'X-API-Key': api_key
        })

    def retry_on_failure(max_retries=3, backoff_factor=2):
        """Decorator for automatic retry logic.

        Retries when the request produced no response (timeout, connection
        error), or the status is 429 or 5xx. Other HTTP errors such as 401
        or 402 are raised immediately because repeating the same request
        cannot succeed.

        Args:
            max_retries: Retries after the first attempt.
            backoff_factor: Base of the exponential delay
                (``backoff_factor ** attempt`` seconds plus up to 1s jitter).
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                for attempt in range(max_retries + 1):
                    try:
                        return func(*args, **kwargs)
                    except requests.exceptions.RequestException as e:
                        # Compare with None explicitly: a Response object is
                        # falsy for 4xx/5xx statuses.
                        response = getattr(e, 'response', None)
                        status = response.status_code if response is not None else None
                        retryable = status is None or status == 429 or status >= 500
                        if not retryable:
                            raise
                        if attempt >= max_retries:
                            print(f"Max retries ({max_retries}) exceeded. Last error: {e}")
                            raise
                        # Calculate delay with jitter
                        delay = (backoff_factor ** attempt) + random.uniform(0, 1)
                        print(f"Request failed (attempt {attempt + 1}/{max_retries + 1}). Retrying in {delay:.1f}s...")
                        time.sleep(delay)
            return wrapper
        return decorator

    @retry_on_failure(max_retries=3)
    def analyze_crm(self, data: Dict[str, Any]) -> Optional[Dict]:
        """Analyze CRM data with automatic retry.

        Raises:
            requests.exceptions.RequestException: When the request fails
                with a non-retryable status or retries are exhausted.
        """
        response = self.session.post(f"{self.base_url}/crm_basico", json=data, timeout=30)
        response.raise_for_status()
        return response.json()

    @retry_on_failure(max_retries=3)
    def analyze_complete(self, data: Dict[str, Any]) -> Optional[Dict]:
        """Analyze CRM data with complete analysis.

        Raises:
            requests.exceptions.RequestException: When the request fails
                with a non-retryable status or retries are exhausted.
        """
        response = self.session.post(f"{self.base_url}/crm", json=data, timeout=30)
        response.raise_for_status()
        return response.json()

    def health_check(self) -> bool:
        """Check API health; True only when ``/health`` answers 200."""
        try:
            response = self.session.get(f"{self.base_url}/health", timeout=10)
            return response.status_code == 200
        except requests.exceptions.RequestException:
            # Catch request failures only; a bare except would also swallow
            # KeyboardInterrupt and programming errors.
            return False
# Usage with error handling
def safe_analyze_contact(client: FluenceAPIClient, contact_data: Dict) -> Dict:
    """Run ``client.analyze_crm`` and report the outcome as a dict.

    Returns:
        ``{'success': True, 'data': ..., 'timestamp': ...}`` on success, or
        ``{'success': False, 'error': <message>}`` describing the failure.
    """
    def failure(message: str) -> Dict:
        return {'success': False, 'error': message}

    # Messages for the HTTP statuses that get a specific explanation.
    known_statuses = {
        401: 'Authentication failed - check API key',
        402: 'Insufficient credits - upgrade plan',
        429: 'Rate limit exceeded - try again later',
    }

    try:
        analysis = client.analyze_crm(contact_data)
    except requests.exceptions.HTTPError as http_err:
        code = http_err.response.status_code
        return failure(known_statuses.get(code, f'HTTP error {code}'))
    except requests.exceptions.Timeout:
        return failure('Request timeout')
    except requests.exceptions.ConnectionError:
        return failure('Connection error')
    except Exception as exc:
        return failure(f'Unexpected error: {str(exc)}')

    return {
        'success': True,
        'data': analysis,
        'timestamp': time.time()
    }
# Example usage
client = FluenceAPIClient('your-api-key')
contact_data = dict(
    primeiro_nome='João',
    ultimo_nome='Silva',
    empresa='Microsoft Brasil',
)
result = safe_analyze_contact(client, contact_data)
# Report the failure first; otherwise print the detected stakeholder type.
if not result['success']:
    print("Analysis failed:", result['error'])
else:
    print("Analysis successful:", result['data']['analise']['tipo_stakeholder'])
Support & Resources
Getting Help
- Documentation: API Reference
- Error Handling: Error Handling Guide
- API Playground: Test Endpoints
- Support: support@tryfluence.tech
Best Practices
- Always implement retry logic for production applications
- Use environment variables for API keys
- Monitor rate limits and implement backoff strategies
- Validate input data before sending to API
- Log errors for debugging and monitoring
- Use the endpoint that matches your needs: Basic (`/crm_basico`) or Complete (`/crm`)
Rate Limiting
- Essentials: 5 req/sec, 500 req/month
- Professional: 10 req/sec, 1,000 req/month
- Enterprise: 15 req/sec, 1,000 req/month
Implement exponential backoff for retries and monitor your usage in the dashboard.