Stream Anything on Databricks
Learn how to use Spark Streaming and Databricks Autoloader to efficiently process unsupported file formats, including a step-by-step guide and best practices
Spark is widely recognized for its ability to handle large-scale data streams with scalability and fault tolerance. While Spark Streaming and Databricks Autoloader inherently support standard file formats like JSON, CSV, PARQUET, AVRO, TEXT, BINARYFILE, and ORC, their versatility extends far beyond these. This blog explores how Spark Streaming and Databricks Autoloader can process file types that aren’t natively supported.
Use Cases
Two use cases motivated this approach. The first involved processing rosbag files generated by cars. Rosbag is a format used in the Robot Operating System (ROS) to record and store time-sequenced data, such as sensor readings, video, or robot states. The data had to be processed in sequence to stay accurate, and that requirement prompted this blog, which demonstrates how Spark Streaming and Autoloader manage such ordered data.
The second use case was for a spatial intelligence company that used video monitoring to improve productivity at manufacturing facilities. They collected video data, which had to be processed as a time series, ensuring the correct order of timestamps, similar to the sequential requirements of Rosbag files.
In both cases, Spark Streaming and Autoloader identified new files and freed the teams from tracking which files had already been processed, while Spark processed the files in parallel.
The Process Flow:
File Detection with Autoloader: Autoloader identifies new files, an essential step for real-time data processing. It ensures every new file is detected and queued for processing, providing the actual path for reading.
Custom UDF for File Parsing: We develop a custom User-Defined Function (UDF) to manage unsupported file types. This UDF is crafted specifically for reading and processing the designated file format.
Data Processing and Row Formation: Within the UDF, we process the file content, transforming it into structured data, usually in row format.
Writing Back to Delta Table: We then write the processed data back to a Delta table for further use.
The example below is for ROS Bag, but the same method can be translated for any other file type.
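As a minimal sketch of the parsing and row-formation steps for a different file type, assume a hypothetical binary format made of fixed-width records (an 8-byte timestamp followed by three 4-byte floats). The format, the function name `parse_fixed_width_records`, and the field names are illustrative assumptions and are not part of the ROS bag example. The parser takes raw bytes and returns a list of dictionaries, which is the shape a UDF with an array-of-struct return type works with.

```python
import struct
from typing import Dict, List

# Hypothetical record layout: little-endian int64 timestamp + three float32 values.
RECORD_FORMAT = "<qfff"
RECORD_SIZE = struct.calcsize(RECORD_FORMAT)  # 20 bytes per record


def parse_fixed_width_records(content: bytes) -> List[Dict]:
    """Turn raw file bytes into one dictionary per record."""
    rows = []
    # Ignore a trailing partial record, if any, so iter_unpack does not raise.
    usable_length = len(content) - (len(content) % RECORD_SIZE)
    for ts, x, y, z in struct.iter_unpack(RECORD_FORMAT, content[:usable_length]):
        rows.append({"timestamp_ns": ts, "x": x, "y": y, "z": z})
    return rows


# Build two records in memory and parse them back.
sample = struct.pack(RECORD_FORMAT, 1, 0.5, 1.5, 2.5) + struct.pack(RECORD_FORMAT, 2, 3.0, 4.0, 5.0)
rows = parse_fixed_width_records(sample)
```

Because the binaryFile source also exposes each file's bytes in a `content` column, a parser of this shape could be applied to that column instead of a path, provided the files are small enough to hold in memory.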
Setting Up the Environment
Firstly, we need to prepare our Databricks environment:
# Databricks notebook source
# MAGIC %pip install bagpy
dbutils.library.restartPython()
We install bagpy, a Python library for ROS bag files, and restart the Python environment to ensure the library is loaded correctly.
Importing Necessary Libraries
Next, we import the required Python libraries:
from typing import List, Dict
import boto3
import rosbag
import tempfile
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType, LongType, FloatType
from pyspark.sql import SparkSession
These imports include standard data manipulation tools, AWS S3 access (boto3), ROS bag reading capabilities (rosbag), and necessary PySpark components.
Detect new files using Autoloader
# Spark streaming setup for ROS bag files
s3_data_path = "s3a://one-env/jiteshsoni/Vehicle/"
table_name = "rosbag_imu"
checkpoint_location = f"/tmp/checkpoint/{table_name}/"
stream_using_autoloader_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "binaryfile")
    .option("cloudFiles.includeExistingFiles", "true")
    .load(s3_data_path)
)
display(stream_using_autoloader_df)
Custom UDF to read & parse any file type
The core function extract_rosbag_data reads data from a ROS bag file in an S3 bucket and returns a list of dictionaries containing the extracted data:
def extract_rosbag_data(s3_rosbag_path: str) -> List[Dict]:
    """
    Extracts data from a ROS bag file stored in S3, converting it into a list of dictionaries.
    Args:
        s3_rosbag_path (str): The S3 path to the ROS bag file.
    Returns:
        List[Dict]: A list of dictionaries with data from the ROS bag.
    """
    interested_topics = ['/ublox_trunk/ublox/esfalg']
    extracted_data = []
    # Extracting the S3 bucket and file key from the provided path
    bucket_name, s3_file_key = s3_rosbag_path.split('/', 3)[2:4]
    # Using boto3 to download the ROS bag file into memory
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket_name, s3_file_key)
    file_stream = obj.get()['Body'].read()
    # Storing the downloaded file temporarily
    with tempfile.NamedTemporaryFile() as temp_file:
        temp_file.write(file_stream)
        temp_file.flush()
        # Reading messages from the ROS bag file
        with rosbag.Bag(temp_file.name, 'r') as bag:
            for topic, msg, timestamp in bag.read_messages(topics=interested_topics):
                message_data = {field: getattr(msg, field) for field in msg.__slots__}
                message_data['timestamp'] = timestamp.to_sec()
                extracted_data.append(message_data)
    return extracted_data
This function uses boto3 to access the S3 bucket, reads the ROS bag file, and extracts the relevant data. For your own use case, change this function to read your file type. It is worth testing the function on a single file before proceeding:
extract_rosbag_data(s3_rosbag_path= "s3a://bucket_name/jiteshsoni/Vehicle/2023-08-04-16-30-24_63.bag")
Note that this example downloads the whole file onto the cluster. This can be avoided if your file reader is able to read directly from cloud storage.
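The bucket and key extraction inside the function relies on a positional split of the path. A slightly more defensive sketch, using only the standard library, is shown here. The helper name `split_s3_path` is hypothetical and not part of the original code; it assumes keys do not contain `?` or `#`, which `urlparse` would treat as query or fragment separators.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_path(s3_path: str) -> Tuple[str, str]:
    """Split an s3://, s3a://, or s3n:// URI into (bucket, key)."""
    parsed = urlparse(s3_path)
    # Reject anything that is not an S3-style URI with a bucket component.
    if parsed.scheme not in ("s3", "s3a", "s3n") or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_path!r}")
    # The path starts with "/", which is not part of the object key.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_path("s3a://bucket_name/jiteshsoni/Vehicle/2023-08-04-16-30-24_63.bag")
```

Failing fast with a clear error on a malformed path tends to be easier to debug than an unpacking error raised deep inside a UDF.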
Defining the Data Schema
Before ingesting data into Spark, define the schema that aligns with the data structure in ROS bags. This is important because Spark needs to know what schema to expect.
# Define the schema that matches your ROS bag data structure
rosbag_schema = ArrayType(StructType([
    StructField("Alpha", LongType(), True),
    StructField("Beta", IntegerType(), True),
    StructField("Gamma", IntegerType(), True),
    StructField("Delta", IntegerType(), True),
    StructField("Epsilon", IntegerType(), True),
    StructField("Zeta", IntegerType(), True),
    StructField("Eta", IntegerType(), True),
    StructField("Theta", IntegerType(), True),
    StructField("Iota", FloatType(), True)
]))
# Creating a User Defined Function (UDF) for processing ROS bag files
process_rosbag_udf = udf(extract_rosbag_data, returnType=rosbag_schema)
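When adapting the schema, it may help to make sure the dictionaries returned by the parser line up with the declared field names and types. The sketch below is a hypothetical helper (`conform_row` and `FIELD_CASTERS` are illustrative names, not part of the original pipeline). It keeps only the declared fields and casts values to the Python types assumed to correspond to the schema above, leaving missing values as `None`.

```python
from typing import Any, Callable, Dict

# Python-side casters mirroring the schema above (assumed mapping:
# LongType/IntegerType -> int, FloatType -> float).
FIELD_CASTERS: Dict[str, Callable[[Any], Any]] = {
    "Alpha": int,
    "Beta": int,
    "Gamma": int,
    "Delta": int,
    "Epsilon": int,
    "Zeta": int,
    "Eta": int,
    "Theta": int,
    "Iota": float,
}


def conform_row(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only declared fields and cast each present value to its expected type."""
    conformed = {}
    for name, caster in FIELD_CASTERS.items():
        value = raw.get(name)
        conformed[name] = None if value is None else caster(value)
    return conformed


# An integer arrives for the float field, plus a key the schema does not declare.
row = conform_row({"Alpha": 7, "Iota": 3, "unexpected": "dropped"})
```

Such a helper could be applied to each dictionary before it is appended to the result list, so that type mismatches are handled in one place rather than discovered later in the table.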
Now let's use the display command to check that Autoloader and the custom UDF parsing work together.
rosbag_stream_df = (stream_using_autoloader_df
    .withColumn("rosbag_rows", process_rosbag_udf("path"))
    .withColumn("extracted_data", explode("rosbag_rows"))
    .selectExpr("extracted_data.*", "_metadata.*")
)
# Displaying the DataFrame
display(rosbag_stream_df)
Writing the Stream to a Delta Table
Finally, we write the streaming data to a Delta table, enabling further processing or querying:
streaming_write_query = (
    rosbag_stream_df.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("queryName", f"IngestFrom_{s3_data_path}_AndWriteTo_{table_name}")
    .option("checkpointLocation", checkpoint_location)
    .trigger(availableNow=True)
    .toTable(table_name)
)
Best Practices & Practical Considerations
If Auto Loader supports your file format natively, you should prefer using it for better integration and performance.
Write UDFs (User-Defined Functions) that are efficient and optimized to minimize processing time and resource utilization.
Implement graceful exception handling in UDFs to avoid disruptions in the data pipeline.
Access files directly from the cloud within UDFs rather than downloading them.
Understand that UDFs operate on Spark Executors, not the driver, with parallelization occurring at the file level.
Be aware that each file is handled by a single UDF call, so a large file (around 2GB) is processed serially rather than split across tasks, which affects performance.
Adjust Spark executor memory settings if you encounter out-of-memory errors with large files.
Aim for similar file sizes in your data pipeline to ensure better resource utilization.
Different sensors/cameras usually produce very different schemas, necessitating different target tables.
Different sensors/cameras also produce data at very different rates, so it makes sense for each sensor to have its own checkpoint. For example, if a sensor is without internet connectivity for a month, its source data will surge when it comes back online, and an independent checkpoint keeps that backlog from affecting other sensors.
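The advice on graceful exception handling can be sketched as a small wrapper around the parsing function. The names `safe_parser` and `flaky_parser` are hypothetical. The assumption is that returning an empty list for an unreadable file is acceptable, so that the explode step simply yields no rows for it instead of failing the stream.

```python
import logging
from functools import wraps
from typing import Callable, Dict, List

logger = logging.getLogger("file_parser")


def safe_parser(parse_fn: Callable[[str], List[Dict]]) -> Callable[[str], List[Dict]]:
    """Wrap a parser so that a failure on one file returns no rows instead of raising."""
    @wraps(parse_fn)
    def wrapper(path: str) -> List[Dict]:
        try:
            return parse_fn(path)
        except Exception:
            # Log the failing path with the traceback, then return an empty result.
            logger.exception("Failed to parse %s", path)
            return []
    return wrapper


@safe_parser
def flaky_parser(path: str) -> List[Dict]:
    """Stand-in parser (hypothetical) that fails on files marked as corrupt."""
    if path.endswith(".corrupt"):
        raise ValueError("unreadable file")
    return [{"path": path}]


good = flaky_parser("a.bag")
bad = flaky_parser("b.corrupt")
```

Swallowing errors this way hides data loss unless the logs are monitored, so depending on the pipeline it may be preferable to record failed paths in a separate table for later reprocessing.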
Reference
https://docs.databricks.com/en/ingestion/cloud-object-storage/auto-loader/index.html