Synchronizing MongoDB and Elasticsearch in Real Time: A Comprehensive Guide

Introduction

In today’s data-driven world, efficient data synchronization between different storage systems is crucial for maintaining consistency and providing accurate insights. In this technical blog, we will delve into the process of synchronizing real-time changes from MongoDB to Elasticsearch using a Python-based solution. By combining the power of Azure services, MongoDB, Elasticsearch, and Python scripting, we’ll demonstrate a robust and scalable approach to achieving seamless data synchronization.

Architecture and Components:

Before we dive into the implementation details, let’s understand the architecture and components involved in our synchronization solution.

Our architecture includes:

MongoDB: The primary data store where the source data lives.

Elasticsearch: The secondary data store, used for fast search and analytics.

Azure Event Hub: Receives real-time change events captured from MongoDB.

Azure Functions: Run the logic that synchronizes data between MongoDB and Elasticsearch.

Message Queue (Azure Service Bus): Temporarily stores change events when Elasticsearch is unreachable.
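Concretely, what moves between these components is a MongoDB change stream event. As an illustration only (the field values below are made up, and the document is shown the way the listener script in the next section serializes it, with ObjectIds and timestamps already converted to strings), an insert event looks roughly like this:

{
  "_id": {"_data": "8264F1C2E5..."},
  "operationType": "insert",
  "clusterTime": "2023-09-01 10:15:30.000000",
  "ns": {"db": "database_name", "coll": "collection_name"},
  "documentKey": {"_id": "64f1c2e5a1b2c3d4e5f60789"},
  "fullDocument": {
    "_id": "64f1c2e5a1b2c3d4e5f60789",
    "name": "Jane Doe",
    "balance": 1500
  }
}

The operationType, fullDocument, and documentKey fields are the ones the rest of this post relies on: insert and update events carry the full document, while delete events only carry the documentKey of the removed document.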

Python Script for MongoDB Change Stream Listener:

To kick off the synchronization process, we developed a Python script that listens for MongoDB changes. The script captures “insert,” “update,” and “delete” operations as they occur in MongoDB and relays each event to Azure Event Hub for the next stage of the pipeline. To keep this listener running at all times, we host it as a continuous Azure Web Job, which runs the script in the background in an ongoing loop. Whenever a change occurs, the script immediately sends the relevant details to Azure Event Hub, ensuring a steady flow of information. The code of the script is given below.

 

#!/usr/bin/env python3

import asyncio
import json
import os
import sys
import time
from datetime import datetime
from threading import Thread

from pymongo import MongoClient
from bson import ObjectId, Timestamp
from azure.eventhub import EventHubProducerClient, EventData

# Connection settings (replace the placeholders with your own values)
MONGODB_ATLAS_URL = "mongodb_connection_string"
DATABASE = "database_name"
COLLECTION = "collection_name"
EVENT_HUB_CONNECTION_STR = "event_hub_connection_string"
EVENT_HUB_NAME = "event_hub_name"


# Recursively convert ObjectId values into strings so the document can be JSON-serialized
def convert_objectids_to_strings(dictionary):
    for key, value in dictionary.items():
        if isinstance(value, ObjectId):
            dictionary[key] = str(value)
        elif isinstance(value, dict):
            convert_objectids_to_strings(value)


# Recursively convert BSON Timestamp values into formatted datetime strings
def convert_timestamps_to_strings(obj):
    if isinstance(obj, dict):
        for key, value in obj.items():
            obj[key] = convert_timestamps_to_strings(value)
    elif isinstance(obj, list):
        for i, item in enumerate(obj):
            obj[i] = convert_timestamps_to_strings(item)
    elif isinstance(obj, Timestamp):
        # Timestamp.time is expressed in seconds since the epoch
        obj = datetime.fromtimestamp(obj.time).strftime('%Y-%m-%d %H:%M:%S.%f')
    return obj


# Start script
print("==========================================")
print("    Change Stream Listener                ")
print("Change Stream Events currently monitored: ")
print("Insert, Update, Delete                    ")
print("==========================================")


####
# Main start function
# Start one thread (with its own event loop) per change stream event type,
# sleeping momentarily after starting each thread
####
def main():
    print('Starting Change Stream Listener.\n')

    # Create the insert thread
    insert_loop = asyncio.new_event_loop()
    insert_loop.call_soon_threadsafe(insert_change_stream)
    t = Thread(target=start_loop, args=(insert_loop,))
    t.start()
    time.sleep(0.25)

    # Create the update thread
    update_loop = asyncio.new_event_loop()
    update_loop.call_soon_threadsafe(update_change_stream)
    t = Thread(target=start_loop, args=(update_loop,))
    t.start()
    time.sleep(0.25)

    # Create the delete thread
    delete_loop = asyncio.new_event_loop()
    delete_loop.call_soon_threadsafe(delete_change_stream)
    t = Thread(target=start_loop, args=(delete_loop,))
    t.start()
    time.sleep(0.25)


####
# Run the event loop on its thread so the scheduled listener keeps running
####
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


####
# Insert Change Stream
####
def insert_change_stream():
    print("Insert listener thread started.")
    mongo_client = MongoClient(MONGODB_ATLAS_URL)
    db = mongo_client[DATABASE]
    accounts_collection = db[COLLECTION]

    # Change stream pipeline: only watch insert operations
    pipeline = [
        {'$match': {'operationType': 'insert'}}
    ]

    try:
        for document in accounts_collection.watch(pipeline=pipeline, full_document='updateLookup'):
            print("=== INSERT EVENT ===\n")
            convert_objectids_to_strings(document)
            document = convert_timestamps_to_strings(document)
            data = json.dumps(document)
            producer = EventHubProducerClient.from_connection_string(
                conn_str=EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME)
            event_data_batch = producer.create_batch()
            event_data_batch.add(EventData(data))
            with producer:
                producer.send_batch(event_data_batch)
    except Exception as e:
        print(e)
    except KeyboardInterrupt:
        keyboard_shutdown()


####
# Update Change Stream
####
def update_change_stream():
    print("Update listener thread started.")
    mongo_client = MongoClient(MONGODB_ATLAS_URL)
    db = mongo_client[DATABASE]
    accounts_collection = db[COLLECTION]

    # Change stream pipeline: only watch update operations
    pipeline = [
        {'$match': {'operationType': 'update'}}
    ]

    try:
        for document in accounts_collection.watch(pipeline=pipeline, full_document='updateLookup'):
            print("\n=== UPDATE EVENT ===\n")
            convert_objectids_to_strings(document)
            document = convert_timestamps_to_strings(document)
            data = json.dumps(document)
            producer = EventHubProducerClient.from_connection_string(
                conn_str=EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME)
            event_data_batch = producer.create_batch()
            event_data_batch.add(EventData(data))
            with producer:
                producer.send_batch(event_data_batch)
    except Exception as e:
        print(e)
    except KeyboardInterrupt:
        keyboard_shutdown()


####
# Delete Change Stream
####
def delete_change_stream():
    print("Delete listener thread started.")
    mongo_client = MongoClient(MONGODB_ATLAS_URL)
    db = mongo_client[DATABASE]
    accounts_collection = db[COLLECTION]

    # Change stream pipeline: only watch delete operations
    pipeline = [
        {'$match': {'operationType': 'delete'}}
    ]

    try:
        for document in accounts_collection.watch(pipeline=pipeline, full_document='updateLookup'):
            print("\n=== DELETE EVENT ===\n")
            convert_objectids_to_strings(document)
            document = convert_timestamps_to_strings(document)
            data = json.dumps(document)
            producer = EventHubProducerClient.from_connection_string(
                conn_str=EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME)
            event_data_batch = producer.create_batch()
            event_data_batch.add(EventData(data))
            with producer:
                producer.send_batch(event_data_batch)
    except Exception as e:
        print(e)
    except KeyboardInterrupt:
        keyboard_shutdown()


####
# "Gracefully" shut down on Ctrl-C
####
def keyboard_shutdown():
    print('Interrupted\n')
    try:
        sys.exit(0)
    except SystemExit:
        os._exit(0)


####
# Main
####
if __name__ == '__main__':
    main()
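The blog mentions that an Azure Web Job keeps this listener running continuously, but the packaging itself is not shown. One minimal way to do it (the file and folder names below are assumptions for illustration, not taken from the original deployment) is to deploy a small folder as a continuous WebJob under App_Data/jobs/continuous/<job_name>/ in the App Service, containing the listener script, a requirements.txt listing pymongo and azure-eventhub, optionally a settings.job with {"is_singleton": true} so only one instance runs, and a run.py entry point that the WebJobs runtime launches automatically:

# run.py - entry point for the continuous WebJob (hypothetical wrapper; assumes the
# listener above is saved as change_stream_listener.py in the same folder)
import change_stream_listener

if __name__ == "__main__":
    change_stream_listener.main()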

 

Azure Functions for Real-time Synchronization:

In our synchronization solution, we leverage the power of Azure Functions to orchestrate the real-time synchronization process between MongoDB and Elasticsearch. These serverless compute resources provide an efficient and scalable way to manage the flow of data, ensuring that changes in MongoDB are seamlessly propagated to Elasticsearch. We’ve implemented two distinct types of Azure Functions to facilitate this synchronization: the Event Trigger Azure Function and the Time Trigger Azure Function.

 

  • Event Trigger Azure Function:

The Event Trigger Azure Function plays a pivotal role in our data synchronization pipeline. As the name suggests, this function is triggered by events received from Azure Event Hub, which serves as the conduit for MongoDB change events. Let’s break down the key functionalities of this function:

  1. Listening for Event Hub Events: Upon receiving an event from Azure Event Hub, the function is triggered, initiating the synchronization process.
  2. Event Processing: The function processes the received event to extract the relevant data representing changes in the MongoDB database. This data could encompass “insert,” “update,” or “delete” operations.
  3. Data Transformation: The extracted data is transformed as necessary to ensure compatibility with Elasticsearch’s data structure and schema.
  4. Elasticsearch Synchronization: Once transformed, the data is pushed into the Elasticsearch cluster. This step ensures that the changes are seamlessly reflected in the Elasticsearch index, maintaining real-time consistency between the two data sources.
  5. Connectivity Management: The function is equipped to handle potential connectivity issues with Elasticsearch. In the event of connectivity disruptions, the function can store the change event in an Azure message queue.

 

The code of the Azure function is given below:

import logging
import json

import azure.functions as func
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from elasticsearch import Elasticsearch

QUEUE_CONNECTION_STR = "message_queue_connection_string"
QUEUE_NAME = "message_queue_name"


def main(event: func.EventHubEvent):

    # Connect to Elasticsearch (certificate verification is disabled here for simplicity)
    es = Elasticsearch('elasticsearch_host', http_auth=('user_name', 'password'), verify_certs=False)
    logging.info(f'Json: {json.loads(event.get_body())}')

    if es.ping():
        try:
            obj = json.loads(event.get_body())
            if obj['operationType'] == "insert" or obj['operationType'] == "update":
                # Use the MongoDB _id as the Elasticsearch document id
                id_value = obj['fullDocument']['_id']
                del obj['fullDocument']['_id']
                es.index(index="test-trigger", id=id_value, document=obj['fullDocument'])
            elif obj['operationType'] == "delete":
                id_value = obj['documentKey']['_id']
                es.delete(index="test-trigger", id=id_value)
        except Exception as e:
            # Indexing failed: park the event in the Service Bus queue for the time trigger to retry
            logging.error(f'Exception: {e}')
            servicebus_client = ServiceBusClient.from_connection_string(conn_str=QUEUE_CONNECTION_STR, logging_enable=True)
            with servicebus_client:
                # get a Queue Sender object to send messages to the queue
                sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
                with sender:
                    # send the original change event to the queue
                    message = ServiceBusMessage(event.get_body())
                    sender.send_messages(message)
    else:
        # Elasticsearch is unreachable: park the event in the Service Bus queue for the time trigger to retry
        servicebus_client = ServiceBusClient.from_connection_string(conn_str=QUEUE_CONNECTION_STR, logging_enable=True)
        with servicebus_client:
            # get a Queue Sender object to send messages to the queue
            sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
            with sender:
                # send the original change event to the queue
                message = ServiceBusMessage(event.get_body())
                sender.send_messages(message)
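Not shown in the blog is the binding configuration that tells the Functions runtime to invoke this code for each Event Hub event. A minimal function.json sketch for the Python v1 programming model could look like the following; the setting name EventHubConnectionString and the consumer group are assumptions, and the actual connection string would live in the Function App's application settings:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "eventHubTrigger",
      "direction": "in",
      "name": "event",
      "eventHubName": "event_hub_name",
      "connection": "EventHubConnectionString",
      "consumerGroup": "$Default",
      "cardinality": "one"
    }
  ]
}

The name value must match the event parameter of main() above.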



  • Time Trigger Azure Function:

The Time Trigger Azure Function introduces an additional layer of reliability to our synchronization process. Unlike the Event Trigger function, which is driven by real-time events, the Time Trigger function operates on a predefined schedule. Its primary purpose is to check for any pending changes that may have been missed by the Event Trigger function, thus ensuring comprehensive data synchronization. Here’s how this function operates:

  1. Scheduled Execution: The Time Trigger function is invoked at specified intervals, typically every 30 minutes.
  2. Message Queue Examination: The function checks a designated message queue that temporarily stores change events. These events may have been queued due to Elasticsearch connectivity issues or other unforeseen factors.
  3. Elasticsearch Application: If pending changes are detected in the message queue, the Time Trigger function applies these changes to the Elasticsearch cluster. This step guarantees that all changes, whether captured in real-time or queued, are eventually propagated to Elasticsearch.
  4. Dead Letter Queue Handling: If a queued change cannot be applied to Elasticsearch, the function moves that message to the dead-letter queue along with the failure reason and error description, so problematic events are kept for inspection instead of being retried indefinitely.

 

The code of the Azure function is given below:

import datetime
import logging
import json

import azure.functions as func
from azure.servicebus import ServiceBusClient
from elasticsearch import Elasticsearch

QUEUE_CONNECTION_STR = "message_queue_connection_string"
QUEUE_NAME = "message_queue_name"


def main(mytimer: func.TimerRequest) -> None:
    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()

    if mytimer.past_due:
        logging.info('The timer is past due!')

    logging.info('Python timer trigger function ran at %s', utc_timestamp)

    # Connect to Elasticsearch (certificate verification is disabled here for simplicity)
    es = Elasticsearch('elasticsearch_host', http_auth=('user_name', 'password'), verify_certs=False)
    if es.ping():
        try:
            servicebus_client = ServiceBusClient.from_connection_string(conn_str=QUEUE_CONNECTION_STR, logging_enable=False)
            with servicebus_client:
                # get the Queue Receiver object for the queue
                receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, max_wait_time=5)
                with receiver:
                    for msg in receiver:
                        # apply the queued change event to Elasticsearch
                        obj = json.loads(str(msg))
                        logging.info(f'Data: {obj}')
                        if obj['operationType'] == "insert" or obj['operationType'] == "update":
                            try:
                                id_value = obj['fullDocument']['_id']
                                del obj['fullDocument']['_id']
                                es.index(index="test-trigger", id=id_value, document=obj['fullDocument'])
                                # complete the message so that it is removed from the queue
                                receiver.complete_message(msg)
                            except Exception as e:
                                logging.warning(f'Insert/Update Exception: {e}')
                                # dead-letter the message so it leaves the queue but is kept for inspection
                                receiver.dead_letter_message(msg, reason="Insert/Update", error_description=str(e))
                        elif obj['operationType'] == "delete":
                            try:
                                id_value = obj['documentKey']['_id']
                                es.delete(index="test-trigger", id=id_value)
                                # complete the message so that it is removed from the queue
                                receiver.complete_message(msg)
                            except Exception as e:
                                logging.warning(f'Delete Exception: {e}')
                                # dead-letter the message so it leaves the queue but is kept for inspection
                                receiver.dead_letter_message(msg, reason="Delete", error_description=str(e))
        except Exception as e:
            logging.error(f'Exception: {e}')
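As with the event-triggered function, the timer function needs its own binding definition. A minimal function.json sketch is shown below; the NCRONTAB expression simply encodes the “every 30 minutes” schedule mentioned earlier and can be adjusted as needed:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "timerTrigger",
      "direction": "in",
      "name": "mytimer",
      "schedule": "0 */30 * * * *"
    }
  ]
}

Here, too, the name value must match the mytimer parameter of main().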

Muhammad Zaid Raza

Data Engineer
