Introduction
In today’s data-driven world, efficient data synchronization between different storage systems is crucial for maintaining consistency and providing accurate insights. In this technical blog, we will delve into the process of synchronizing real-time changes from MongoDB to Elasticsearch using a Python-based solution. By combining the power of Azure services, MongoDB, Elasticsearch, and Python scripting, we’ll demonstrate a robust and scalable approach to achieving seamless data synchronization.
Architecture and Components:
Before we dive into the implementation details, let’s understand the architecture and components involved in our synchronization solution.
Our architecture includes:
MongoDB: the primary data store.
Elasticsearch: the secondary data store, used for fast search and analytics.
Azure Event Hub: the channel that captures and carries real-time change events from MongoDB.
Azure Functions: the serverless logic that synchronizes data from MongoDB to Elasticsearch.
Message Queue (Azure Service Bus): temporary storage for change events when Elasticsearch is unreachable.
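Before wiring these components together, it can help to confirm that each one is reachable from your environment. The snippet below is a minimal sketch, not part of the solution itself, and it assumes the same placeholder connection values used throughout this post (mongodb_connection_string, elasticsearch_host, event_hub_connection_string, message_queue_connection_string); replace them with your own settings.
# Minimal connectivity check for the components listed above (placeholder values assumed).
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from azure.eventhub import EventHubProducerClient
from azure.servicebus import ServiceBusClient

# Verify MongoDB is reachable
mongo_client = MongoClient("mongodb_connection_string", serverSelectionTimeoutMS=5000)
print("MongoDB:", mongo_client.admin.command("ping"))

# Verify Elasticsearch is reachable
es = Elasticsearch("elasticsearch_host", http_auth=("user_name", "password"), verify_certs=False)
print("Elasticsearch:", es.ping())

# Verify the Event Hub connection string and hub name are valid
producer = EventHubProducerClient.from_connection_string(
    conn_str="event_hub_connection_string", eventhub_name="event_hub_name")
with producer:
    print("Event Hub partitions:", producer.get_partition_ids())

# Verify the Service Bus queue used as the fallback store is reachable
servicebus_client = ServiceBusClient.from_connection_string("message_queue_connection_string")
with servicebus_client:
    with servicebus_client.get_queue_sender(queue_name="message_queue_name") as sender:
        print("Service Bus: queue sender created")
If any of these checks fails, fix that connection before moving on; every later step depends on all four services being reachable.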
Python Script for MongoDB Change Stream Listener:
To initiate the synchronization process, we developed a Python script that listens for changes in MongoDB. The script captures “insert,” “update,” and “delete” operations as they happen and relays each event to Azure Event Hub for the next stage of the pipeline. To keep the listener running at all times, we host the script as an Azure WebJob, which runs it continuously in the background. Whenever a change occurs, the script immediately publishes the relevant details to Azure Event Hub, ensuring a steady flow of information. The code of the script is given below:
#!/usr/bin/env python3
import asyncio
import json
import os
import sys
import time
from datetime import datetime
from threading import Thread

from pymongo import MongoClient
from azure.eventhub import EventHubProducerClient, EventData
from bson import ObjectId, Timestamp

# Connection settings (placeholders)
MONGODB_ATLAS_URL = "mongodb_connection_string"
DATABASE = "database_name"
COLLECTION = "collection_name"
EVENT_HUB_CONNECTION_STR = "event_hub_connection_string"
EVENT_HUB_NAME = "event_hub_name"


# Recursively convert ObjectId values to strings so the document is JSON-serializable
def convert_objectids_to_strings(dictionary):
    for key, value in dictionary.items():
        if isinstance(value, ObjectId):
            dictionary[key] = str(value)
        elif isinstance(value, dict):
            convert_objectids_to_strings(value)


# Recursively convert BSON Timestamp values to formatted datetime strings
def convert_timestamps_to_strings(obj):
    if isinstance(obj, dict):
        for key, value in obj.items():
            obj[key] = convert_timestamps_to_strings(value)
    elif isinstance(obj, list):
        for i, item in enumerate(obj):
            obj[i] = convert_timestamps_to_strings(item)
    elif isinstance(obj, Timestamp):
        # Timestamp.time is in seconds since the epoch
        obj = datetime.fromtimestamp(obj.time).strftime('%Y-%m-%d %H:%M:%S.%f')
    return obj


# Serialize a change event and publish it to Azure Event Hub
def send_to_event_hub(document):
    convert_objectids_to_strings(document)
    document = convert_timestamps_to_strings(document)
    # default=str covers any remaining non-JSON types (e.g. datetime fields)
    data = json.dumps(document, default=str)
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME)
    with producer:
        event_data_batch = producer.create_batch()
        event_data_batch.add(EventData(data))
        producer.send_batch(event_data_batch)


# Start script
####
print("==========================================")
print("        Change Stream Listener            ")
print("Change Stream Events currently monitored: ")
print("        Insert, Update, Delete            ")
print("==========================================")


####
# Main start function
# Start an individual thread for each event type,
# sleeping momentarily after starting each thread
####
def main():
    print('Starting Change Stream Listener.\n')

    # Create the insert thread
    insert_loop = asyncio.new_event_loop()
    insert_loop.call_soon_threadsafe(insert_change_stream)
    Thread(target=start_loop, args=(insert_loop,)).start()
    time.sleep(0.25)

    # Create the update thread
    update_loop = asyncio.new_event_loop()
    update_loop.call_soon_threadsafe(update_change_stream)
    Thread(target=start_loop, args=(update_loop,)).start()
    time.sleep(0.25)

    # Create the delete thread
    delete_loop = asyncio.new_event_loop()
    delete_loop.call_soon_threadsafe(delete_change_stream)
    Thread(target=start_loop, args=(delete_loop,)).start()
    time.sleep(0.25)


####
# Make sure each loop keeps running in its thread
####
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


####
# Insert Change Stream
####
def insert_change_stream():
    print("Insert listener thread started.")
    mongo_client = MongoClient(MONGODB_ATLAS_URL)
    db = mongo_client[DATABASE]
    accounts_collection = db[COLLECTION]

    # Only watch for insert operations
    pipeline = [
        {'$match': {'operationType': 'insert'}}
    ]
    try:
        for document in accounts_collection.watch(pipeline=pipeline, full_document='updateLookup'):
            print("=== INSERT EVENT ===\n")
            send_to_event_hub(document)
    except KeyboardInterrupt:
        keyboard_shutdown()
    except Exception as e:
        print(e)


####
# Update Change Stream
####
def update_change_stream():
    print("Update listener thread started.")
    mongo_client = MongoClient(MONGODB_ATLAS_URL)
    db = mongo_client[DATABASE]
    accounts_collection = db[COLLECTION]

    # Only watch for update operations; full_document='updateLookup'
    # includes the full post-update document in each event
    pipeline = [
        {'$match': {'operationType': 'update'}}
    ]
    try:
        for document in accounts_collection.watch(pipeline=pipeline, full_document='updateLookup'):
            print("\n=== UPDATE EVENT ===\n")
            send_to_event_hub(document)
    except KeyboardInterrupt:
        keyboard_shutdown()
    except Exception as e:
        print(e)


####
# Delete Change Stream
####
def delete_change_stream():
    print("Delete listener thread started.")
    mongo_client = MongoClient(MONGODB_ATLAS_URL)
    db = mongo_client[DATABASE]
    accounts_collection = db[COLLECTION]

    # Only watch for delete operations
    pipeline = [
        {'$match': {'operationType': 'delete'}}
    ]
    try:
        for document in accounts_collection.watch(pipeline=pipeline, full_document='updateLookup'):
            print("\n=== DELETE EVENT ===\n")
            send_to_event_hub(document)
    except KeyboardInterrupt:
        keyboard_shutdown()
    except Exception as e:
        print(e)


####
# "Gracefully" exit via ctrl-c
####
def keyboard_shutdown():
    print('Interrupted\n')
    try:
        sys.exit(0)
    except SystemExit:
        os._exit(0)


####
# Main
####
if __name__ == '__main__':
    main()
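One thing the script above does not do is persist the change stream’s resume token, so events that occur while the WebJob is restarting are not replayed. The sketch below shows one way this could be added; it is an assumption on top of the original script, not part of it. The token file path and the helper names are illustrative, it reuses the send_to_event_hub helper defined above, and it assumes a MongoDB version whose resume tokens serialize cleanly to JSON.
# Hypothetical sketch: persist the change stream resume token so a restarted
# listener can pick up where it left off. Names and the token file are
# illustrative assumptions, not part of the script above.
import json
import os

TOKEN_FILE = "resume_token.json"  # assumed local path

def load_resume_token():
    if os.path.exists(TOKEN_FILE):
        with open(TOKEN_FILE) as f:
            return json.load(f)
    return None

def save_resume_token(token):
    with open(TOKEN_FILE, "w") as f:
        json.dump(token, f)

def watch_with_resume(collection, pipeline):
    # resume_after tells MongoDB to replay events recorded after the saved token
    with collection.watch(pipeline=pipeline,
                          full_document='updateLookup',
                          resume_after=load_resume_token()) as stream:
        for document in stream:
            send_to_event_hub(document)
            # stream.resume_token always reflects the last event delivered
            save_resume_token(stream.resume_token)
In production you would likely store the token somewhere more durable than a local file (for example, a small collection in MongoDB itself), but the mechanism is the same.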
Azure Functions for Real-time Synchronization:
In our synchronization solution, we leverage the power of Azure Functions to orchestrate the real-time synchronization process between MongoDB and Elasticsearch. These serverless compute resources provide an efficient and scalable way to manage the flow of data, ensuring that changes in MongoDB are seamlessly propagated to Elasticsearch. We’ve implemented two distinct types of Azure Functions to facilitate this synchronization: the Event Trigger Azure Function and the Time Trigger Azure Function.
Event Trigger Azure Function:
The Event Trigger Azure Function plays a pivotal role in our data synchronization pipeline. As the name suggests, this function is triggered by events received from Azure Event Hub, which serves as the conduit for MongoDB change events. Let’s break down the key functionalities of this function:
- Listening for Event Hub Events: Upon receiving an event from Azure Event Hub, the function is triggered, initiating the synchronization process.
- Event Processing: The function processes the received event to extract the relevant data representing changes in the MongoDB database. This data could encompass “insert,” “update,” or “delete” operations.
- Data Transformation: The extracted data is transformed as necessary to ensure compatibility with Elasticsearch’s data structure and schema.
- Elasticsearch Synchronization: Once transformed, the data is pushed into the Elasticsearch cluster. This step ensures that the changes are seamlessly reflected in the Elasticsearch index, maintaining real-time consistency between the two data sources.
- Connectivity Management: The function is equipped to handle connectivity issues with Elasticsearch. If Elasticsearch is unreachable, or if indexing a change fails, the function stores the change event in an Azure Service Bus queue so the Time Trigger function can retry it later.
The code of the Azure function is given below:
import json
import logging

import azure.functions as func
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from elasticsearch import Elasticsearch

QUEUE_CONNECTION_STR = "message_queue_connection_string"
QUEUE_NAME = "message_queue_name"


# Park the raw change event in the Service Bus queue so the Time Trigger
# function can apply it once Elasticsearch is available again
def send_to_queue(body):
    servicebus_client = ServiceBusClient.from_connection_string(
        conn_str=QUEUE_CONNECTION_STR, logging_enable=True)
    with servicebus_client:
        sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
        with sender:
            sender.send_messages(ServiceBusMessage(body))


def main(event: func.EventHubEvent):
    es = Elasticsearch('elasticsearch_host', http_auth=('user_name', 'password'), verify_certs=False)
    body = event.get_body()
    obj = json.loads(body)
    logging.info(f'Json: {obj}')

    if es.ping():
        try:
            if obj['operationType'] in ("insert", "update"):
                # Use the MongoDB _id as the Elasticsearch document id
                id_value = obj['fullDocument']['_id']
                del obj['fullDocument']['_id']
                es.index(index="test-trigger", id=id_value, document=obj['fullDocument'])
            elif obj['operationType'] == "delete":
                id_value = obj['documentKey']['_id']
                es.delete(index="test-trigger", id=id_value)
        except Exception as e:
            # Indexing failed: queue the event for the Time Trigger function to retry
            logging.error(f'Exception: {e}')
            send_to_queue(body)
    else:
        # Elasticsearch is unreachable: queue the event for later
        send_to_queue(body)
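To exercise this function locally without a live Event Hub, one option is to hand it a synthetic event. The snippet below is a rough sketch; it assumes the function above lives in a module importable as eventhub_trigger (adjust to your project layout), that azure.functions.EventHubEvent can be constructed directly as is commonly done in unit tests, and that the Elasticsearch and Service Bus settings in the function point at reachable endpoints. The sample payload mirrors the shape of the change events published by the listener script.
# Hypothetical local test: invoke the Event Trigger function with a synthetic
# change event. Module name and document values are illustrative assumptions.
import json
import azure.functions as func
from eventhub_trigger import main  # assumed module name

# A minimal insert event in the same shape the change stream listener publishes
sample_change = {
    "operationType": "insert",
    "fullDocument": {"_id": "64f0c1e2a1b2c3d4e5f60718", "name": "test-document"},
    "documentKey": {"_id": "64f0c1e2a1b2c3d4e5f60718"},
}

event = func.EventHubEvent(body=json.dumps(sample_change).encode("utf-8"))
main(event)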
Time Trigger Azure Function:
The Time Trigger Azure Function introduces an additional layer of reliability to our synchronization process. Unlike the Event Trigger function, which is driven by real-time events, the Time Trigger function operates on a predefined schedule. Its primary purpose is to check for any pending changes that may have been missed by the Event Trigger function, thus ensuring comprehensive data synchronization. Here’s how this function operates:
- Scheduled Execution: The Time Trigger function is invoked at a fixed interval, typically every 30 minutes (for example, with an NCRONTAB schedule of 0 */30 * * * *).
- Message Queue Examination: The function checks a designated message queue that temporarily stores change events. These events may have been queued due to Elasticsearch connectivity issues or other unforeseen factors.
- Elasticsearch Application: If pending changes are detected in the message queue, the Time Trigger function applies these changes to the Elasticsearch cluster. This step guarantees that all changes, whether captured in real-time or queued, are eventually propagated to Elasticsearch.
- Dead Letter Queue Handling: Like the Event Trigger function, the Time Trigger function handles failures defensively: if applying a queued change to Elasticsearch fails, the message is moved to the queue’s dead-letter sub-queue, along with a reason and error description, so it can be inspected later.
The code of the Azure function is given below:
import datetime
import json
import logging

import azure.functions as func
from azure.servicebus import ServiceBusClient
from elasticsearch import Elasticsearch

QUEUE_CONNECTION_STR = "message_queue_connection_string"
QUEUE_NAME = "message_queue_name"


def main(mytimer: func.TimerRequest) -> None:
    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()
    if mytimer.past_due:
        logging.info('The timer is past due!')
    logging.info('Python timer trigger function ran at %s', utc_timestamp)

    es = Elasticsearch('elasticsearch_host', http_auth=('user_name', 'password'), verify_certs=False)
    if not es.ping():
        # Elasticsearch is still unreachable; leave the queued messages for the next run
        return

    try:
        servicebus_client = ServiceBusClient.from_connection_string(
            conn_str=QUEUE_CONNECTION_STR, logging_enable=False)
        with servicebus_client:
            # Get the queue receiver and drain any pending change events
            receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, max_wait_time=5)
            with receiver:
                for msg in receiver:
                    # Send the queued change to Elasticsearch
                    obj = json.loads(str(msg))
                    logging.info(f'Data: {obj}')
                    if obj['operationType'] in ("insert", "update"):
                        try:
                            id_value = obj['fullDocument']['_id']
                            del obj['fullDocument']['_id']
                            es.index(index="test-trigger", id=id_value, document=obj['fullDocument'])
                            # Complete the message so it is removed from the queue
                            receiver.complete_message(msg)
                        except Exception as e:
                            logging.warning(f'Insert/Update Exception: {e}')
                            # Move the message to the dead-letter queue for inspection;
                            # a dead-lettered message must not also be completed
                            receiver.dead_letter_message(msg, reason="Insert/Update", error_description=str(e))
                    elif obj['operationType'] == "delete":
                        try:
                            id_value = obj['documentKey']['_id']
                            es.delete(index="test-trigger", id=id_value)
                            # Complete the message so it is removed from the queue
                            receiver.complete_message(msg)
                        except Exception as e:
                            logging.warning(f'Delete Exception: {e}')
                            # Move the message to the dead-letter queue for inspection
                            receiver.dead_letter_message(msg, reason="Delete", error_description=str(e))
    except Exception as e:
        logging.error(f'Exception: {e}')
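Once everything is deployed, a quick end-to-end check is to write a document to MongoDB and confirm that it appears in the Elasticsearch index shortly afterwards. The snippet below is a rough sketch using the same placeholder connection values and the test-trigger index from the functions above; the 30-second pause is an arbitrary allowance for the listener, Event Hub, and the Azure Function to process the event.
# Hypothetical end-to-end check: insert a document into MongoDB and confirm it
# arrives in the "test-trigger" Elasticsearch index. Placeholder connection
# values are assumed, as in the rest of the post.
import time
from pymongo import MongoClient
from elasticsearch import Elasticsearch

mongo_client = MongoClient("mongodb_connection_string")
collection = mongo_client["database_name"]["collection_name"]

# Insert a document; the change stream listener should publish this event
result = collection.insert_one({"name": "sync-check", "source": "end-to-end test"})
doc_id = str(result.inserted_id)

# Give the listener, Event Hub, and the Azure Function a moment to process it
time.sleep(30)

es = Elasticsearch("elasticsearch_host", http_auth=("user_name", "password"), verify_certs=False)
if es.exists(index="test-trigger", id=doc_id):
    print("Document found in Elasticsearch:", es.get(index="test-trigger", id=doc_id)["_source"])
else:
    print("Document not found yet; check the Service Bus queue and the function logs.")
If the document does not appear, the Service Bus queue and its dead-letter sub-queue are the first places to look, since that is where the functions park events they could not apply.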