Skip to main content

Database Connections

Connect Heimdall directly to your databases for seamless data integration and processing.

Supported Databases

Databricks

Connect to your Databricks SQL Warehouse for direct data access.

Configuration

# Databricks connection settings — replace the placeholders with the
# values shown in your SQL Warehouse's "Connection details" tab.
databricks_config = dict(
    server_hostname='your-workspace.cloud.databricks.com',
    http_path='/sql/1.0/warehouses/your-warehouse-id',
    access_token='your-access-token',
)

Example Integration

import requests
from databricks import sql

# Connect to Databricks.
# NOTE(review): assumes `requests`, `sql` (databricks connector) and
# `databricks_config` are defined above, as in the preceding snippets.
connection = sql.connect(
    server_hostname=databricks_config['server_hostname'],
    http_path=databricks_config['http_path'],
    access_token=databricks_config['access_token']
)

try:
    # Query data and send to Heimdall
    cursor = connection.cursor()
    try:
        cursor.execute("SELECT * FROM your_table LIMIT 1000")
        data = cursor.fetchall()
    finally:
        cursor.close()

    # Process with Heimdall Read
    for row in data:
        text_data = row[0]  # Assuming first column is text
        response = requests.post(
            'https://read.heimdallapp.org/read/v1/api/process',
            headers={
                'X-api-key': 'YOUR-API-KEY',
                'X-username': 'YOUR-USERNAME'
            },
            json={'text': text_data}
        )
        # Only decode the body on success; calling .json() on an error
        # response raises on non-JSON bodies.
        if response.status_code == 200:
            print(response.json())
        else:
            print(f"Heimdall request failed: {response.status_code}")
finally:
    # Always release the warehouse connection.
    connection.close()

MySQL

Connect to MySQL databases for data processing.

Configuration

import mysql.connector

# MySQL connection settings — substitute your own host and credentials.
mysql_config = dict(
    host='your-mysql-host',
    user='your-username',
    password='your-password',
    database='your-database',
)

Example Integration

import mysql.connector
import requests

# Connect to MySQL.
# NOTE(review): assumes `mysql.connector`, `requests` and `mysql_config`
# are defined above, as in the preceding snippets.
connection = mysql.connector.connect(**mysql_config)
cursor = connection.cursor()

try:
    # Query data
    cursor.execute("SELECT id, text_content FROM documents LIMIT 1000")
    rows = cursor.fetchall()

    # Process with Heimdall
    for doc_id, text_content in rows:
        # Analyze text with Heimdall Read
        response = requests.post(
            'https://read.heimdallapp.org/read/v1/api/process',
            headers={
                'X-api-key': 'YOUR-API-KEY',
                'X-username': 'YOUR-USERNAME'
            },
            json={'text': text_content}
        )

        if response.status_code == 200:
            analysis = response.json()

            # Store results back to database (parameterized query —
            # values are never interpolated into the SQL string).
            cursor.execute("""
                UPDATE documents
                SET sentiment = %s, word_count = %s
                WHERE id = %s
            """, (analysis['sentiment'], analysis['word_count'], doc_id))

    # Commit all updates as one transaction.
    connection.commit()
finally:
    # Release the cursor and connection even if a request fails.
    cursor.close()
    connection.close()

PostgreSQL

Connect to PostgreSQL databases for advanced data processing.

Configuration

import psycopg2

# PostgreSQL connection settings — 5432 is the default server port.
postgres_config = dict(
    host='your-postgres-host',
    user='your-username',
    password='your-password',
    database='your-database',
    port=5432,
)

Example Integration

import psycopg2
import requests
import pandas as pd

# Connect to PostgreSQL.
# NOTE(review): assumes `psycopg2`, `requests`, `pd` and `postgres_config`
# are defined above, as in the preceding snippets.
connection = psycopg2.connect(**postgres_config)
# One cursor reused for every UPDATE — the original opened a fresh cursor
# on every loop iteration and never closed any of them.
cursor = connection.cursor()

try:
    # Query data
    # NOTE(review): pandas officially supports SQLAlchemy engines; passing
    # a raw psycopg2 connection works but emits a UserWarning.
    query = "SELECT id, text_content, created_at FROM posts WHERE processed = false"
    df = pd.read_sql(query, connection)

    # Process with Heimdall
    for _, row in df.iterrows():
        response = requests.post(
            'https://read.heimdallapp.org/read/v1/api/process',
            headers={
                'X-api-key': 'YOUR-API-KEY',
                'X-username': 'YOUR-USERNAME'
            },
            json={'text': row['text_content']}
        )

        if response.status_code == 200:
            analysis = response.json()

            # Update database with results (parameterized query).
            cursor.execute("""
                UPDATE posts
                SET sentiment = %s,
                    word_count = %s,
                    processed = true
                WHERE id = %s
            """, (analysis['sentiment'], analysis['word_count'], row['id']))

    # Commit all updates as one transaction.
    connection.commit()
finally:
    cursor.close()
    connection.close()

MariaDB

Connect to MariaDB databases for data processing.

Configuration

import mariadb

# MariaDB connection settings — 3306 is the default server port.
mariadb_config = dict(
    host='your-mariadb-host',
    user='your-username',
    password='your-password',
    database='your-database',
    port=3306,
)

Example Integration

import mariadb
import requests

# Connect to MariaDB.
# NOTE(review): assumes `mariadb`, `requests` and `mariadb_config` are
# defined above, as in the preceding snippets.
connection = mariadb.connect(**mariadb_config)
cursor = connection.cursor()

try:
    # Query data
    cursor.execute("SELECT id, content FROM articles WHERE analyzed = false")
    rows = cursor.fetchall()

    # Process with Heimdall
    for article_id, content in rows:
        # Analyze content with Heimdall Read
        response = requests.post(
            'https://read.heimdallapp.org/read/v1/api/process',
            headers={
                'X-api-key': 'YOUR-API-KEY',
                'X-username': 'YOUR-USERNAME'
            },
            json={'text': content}
        )

        if response.status_code == 200:
            analysis = response.json()

            # Store analysis results (parameterized query).
            cursor.execute("""
                UPDATE articles
                SET sentiment = %s,
                    word_count = %s,
                    analyzed = true
                WHERE id = %s
            """, (analysis['sentiment'], analysis['word_count'], article_id))

    # Commit all updates as one transaction.
    connection.commit()
finally:
    # Release the cursor and connection even if a request fails.
    cursor.close()
    connection.close()

Batch Processing

Large Dataset Processing

import pandas as pd
import requests
from sqlalchemy import create_engine

def process_large_dataset(connection_string, table_name, text_column, batch_size=1000):
    """Analyze every unprocessed row of *table_name* with Heimdall Read, in batches.

    Args:
        connection_string: SQLAlchemy database URL.
        table_name: table holding the rows to analyze. WARNING: interpolated
            directly into SQL — pass trusted identifiers only, never user input.
        text_column: name of the text column (same injection caveat).
        batch_size: rows fetched per round trip (default 1000, matching the
            original behavior).
    """
    # Connect to database
    engine = create_engine(connection_string)

    # Rows we mark processed=true drop out of the `WHERE processed = false`
    # result set, so a steadily growing OFFSET (the original approach) skips
    # a batch of still-unprocessed rows every round. Instead, only skip past
    # rows that FAILED and therefore stay at the front of the result set.
    failed = 0
    done = 0

    while True:
        # Query batch of not-yet-processed rows.
        query = f"""
            SELECT id, {text_column}
            FROM {table_name}
            WHERE processed = false
            LIMIT {batch_size} OFFSET {failed}
        """

        df = pd.read_sql(query, engine)
        if df.empty:
            break

        # Process batch
        for _, row in df.iterrows():
            try:
                response = requests.post(
                    'https://read.heimdallapp.org/read/v1/api/process',
                    headers={
                        'X-api-key': 'YOUR-API-KEY',
                        'X-username': 'YOUR-USERNAME'
                    },
                    json={'text': row[text_column]}
                )

                if response.status_code == 200:
                    analysis = response.json()

                    # Engine.execute() was removed in SQLAlchemy 2.0.
                    # engine.begin() opens a transaction and commits on exit;
                    # exec_driver_sql uses the driver's native %s paramstyle
                    # (psycopg2 / MySQL drivers, as in the examples above).
                    with engine.begin() as conn:
                        conn.exec_driver_sql(
                            f"UPDATE {table_name} "
                            "SET sentiment = %s, word_count = %s, processed = true "
                            "WHERE id = %s",
                            (analysis['sentiment'], analysis['word_count'], row['id']),
                        )
                    done += 1
                else:
                    # Row stays unprocessed; count it so we offset past it
                    # next round instead of refetching it forever.
                    failed += 1

            except Exception as e:
                failed += 1
                print(f"Error processing row {row['id']}: {e}")

        print(f"Processed {done} records ({failed} failures)")

Real-time Processing

Stream Processing

import asyncio
import aiohttp
import asyncpg

async def process_stream():
    """Poll PostgreSQL for unprocessed posts and analyze them with Heimdall Read.

    Runs until cancelled; sleeps between polls when no new rows are found.
    """
    # Connect to database
    conn = await asyncpg.connect(
        host='your-host',
        user='your-username',
        password='your-password',
        database='your-database'
    )

    try:
        # Monitor for new records
        async with aiohttp.ClientSession() as session:
            while True:
                # Query for unprocessed records
                rows = await conn.fetch("""
                    SELECT id, content FROM posts
                    WHERE processed = false
                    LIMIT 10
                """)

                if not rows:
                    await asyncio.sleep(1)  # Wait for new data
                    continue

                # Fan the batch out; the connection is passed explicitly —
                # the original referenced an undefined `conn` inside
                # process_single_record (NameError at runtime).
                # NOTE(review): a single asyncpg connection cannot run
                # queries concurrently; confirm updates don't overlap, or
                # use a connection pool.
                tasks = [
                    process_single_record(session, row['id'], row['content'], conn)
                    for row in rows
                ]
                await asyncio.gather(*tasks)
                await asyncio.sleep(0.1)  # Small delay between batches
    finally:
        # Release the database connection on cancellation/error.
        await conn.close()

async def process_single_record(session, record_id, content, conn=None):
    """Send one record to Heimdall Read and persist the analysis.

    Args:
        session: shared aiohttp.ClientSession.
        record_id: primary key of the posts row.
        content: text to analyze.
        conn: open asyncpg connection used to store the result; required in
            practice (defaults to None only for call-compatibility).
    """
    try:
        async with session.post(
            'https://read.heimdallapp.org/read/v1/api/process',
            headers={
                'X-api-key': 'YOUR-API-KEY',
                'X-username': 'YOUR-USERNAME'
            },
            json={'text': content}
        ) as response:
            if response.status == 200:
                analysis = await response.json()

                # asyncpg uses $1-style placeholders with positional args,
                # not the %s paramstyle of the sync drivers above.
                await conn.execute("""
                    UPDATE posts
                    SET sentiment = $1,
                        word_count = $2,
                        processed = true
                    WHERE id = $3
                """, analysis['sentiment'], analysis['word_count'], record_id)

    except Exception as e:
        print(f"Error processing record {record_id}: {e}")

Best Practices

Connection Management

  • Use connection pooling for better performance
  • Implement retry logic for failed connections
  • Monitor connection health regularly
  • Close connections properly to avoid leaks

Security

  • Use environment variables for credentials
  • Enable SSL/TLS for encrypted connections
  • Implement access controls and user permissions
  • Regular security updates for database drivers

Next Steps

Now that you can connect to databases:

  1. Monitor Performance - Track database integration performance
  2. Follow Best Practices - Learn production deployment tips
  3. Integrate APIs - Connect to your applications