AWS engineers often find themselves needing to achieve something for which AWS does not yet provide a feature. We recently found ourselves in exactly that situation. We were already using an AWS Glue job to insert a large number of records into MongoDB, and we wanted the same job to delete those records whenever a delete flag is enabled.
We knew that Spark has a feature called repartition, which distributes the data across a chosen number of partitions so the work can scale out.
partitioned_df = df.repartition(partition_count, col("salt_column"))
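The salt column is simply a helper column that spreads rows evenly across partitions. A minimal sketch of how such a column might be derived is below; deriving it from an id field with a modulus is our own illustrative choice, not something specified in the original job.

from pyspark.sql.functions import abs as abs_, col, hash as hash_

# Illustrative only: derive an evenly distributed salt value from the record id
df = df.withColumn("salt_column", abs_(hash_(col("id"))) % partition_count)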
The partitions are then written to the database in parallel via write_dynamic_frame.
from awsglue.dynamicframe import DynamicFrame
# from_options expects a DynamicFrame, so convert the repartitioned DataFrame first
dynamic_frame = DynamicFrame.fromDF(partitioned_df, glueContext, "partitioned_df")
glueContext.write_dynamic_frame.from_options(dynamic_frame,
    connection_type='documentdb',
    connection_options=get_documentdb_options(collection))
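The get_documentdb_options helper is not shown in the original job. A minimal sketch of what it might return, assuming the commonly documented Glue DocumentDB connection options and that uri, database_name, username and password are available from the job configuration:

def get_documentdb_options(collection):
    # Illustrative only: connection options for Glue's DocumentDB/MongoDB connection type
    return {
        "uri": uri,
        "database": database_name,
        "collection": collection,
        "username": username,
        "password": password,
        "ssl": "true",
        "ssl.domain_match": "false"
    }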
But there is no delete counterpart to write_dynamic_frame. We wrote to AWS support, and they replied that such a feature has not been implemented yet.
As engineers, we set out to find an alternative, and we discovered that we could achieve this using mapPartitions.
ids = partitioned_df.rdd.mapPartitions(delete_from_db).collect()
mapPartitions returns a new RDD by applying a function to each partition of the original RDD. Inside each partition, we can delete the records manually.
def delete_from_db(partitioned_df):
    # partitioned_df is an iterator over the rows of a single partition
    ids = []
    collection = ''
    for row in partitioned_df:
        ids.append(row['id'])
        collection = row['collection']
    if len(ids) > 0:
        return delete_records(collection, ids)
    else:
        return ids
import pymongo

def delete_records(collection, ids):
    # username, password, uri and database_name come from the job configuration
    mongo_uri = "mongodb://{}:{}@{}/?ssl={}&ssl_cert_reqs=CERT_NONE".format(
        username, password, uri.split('//')[1], 'true')
    mongo_client = pymongo.MongoClient(mongo_uri)
    db = mongo_client[database_name]
    collection = db[collection]
    # Delete in batches of 1024 ids to keep each query small
    chunked = chunks(ids, 1024)
    for chunked_ids in chunked:
        query = {"_id": {'$in': chunked_ids}}
        collection.delete_many(query)
    return ids
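The chunks helper is not shown above. A minimal sketch of how it could split the id list into fixed-size batches (the generator form here is our own choice):

def chunks(items, size):
    # Yield successive slices of at most `size` elements from items (illustrative only)
    for i in range(0, len(items), size):
        yield items[i:i + size]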
And that is how we were able to delete records from MongoDB. We hope you found this blog useful.