Spark 4.0 Intro to Custom Data Sources with SQS
Let's explore PySpark's new custom data sources feature by making our own source for AWS SQS.
Apache Spark 4.0 is coming soon and is jam-packed with great features, among them the much-awaited custom data source API for PySpark (also available in Databricks Runtime 15.3+). Previously, writing a custom data source meant dropping down to Scala.
This enables data engineers to create integrations for reading and writing data, in batch or with streaming, with idiomatic Spark code. Let’s take a tour of this new feature by building our own custom data source for AWS Simple Queue Service (SQS).
Implementing the DataSource class
The first thing we need to create is the DataSource class, the top-level class that represents our custom data source.
from pyspark.sql.datasource import (
    DataSource,
    DataSourceReader,
    InputPartition,
)
from pyspark.sql.types import StructType
from datetime import datetime
from typing import Iterator, Tuple


class SQSDataSource(DataSource):
    """
    PySpark data source for batch querying data from an SQS queue.
    """

    @classmethod
    def name(cls):
        return "spark_sqs"

    def schema(self):
        return """
            message_id STRING,
            receipt_handle STRING,
            md5_of_body STRING,
            body STRING,
            sent_timestamp TIMESTAMP
        """

    def reader(self, schema: StructType):
        return SQSDataSourceReader(schema, self.options)
The name method tells Spark the name of our custom source (spark_sqs), which is what we'll later pass to spark.read.format("spark_sqs"). The schema method returns the source's default schema, and reader returns the simple batch reader that does the actual work.
Note: the DataSource API also supports streaming readers, as well as custom sinks for writing data. In this part we will look at the simple batch reader only.
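For reference, those extra paths hook into the same class. The sketch below is illustrative only and not used in this post; it assumes the streamReader and writer hooks exposed by pyspark.sql.datasource.DataSource.

# Sketch only: extra hooks the DataSource class can override (not implemented here)
class SQSDataSource(DataSource):
    ...  # name/schema/reader as shown above

    def streamReader(self, schema: StructType):
        # Would return a DataSourceStreamReader, enabling spark.readStream
        raise NotImplementedError

    def writer(self, schema: StructType, overwrite: bool):
        # Would return a DataSourceWriter, enabling writes to SQS
        raise NotImplementedError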
Implementing the DataSourceReader
Next, we must implement the simple data source reader.
class SQSDataSourceReader(DataSourceReader):
    def __init__(self, schema: StructType, options: dict):
        print('Initializing SQS Data Source Reader')
        self.schema: StructType = schema
        self.options: dict = options
        if not options.get('queue_url'):
            raise ValueError('queue_url is required')
        self.queue_url = options.get('queue_url')
        self.region_name = options.get('region_name', 'us-east-1')
        self.max_messages = int(options.get('max_messages', '10'))
        self.visibility_timeout = int(options.get('visibility_timeout', '20'))
        self.wait_time_seconds = int(options.get('wait_time_seconds', '20'))
        self.delete_message = options.get('delete_message', 'false').lower() == 'true'

    def read(self, partition: InputPartition) -> Iterator[Tuple]:
        import boto3  # Imported here so the reader object stays serializable

        sqs = boto3.client('sqs', region_name=self.region_name)
        response = sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=self.max_messages,
            WaitTimeSeconds=self.wait_time_seconds,
            VisibilityTimeout=self.visibility_timeout,
            MessageSystemAttributeNames=['SentTimestamp'],
        )
        for message in response.get('Messages', []):
            yield (
                message['MessageId'],
                message['ReceiptHandle'],
                message['MD5OfBody'],
                message['Body'],
                datetime.fromtimestamp(int(message['Attributes']['SentTimestamp']) / 1000),
            )
            if self.delete_message:
                sqs.delete_message(
                    QueueUrl=self.queue_url,
                    ReceiptHandle=message['ReceiptHandle'],
                )
The heavy lifting happens in the read method, which receives messages from the queue and yields them as rows; the rest of the code is boilerplate that exposes the SQS polling settings (max messages, visibility timeout, long-poll wait time, and optional deletion) as reader options.
Using the custom data source
To use our custom data source, we must first register it to make Spark aware of the implementation.
spark.dataSource.register(SQSDataSource)
Now we can read from a queue with idiomatic Spark code:
spark.read.format("spark_sqs").options(
queue_url='https://sqs.us-east-1.amazonaws.com/1234567890/makewithdata-events',
region_name='us-east-1',
).load().show()
+--------------------+--------------------+--------------------+--------------------+--------------------+
| message_id| receipt_handle| md5_of_body| body| sent_timestamp|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|7322a8a2-180f-4a3...|AQEBb9rBz2DJ8RAWA...|c89b42169df3bbb3e...|{\n "_id": "67...|2024-12-16 20:13:...|
|d6a1d9b8-c15f-44c...|AQEBVnrFbfP/qrBpg...|daa063a083235cf91...|{\n "_id": "67...|2024-12-16 20:13:...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
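Every option parsed in the reader's __init__ can be passed the same way; only queue_url is required, and the rest fall back to the defaults shown earlier. A minimal example (the values below are just the defaults, except delete_message):

spark.read.format("spark_sqs").options(
    queue_url='https://sqs.us-east-1.amazonaws.com/1234567890/makewithdata-events',
    region_name='us-east-1',
    max_messages='10',          # up to 10 messages per receive call (the SQS maximum)
    visibility_timeout='20',    # seconds received messages stay hidden from other consumers
    wait_time_seconds='20',     # long-poll duration for receive_message
    delete_message='true',      # delete each message from the queue after it is read
).load().show()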
Next Steps
This is merely a quick way to play around with custom sources, but our SQS source could be a lot more useful if we added more features:
Implement a Streaming Reader that deletes messages as the reader writes checkpoints (see the sketch after this list).
Poll until the queue is empty.
Implement a Sink so we can write data to SQS as well.
Extract all available Attribute data from the messages.
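As a teaser for the first item, here is a rough, untested skeleton of what a streaming reader could look like, assuming the DataSourceStreamReader API from pyspark.sql.datasource. The wiring that carries receipt handles from read back to commit is left out.

# Rough sketch only: a streaming reader that would delete SQS messages
# once Spark has committed the checkpoint for a micro-batch.
from pyspark.sql.datasource import DataSourceStreamReader


class SQSStreamReader(DataSourceStreamReader):
    def __init__(self, schema: StructType, options: dict):
        self.schema = schema
        self.options = options
        self.batch = 0

    def initialOffset(self) -> dict:
        # SQS has no replayable position, so a simple batch counter stands in for an offset
        return {"batch": 0}

    def latestOffset(self) -> dict:
        # Advancing the counter tells Spark a new micro-batch is available
        self.batch += 1
        return {"batch": self.batch}

    def partitions(self, start: dict, end: dict):
        # One poller per micro-batch; could be split up for parallel reads
        return [InputPartition(0)]

    def read(self, partition) -> Iterator[Tuple]:
        # Poll SQS exactly as the batch reader does and yield rows,
        # but do not delete messages here
        ...

    def commit(self, end: dict):
        # Called after the checkpoint for `end` is written --
        # the safe point to delete the messages that were read
        ...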
If you’d like to see these next parts implemented, leave me a comment below and subscribe to the MakeWithData blog!