Pipeline
The Pipeline feature in the BigData section enables a user to create ETL pipelines that transform raw data available at various data sources. A pipeline can be Streaming (processes streaming data) or Batch (processes data at rest), depending on the user's requirement. The feature is built on top of the Apache Spark in-memory processing engine, and a user can create a pipeline by writing just a few simple Spark SQL operations. Let us walk through it with the help of an example.
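Under the hood, the two pipeline types correspond to Spark's batch and Structured Streaming APIs. The platform's UI abstracts all of this; the sketch below is only a conceptual illustration (the paths, column names, and the built-in `rate` demo source are placeholders, not the platform's actual code).

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pipeline-sketch").getOrCreate()

# Batch: read data at rest once, transform with SQL, write the result.
spark.read.json("/data/raw/events/").createOrReplaceTempView("raw")      # hypothetical path
result = spark.sql("SELECT device_id, AVG(temperature) AS avg_temp FROM raw GROUP BY device_id")
result.write.mode("append").parquet("/data/curated/avg_temp/")           # hypothetical path

# Streaming: read an unbounded source, apply the same kind of SQL, write continuously.
spark.readStream.format("rate").load().createOrReplaceTempView("raw_stream")  # demo source
stream_result = spark.sql("SELECT value, timestamp FROM raw_stream")
query = (stream_result.writeStream
         .format("console")
         .option("checkpointLocation", "/tmp/pipeline-chk")               # hypothetical path
         .start())
```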
To create a pipeline, go to BigData > Pipeline and click Add New. Enter the necessary details: Type, Name, Description, Max Offset, Trigger Interval, etc. For now we are creating a Streaming ETL pipeline by selecting the type as Stream.
Now select the data source you want from the Source List. In this example we selected MQTT, so select a topic from the dropdown list and proceed to the definition part. Then write, in the Select definition, the Spark SQL operations that best suit your use case.
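For example, if the MQTT messages carry JSON telemetry, a first Select definition might parse the payload and keep only the fields of interest. The sketch below shows equivalent Spark SQL run through PySpark, continuing the session from the sketch above; the view name `mqtt_source`, the `payload` column, and the field names are assumptions for illustration, not the platform's actual schema.

```python
# Assume the incoming MQTT messages are exposed as a view with a string 'payload' column.
parsed = spark.sql("""
    SELECT get_json_object(payload, '$.deviceId')                         AS device_id,
           CAST(get_json_object(payload, '$.temperature') AS DOUBLE)      AS temperature,
           CAST(get_json_object(payload, '$.ts') AS TIMESTAMP)            AS event_time
    FROM mqtt_source
""")
```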
Now let's test our pipeline using the Debug feature (upper right corner of the definition section) to see what data is generated by our pipeline operations.
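Conceptually, Debug is similar to attaching a console sink to the intermediate result and processing the currently available data once. A minimal sketch, assuming the `parsed` DataFrame from the previous sketch (the checkpoint path is a placeholder):

```python
# Rough equivalent of Debug: run the definition once and print a sample of rows.
debug_query = (parsed.writeStream
               .format("console")                                  # print rows to the driver log
               .option("numRows", 20)
               .option("truncate", False)
               .option("checkpointLocation", "/tmp/debug-chk")     # hypothetical path
               .trigger(once=True)                                 # process available data once, then stop
               .start())
debug_query.awaitTermination()
```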
A user can add as many definition stages as needed, depending on the requirement. The output of one definition stage acts as the input for the next.
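In Spark terms, each stage is a SQL query whose result feeds the next one, roughly as in the hedged sketch below (stage and view names are illustrative, continuing from `parsed` above).

```python
# Stage 1: the cleaned, typed rows become the input of stage 2.
parsed.createOrReplaceTempView("stage_1")

# Stage 2: aggregate per device over the output of stage 1.
aggregated = spark.sql("""
    SELECT device_id,
           AVG(temperature) AS avg_temperature,
           COUNT(*)         AS reading_count
    FROM stage_1
    GROUP BY device_id
""")
```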
Once we are done with the pipeline definition, we can select a Sink Point where the final transformed data is written. Taking the NoSQL database MongoDB for this example, select NoSQL from the Type dropdown, then select the MongoDB connector (which we created in the Connectors section) from the Connectors dropdown, and then select a database from the Databases dropdown.
Select the type of operation you want to perform (Update/Insert), then define a new collection name in the text box provided. Next it asks for a TTL (time to live: how long, in days, you want to keep records in the database collection), and then define the indexing that will help your queries run faster. Finally, save the pipeline.
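For orientation, the sketch below shows what such a sink roughly corresponds to in Spark and MongoDB terms, continuing from the `aggregated` result above. It assumes the MongoDB Spark Connector 10.x ("mongodb" format); the connection URI, database, collection, field names, and checkpoint path are all placeholders, and the platform's actual Update/Insert handling may differ.

```python
from pymongo import MongoClient

# --- Sink write (MongoDB Spark Connector 10.x assumed; names/URIs are placeholders) ---
def write_to_mongo(batch_df, batch_id):
    # 'append' here loosely corresponds to the Insert operation in the sink configuration.
    (batch_df.write.format("mongodb").mode("append")
     .option("connection.uri", "mongodb://localhost:27017")
     .option("database", "analytics")
     .option("collection", "device_metrics")
     .save())

sink_query = (aggregated.writeStream
              .foreachBatch(write_to_mongo)
              .outputMode("update")                               # emit changed aggregates each trigger
              .option("checkpointLocation", "/tmp/pipeline-chk")  # hypothetical path
              .trigger(processingTime="30 seconds")               # roughly the Trigger Interval setting
              .start())

# --- TTL and indexing on the target collection (field names are assumed) ---
coll = MongoClient("mongodb://localhost:27017")["analytics"]["device_metrics"]
coll.create_index("createdAt", expireAfterSeconds=7 * 24 * 3600)  # keep records ~7 days
coll.create_index("device_id")                                    # speeds up common queries
```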
Click the play button to start the pipeline; it can be stopped, started, or restarted at any time. You can click the Info button under Actions to check insights for the created pipeline, which describe its performance in terms of the rate at which data is processed.
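In Structured Streaming terms, these insights resemble the progress metrics Spark reports for a running query. A minimal sketch, assuming the `sink_query` handle from the previous sketch:

```python
# Inspect processing-rate metrics for the running query (Structured Streaming API).
progress = sink_query.lastProgress            # most recent micro-batch report (a dict), or None
if progress:
    print("batch id:           ", progress["batchId"])
    print("input rows/sec:     ", progress.get("inputRowsPerSecond"))
    print("processed rows/sec: ", progress.get("processedRowsPerSecond"))

# Stop/restart maps to stopping the query and starting a new one from the same checkpoint.
sink_query.stop()
```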
The pipeline maintains a checkpoint for the data being processed, so if the pipeline stops or fails for any reason, it resumes processing from where it left off once you restart it. An action to clear the checkpoint is also provided for cases where you want a fresh start of the pipeline.
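This behaviour matches Structured Streaming checkpointing: progress is tracked in the checkpoint location, a restarted query resumes from the last committed offsets, and removing that location forces a fresh start. A hedged sketch (the path is a placeholder used in the sketches above):

```python
import shutil

CHECKPOINT_DIR = "/tmp/pipeline-chk"   # hypothetical path

# Restarting with the same checkpointLocation resumes from the last committed offsets.
# Deleting it is the rough equivalent of the platform's 'clear checkpoint' action:
shutil.rmtree(CHECKPOINT_DIR, ignore_errors=True)
```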
Now let's take the example of a Batch ETL pipeline. All you need to do is select Batch from the Type dropdown in the basic information section.
Select a data source (MongoDB in this example), specify a query to fetch data from the selected data source, and then add the operations in the definition.
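A Batch pipeline reads a bounded snapshot instead of a stream. The hedged sketch below again assumes the MongoDB Spark Connector 10.x with placeholder names; for simplicity the source query is expressed here as a Spark SQL filter, whereas the platform may push your query down to MongoDB itself.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("batch-pipeline-sketch").getOrCreate()

# Read the source collection once (connector format and names are assumptions).
source = (spark.read.format("mongodb")
          .option("connection.uri", "mongodb://localhost:27017")
          .option("database", "analytics")
          .option("collection", "raw_readings")
          .load())
source.createOrReplaceTempView("raw_readings")

# Definition: the same kind of Spark SQL operations as in the streaming case.
daily_avg = spark.sql("""
    SELECT device_id,
           DATE(event_time)  AS day,
           AVG(temperature)  AS avg_temperature
    FROM raw_readings
    WHERE event_time >= DATE'2024-01-01'    -- the 'query' step, expressed as a filter
    GROUP BY device_id, DATE(event_time)
""")
```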
You can debug the Batch ETL pipeline in the same way as the Stream ETL pipeline.
Once you are done with the definition and with testing via Debug, select the sink point where the aggregated data will be saved (MongoDB in this case) and save the pipeline.
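Writing the aggregated batch result is a plain batch write to the sink. A minimal sketch, continuing from `daily_avg` above and again assuming the MongoDB Spark Connector 10.x with placeholder names:

```python
# Write the aggregated batch result to the sink collection (names are placeholders).
(daily_avg.write
 .format("mongodb")
 .mode("append")                                         # Insert-style write
 .option("connection.uri", "mongodb://localhost:27017")
 .option("database", "analytics")
 .option("collection", "daily_device_averages")
 .save())
```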