API Integration Patterns
This page provides code examples and patterns for integrating the Orthogramic Metamodel with OpenMetadata. Examples are provided in Python and can be adapted for other languages.
Authentication Setup
OpenMetadata Authentication
import requests
from typing import Dict, Any
class OpenMetadataClient:
"""Client for OpenMetadata API interactions."""
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url.rstrip('/')
self.headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
def get(self, endpoint: str) -> Dict[str, Any]:
response = requests.get(
f'{self.base_url}/api/v1/{endpoint}',
headers=self.headers
)
response.raise_for_status()
return response.json()
def post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
response = requests.post(
f'{self.base_url}/api/v1/{endpoint}',
headers=self.headers,
json=data
)
response.raise_for_status()
return response.json()
def patch(self, endpoint: str, data: list) -> Dict[str, Any]:
response = requests.patch(
f'{self.base_url}/api/v1/{endpoint}',
headers=self.headers,
json=data
)
response.raise_for_status()
return response.json()
# Initialize client
om_client = OpenMetadataClient(
base_url='https://your-openmetadata-instance.com',
api_key='your-api-key'
)
Pattern 1: Bulk Import Business Context
Import Capabilities as Custom Properties
import json
from typing import List, Dict
def load_orthogramic_capabilities(filepath: str) -> List[Dict]:
"""Load capabilities from Orthogramic export."""
with open(filepath, 'r') as f:
data = json.load(f)
return data.get('capabilities', [])
def create_capability_custom_property_type(client: OpenMetadataClient):
"""Create custom property type for business capability."""
custom_property = {
"name": "businessCapability",
"description": "Orthogramic business capability reference",
"propertyType": {
"id": "string-type-id", # Get from OM types endpoint
"type": "type"
}
}
# Apply to Table entity type
client.patch(
'metadata/types/name/table',
[
{
"op": "add",
"path": "/customProperties/-",
"value": custom_property
}
]
)
def apply_capability_to_tables(
client: OpenMetadataClient,
capability: Dict,
table_fqns: List[str]
):
"""Apply capability context to OpenMetadata tables."""
for table_fqn in table_fqns:
client.patch(
f'tables/name/{table_fqn}',
[
{
"op": "add",
"path": "/extension/businessCapability",
"value": capability['capabilityID']
},
{
"op": "add",
"path": "/extension/capabilityName",
"value": capability['title']
},
{
"op": "add",
"path": "/extension/capabilityOwner",
"value": capability.get('owner', 'Unknown')
}
]
)
# Usage
capabilities = load_orthogramic_capabilities('capabilities.json')
for cap in capabilities:
if 'enablingDataAssets' in cap:
apply_capability_to_tables(
om_client,
cap,
cap['enablingDataAssets']
)
Pattern 2: Sync Stakeholders to Owners
Map Orthogramic Stakeholders to OpenMetadata Users
from typing import Optional
def map_stakeholder_to_user(
client: OpenMetadataClient,
stakeholder: Dict
) -> Optional[str]:
"""Map Orthogramic stakeholder to OpenMetadata user."""
# Try to find existing user by email
email = stakeholder.get('email')
if email:
try:
user = client.get(f'users/name/{email.split("@")[0]}')
return user['id']
except requests.HTTPError:
pass
# Create user if not exists
user_data = {
"name": stakeholder['stakeholderID'].lower().replace('-', '_'),
"displayName": stakeholder['title'],
"email": email or f"{stakeholder['stakeholderID']}@placeholder.com",
"description": stakeholder.get('description', ''),
"isAdmin": False
}
# Add custom properties for business context
if stakeholder.get('responsibilities'):
user_data['extension'] = {
'businessStakeholder': stakeholder['stakeholderID'],
'accountabilities': ', '.join(stakeholder['responsibilities'])
}
result = client.post('users', user_data)
return result['id']
def assign_table_owner(
client: OpenMetadataClient,
table_fqn: str,
owner_id: str
):
"""Assign owner to OpenMetadata table."""
client.patch(
f'tables/name/{table_fqn}',
[
{
"op": "add",
"path": "/owner",
"value": {
"id": owner_id,
"type": "user"
}
}
]
)
# Usage
stakeholders = load_orthogramic_stakeholders('stakeholders.json')
for stakeholder in stakeholders:
user_id = map_stakeholder_to_user(om_client, stakeholder)
# Assign ownership based on stakeholder's data accountabilities
if 'dataAccountabilities' in stakeholder:
for asset_fqn in stakeholder['dataAccountabilities']:
assign_table_owner(om_client, asset_fqn, user_id)
Pattern 3: Create Glossary from Domains
Export Domain Definitions to OpenMetadata Glossary
def create_glossary(client: OpenMetadataClient, name: str, description: str) -> str:
"""Create a glossary for business architecture terms."""
glossary_data = {
"name": name,
"displayName": name.replace('-', ' ').title(),
"description": description,
"reviewers": [],
"owner": None
}
result = client.post('glossaries', glossary_data)
return result['id']
def create_glossary_term(
client: OpenMetadataClient,
glossary_id: str,
domain_entity: Dict,
domain_name: str
) -> str:
"""Create glossary term from Orthogramic domain entity."""
term_data = {
"glossary": {
"id": glossary_id,
"type": "glossary"
},
"name": domain_entity['title'].lower().replace(' ', '-'),
"displayName": domain_entity['title'],
"description": domain_entity.get('description', ''),
"synonyms": domain_entity.get('alternateNames', []),
"tags": [
{
"tagFQN": f"OrthogramicDomain.{domain_name}",
"source": "Classification"
}
]
}
# Add related terms if available
if 'relatedEntities' in domain_entity:
term_data['relatedTerms'] = [
{"name": rel} for rel in domain_entity['relatedEntities']
]
result = client.post('glossaryTerms', term_data)
return result['id']
def export_domain_to_glossary(
client: OpenMetadataClient,
domain_data: Dict,
domain_name: str
):
"""Export entire Orthogramic domain to OpenMetadata glossary."""
# Create glossary for domain
glossary_id = create_glossary(
client,
f"orthogramic-{domain_name}",
f"Business architecture terms from Orthogramic {domain_name} domain"
)
# Create terms for each entity
for entity in domain_data.get('entities', []):
create_glossary_term(client, glossary_id, entity, domain_name)
# Usage
domains = ['capabilities', 'value-streams', 'stakeholders', 'information']
for domain in domains:
domain_data = load_orthogramic_domain(f'{domain}.json')
export_domain_to_glossary(om_client, domain_data, domain)
Pattern 4: Policy Synchronization
Translate Business Policies to Governance Rules
def translate_policy_to_om_rule(policy: Dict) -> Dict:
"""Translate Orthogramic policy to OpenMetadata policy."""
# Map policy types to OM policy types
policy_type_map = {
'data-governance': 'Lifecycle',
'access-control': 'Access',
'data-quality': 'DataQuality',
'retention': 'Lifecycle'
}
om_policy = {
"name": policy['policyID'].lower().replace('-', '_'),
"displayName": policy['title'],
"description": policy.get('description', ''),
"policyType": policy_type_map.get(
policy.get('policyType', 'data-governance'),
'Lifecycle'
),
"enabled": policy.get('status') == 'active',
"rules": []
}
# Translate requirements to rules
for req in policy.get('requirements', []):
rule = translate_requirement_to_rule(req, policy)
if rule:
om_policy['rules'].append(rule)
# Add business context as extension
om_policy['extension'] = {
'businessPolicy': policy['policyID'],
'policyOwner': policy.get('owner', 'Unknown'),
'effectiveDate': policy.get('effectiveDate')
}
return om_policy
def translate_requirement_to_rule(requirement: Dict, policy: Dict) -> Optional[Dict]:
"""Translate policy requirement to OpenMetadata rule."""
# Example: retention requirement
if 'retention' in requirement.get('description', '').lower():
# Extract retention days from description (simplified)
import re
days_match = re.search(r'(\d+)\s*days?', requirement['description'])
days = int(days_match.group(1)) if days_match else 730
return {
"name": requirement.get('requirementID', 'retention-rule'),
"description": requirement['description'],
"effect": "deny",
"operations": ["All"],
"resources": ["table"],
"condition": f"matchAnyTag('PII') && daysSinceCreated > {days}"
}
# Add more rule translations as needed
return None
def sync_policies(client: OpenMetadataClient, policies: List[Dict]):
"""Sync Orthogramic policies to OpenMetadata."""
for policy in policies:
om_policy = translate_policy_to_om_rule(policy)
try:
# Try to update existing policy
existing = client.get(f'policies/name/{om_policy["name"]}')
client.patch(
f'policies/{existing["id"]}',
[{"op": "replace", "path": "/rules", "value": om_policy['rules']}]
)
except requests.HTTPError:
# Create new policy
client.post('policies', om_policy)
# Usage
policies = load_orthogramic_policies('policies.json')
sync_policies(om_client, policies)
Pattern 5: Value Stream to Pipeline Tagging
Tag Pipelines with Value Stream Context
def tag_pipeline_with_value_stream(
client: OpenMetadataClient,
pipeline_fqn: str,
value_stream: Dict,
stage_name: Optional[str] = None
):
"""Add value stream context to OpenMetadata pipeline."""
extension_data = {
'businessValueStream': value_stream['valueStreamID'],
'valueStreamName': value_stream['title'],
'businessValue': value_stream.get('businessValue', '')
}
if stage_name:
# Find stage details
stage = next(
(s for s in value_stream.get('stages', []) if s['name'] == stage_name),
None
)
if stage:
extension_data['valueStreamStage'] = stage.get('stageID', stage_name)
extension_data['stageName'] = stage_name
# Build patch operations
operations = [
{
"op": "add",
"path": f"/extension/{key}",
"value": value
}
for key, value in extension_data.items()
]
client.patch(f'pipelines/name/{pipeline_fqn}', operations)
def create_value_stream_tag(client: OpenMetadataClient, value_stream: Dict):
"""Create classification tag for value stream."""
# Ensure classification exists
try:
client.get('classifications/name/ValueStream')
except requests.HTTPError:
client.post('classifications', {
"name": "ValueStream",
"description": "Business value stream classification"
})
# Create tag for specific value stream
tag_data = {
"classification": "ValueStream",
"name": value_stream['valueStreamID'],
"description": value_stream['title']
}
try:
client.post('tags', tag_data)
except requests.HTTPError:
pass # Tag already exists
# Usage
value_streams = load_orthogramic_value_streams('value-streams.json')
pipeline_mapping = load_pipeline_mapping('pipeline-vs-mapping.json')
for vs in value_streams:
create_value_stream_tag(om_client, vs)
for mapping in pipeline_mapping:
if mapping['valueStreamID'] == vs['valueStreamID']:
tag_pipeline_with_value_stream(
om_client,
mapping['pipelineFQN'],
vs,
mapping.get('stage')
)
Pattern 6: Bi-Directional Sync Service
Event-Driven Synchronization
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class OrthogramicOMSync:
"""Bi-directional sync service between Orthogramic and OpenMetadata."""
def __init__(
self,
om_client: OpenMetadataClient,
orthogramic_api_url: str,
orthogramic_api_key: str
):
self.om_client = om_client
self.orth_url = orthogramic_api_url
self.orth_headers = {'Authorization': f'Bearer {orthogramic_api_key}'}
def sync_capability_to_om(self, capability: Dict):
"""Sync capability update to OpenMetadata."""
logger.info(f"Syncing capability {capability['capabilityID']} to OM")
# Update custom properties on related tables
for asset in capability.get('enablingDataAssets', []):
try:
self.om_client.patch(
f'tables/name/{asset}',
[
{
"op": "add",
"path": "/extension/businessCapability",
"value": capability['capabilityID']
},
{
"op": "add",
"path": "/extension/capabilityName",
"value": capability['title']
},
{
"op": "add",
"path": "/extension/lastSyncedAt",
"value": datetime.utcnow().isoformat()
}
]
)
logger.info(f"Updated table {asset} with capability context")
except Exception as e:
logger.error(f"Failed to update table {asset}: {e}")
def sync_ownership_to_orthogramic(self, table_fqn: str, owner_id: str):
"""Sync ownership change from OM to Orthogramic."""
logger.info(f"Syncing ownership change for {table_fqn} to Orthogramic")
# Get table details including business capability
table = self.om_client.get(f'tables/name/{table_fqn}')
capability_id = table.get('extension', {}).get('businessCapability')
if capability_id:
# Update Orthogramic stakeholder link
try:
response = requests.patch(
f'{self.orth_url}/capabilities/{capability_id}/stakeholders',
headers=self.orth_headers,
json={
'dataAsset': table_fqn,
'ownerID': owner_id,
'syncSource': 'openmetadata',
'syncTimestamp': datetime.utcnow().isoformat()
}
)
response.raise_for_status()
logger.info(f"Updated Orthogramic capability {capability_id}")
except Exception as e:
logger.error(f"Failed to update Orthogramic: {e}")
def handle_om_event(self, event: Dict):
"""Handle OpenMetadata webhook event."""
event_type = event.get('eventType')
entity = event.get('entity', {})
if event_type == 'entityUpdated' and entity.get('entityType') == 'table':
# Check if ownership changed
change_description = event.get('changeDescription', {})
for field in change_description.get('fieldsUpdated', []):
if field.get('name') == 'owner':
new_owner = field.get('newValue', {}).get('id')
if new_owner:
self.sync_ownership_to_orthogramic(
entity['fullyQualifiedName'],
new_owner
)
def run_full_sync(self):
"""Run full synchronization."""
logger.info("Starting full sync")
# Sync capabilities to OM
capabilities = self._load_all_capabilities()
for cap in capabilities:
self.sync_capability_to_om(cap)
logger.info("Full sync completed")
def _load_all_capabilities(self) -> List[Dict]:
"""Load all capabilities from Orthogramic."""
response = requests.get(
f'{self.orth_url}/capabilities',
headers=self.orth_headers
)
response.raise_for_status()
return response.json().get('capabilities', [])
# Usage
sync_service = OrthogramicOMSync(
om_client=om_client,
orthogramic_api_url='https://api.orthogramic.com',
orthogramic_api_key='your-orthogramic-key'
)
# Run initial sync
sync_service.run_full_sync()
# Handle webhook events (in your webhook handler)
# sync_service.handle_om_event(webhook_payload)
Error Handling Best Practices
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
def safe_api_call(client: OpenMetadataClient, method: str, *args, **kwargs):
"""Execute API call with retry logic."""
func = getattr(client, method)
return func(*args, **kwargs)
def batch_update_with_error_handling(
client: OpenMetadataClient,
updates: List[Dict]
) -> Dict[str, List]:
"""Batch update with error tracking."""
results = {'success': [], 'failed': []}
for update in updates:
try:
safe_api_call(
client,
'patch',
update['endpoint'],
update['operations']
)
results['success'].append(update['endpoint'])
except Exception as e:
results['failed'].append({
'endpoint': update['endpoint'],
'error': str(e)
})
logger.error(f"Failed to update {update['endpoint']}: {e}")
return results
Related Documentation
- Overview — Integration architecture
- Entity Mapping — Detailed mappings
- Use Cases — Practical scenarios
- Terminology Bridge — Vocabulary translation