How to delete records from MongoDB using Glue Job - Ideas2IT

Often AWS engineers need to achieve something that AWS does not offer out of the box. We recently found ourselves in that situation. We were already using a Glue job to insert a large number of records into MongoDB, and now we wanted the same job to delete those records when a delete flag is enabled.

We knew that Spark's repartition can spread a DataFrame's rows across a chosen number of partitions so that the work scales:

from pyspark.sql.functions import col
partitioned_df = df.repartition(partition_count, col("salt_column"))
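The post does not show how salt_column is created. A common approach, assumed here, is to give each row a random integer salt in a fixed range so that repartitioning on it spreads rows roughly evenly even when the natural key is heavily skewed. The plain-Python sketch below only simulates that idea: add_salt and assign_partitions are hypothetical helpers, and a simple modulo stands in for Spark's internal hash partitioner.

```python
import random

def add_salt(rows, salt_buckets, seed=0):
    """Return copies of rows with a random salt_column in [0, salt_buckets) (hypothetical helper)."""
    rng = random.Random(seed)
    return [dict(row, salt_column=rng.randrange(salt_buckets)) for row in rows]

def assign_partitions(rows, partition_count):
    """Bucket rows by salt_column modulo partition_count; a simple stand-in for Spark's hash partitioner."""
    partitions = [[] for _ in range(partition_count)]
    for row in rows:
        partitions[row["salt_column"] % partition_count].append(row)
    return partitions

# 10,000 rows that all share the same natural key ("orders"), i.e. maximally skewed
rows = [{"_id": i, "collection": "orders"} for i in range(10_000)]
partitions = assign_partitions(add_salt(rows, salt_buckets=8), partition_count=8)
print([len(p) for p in partitions])  # eight counts, each close to 1,250
```

Partitioning on the collection name alone would have put all 10,000 rows into a single partition. Spark's repartition(n, col) hashes the column value rather than taking a modulo, so distinct salt values can land in the same partition and the real spread is only approximately even.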

Glue then writes the partitions to the database in parallel via write_dynamic_frame.


However, Glue offers no delete counterpart to write_dynamic_frame. We contacted AWS Support, and they replied that such a feature had not been implemented yet.

As engineers, we set out to find an alternative, and we discovered that we could achieve this with Spark's mapPartitions transformation:

ids = partitioned_df.rdd.mapPartitions(delete_from_db).collect()

mapPartitions returns a new RDD by applying a function to each partition of the original RDD. Inside that function, we can delete the partition's records ourselves.
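To make the mapPartitions contract concrete: Spark calls the function once per partition with an iterator over that partition's rows, the function must return (or yield) an iterable, and collect() brings every partition's output back to the driver as one list. The plain-Python sketch below mimics that behavior; simulate_map_partitions and ids_in_partition are hypothetical names, not Spark APIs.

```python
from itertools import chain

def simulate_map_partitions(partitions, func):
    """Apply func to an iterator over each partition and flatten the results, like rdd.mapPartitions(func).collect()."""
    return list(chain.from_iterable(func(iter(partition)) for partition in partitions))

def ids_in_partition(rows):
    # rows is an iterator, so it can only be consumed once
    return [row["_id"] for row in rows]

partitions = [
    [{"_id": 1}, {"_id": 2}],
    [],  # an empty partition contributes nothing to the result
    [{"_id": 3}],
]
print(simulate_map_partitions(partitions, ids_in_partition))  # [1, 2, 3]
```

The empty middle partition shows why the per-partition function must cope with receiving no rows at all.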

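def delete_from_db(rows):
    # mapPartitions passes an iterator over one partition's Row objects
    ids = []
    collection = ''
    for row in rows:
        # Assumes every row in the partition targets the same collection
        collection = row['collection']
        ids.append(row['_id'])  # assumes the MongoDB id is stored in an _id column
    if len(ids) > 0:
        return delete_records(collection, ids)
    else:
        return ids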
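delete_from_db takes the collection name from the rows it iterates, which is only correct if every row in a partition targets the same collection. If a partition can mix collections, one option (a variation, not what the post does) is to group ids per collection and delete each group separately. In the sketch below, delete_grouped is a hypothetical name and its deleter argument stands in for delete_records, so the logic can be checked without MongoDB.

```python
from collections import defaultdict

def delete_grouped(rows, deleter):
    """Group _id values by each row's 'collection' field, call deleter once per collection, and yield the deleted ids."""
    ids_by_collection = defaultdict(list)
    for row in rows:
        ids_by_collection[row["collection"]].append(row["_id"])
    for collection, ids in ids_by_collection.items():
        # deleter(collection, ids) is assumed to return the ids it removed, as delete_records does
        yield from deleter(collection, ids)

calls = []

def fake_deleter(collection, ids):
    # Records each call instead of deleting anything
    calls.append((collection, list(ids)))
    return ids

rows = [
    {"_id": 1, "collection": "orders"},
    {"_id": 2, "collection": "users"},
    {"_id": 3, "collection": "orders"},
]
print(list(delete_grouped(iter(rows), fake_deleter)))  # [1, 3, 2]
print(calls)  # [('orders', [1, 3]), ('users', [2])]
```

In Spark this could be wired in as partitioned_df.rdd.mapPartitions(lambda rows: delete_grouped(rows, delete_records)). Because delete_grouped is a generator, the deletes only run when collect() consumes its output.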


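import pymongo

def delete_records(collection, ids):
    # username, password, uri and database_name are assumed to be set earlier in the job (e.g. from its arguments)
    mongo_uri = "mongodb://{}:{}@{}/?ssl={}&ssl_cert_reqs=CERT_NONE".format(username, password, uri.split('//')[1], 'true')
    mongo_client = pymongo.MongoClient(mongo_uri)
    db = mongo_client[database_name]
    mongo_collection = db[collection]
    # Delete in batches of 1024 ids so that each $in filter stays small
    for start in range(0, len(ids), 1024):
        chunked_ids = ids[start:start + 1024]
        query = {"_id": {"$in": chunked_ids}}
        mongo_collection.delete_many(query)
    mongo_client.close()
    return ids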
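Each delete_many call sends one $in filter, so the batch size of 1024 bounds how many ids go into a single request. To check the batching without a live cluster, the loop can be factored into a function that accepts any object exposing a delete_many(filter) method, which pymongo's Collection provides. delete_in_batches and FakeCollection are hypothetical names used only for this sketch.

```python
class FakeCollection:
    """Records every filter passed to delete_many instead of talking to MongoDB."""

    def __init__(self):
        self.filters = []

    def delete_many(self, query_filter):
        self.filters.append(query_filter)

def delete_in_batches(collection, ids, batch_size=1024):
    """Issue one delete_many({"_id": {"$in": batch}}) per batch of at most batch_size ids."""
    for start in range(0, len(ids), batch_size):
        batch = ids[start:start + batch_size]
        collection.delete_many({"_id": {"$in": batch}})
    return ids

fake = FakeCollection()
delete_in_batches(fake, list(range(2500)))
print([len(f["_id"]["$in"]) for f in fake.filters])  # [1024, 1024, 452]
```

With a real pymongo Collection, each delete_many call returns a DeleteResult whose deleted_count could be summed to report how many documents were actually removed.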

That is how we were able to delete records from MongoDB using the same Glue job. We hope you found this blog useful.
