Ideas2IT rewards key players with 1/3rd of the Company in New Initiative.  Read More >
Back to Blogs

How to delete records from MongoDB using Glue Job

Often AWS engineers find themselves in a situation where they need to achieve something, but AWS does not have the required feature. We recently found ourselves in one. We were already using a glue job to insert a large number of records in MongoDB. Now we wanted to delete these records via the same job if a delete flag is enabled.

We knew that Spark has a feature called repartition to scale.

partitioned_df = df.repartition(partition_count, col("salt_column”))

It writes parallelly to the DB via write_dynamic_frame.


But there are no delete operations like write_dynamic_frame. So we wrote to the support team at AWS support and they replied that such a feature was not implemented yet.

As engineers, we set out to find alternate solutions, and we discovered that we can achieve this using mapPartitions options.

ids = partitioned_df.rdd.mapPartitions(delete_from_db).collect()

mapPartitions return is a new RDD by applying a function to each partition of this RDD. In each partition, we can delete the records manually.

def delete_from_db(partitioned_df):
ids = []
collection = ''
for row in partitioned_df:
collection = row['collection']
if(len(ids) > 0) :
return delete_records(collection, ids)
else :
return ids

def delete_records(collection, ids):
mongo_uri ="mongodb://{}:{}@{}/?ssl={}&ssl_cert_reqs=CERT_NONE".format(username, password), uri.split('//')[1],'true')
mongo_client = pymongo.MongoClient(mongo_uri)
db = mongo_client[database_name]
collection = db[collection]
chunked = chunks(ids, 1024)
for chunked_ids in chunked:
query = {"_id" : { '$in':chunked_ids}}
return ids

And that is how we were able to delete records from MongoDB. We hope you found this blog useful.

- - - - - - - - - - - - -

Ideas2IT Team

Connect with Us

We'd love to brainstorm your priority tech initiatives and contribute to the best outcomes.