To recap last week’s post: I was preparing to build a Data Analysis Pipeline, and this was the macro plan the project would be built around. I’m going to use that plan to outline this blog post.
- Finalised Dashboard
- Gathering all of the Necessary Information
  - Data
  - Data Dictionary
- Planning the Model
  - Jupyter Notebook (Python) & Lucidchart
  - Datetime Dimension
  - Passenger Count and Trip Distance Dimensions
  - Pickup and Dropoff Dimensions
  - Ratecode Dimension
  - Payment Type Dimension
  - Final Dimension Model
- Reviewing the Data
  - Preparing the Data
  - Creating the Dimensions
    - Datetime Dimension
    - Passenger Count and Trip Distance Dimensions
    - Pickup and Dropoff Dimensions
    - Ratecode Dimension
    - Payment Type Dimension
    - Fact Table
- Migrating the Local Data to Google Cloud Storage
- Construct the ETL Components
  - Compute Engine
  - MAGE
  - Open the Necessary Port(s)
  - Checking Availability
  - Working in MAGE
- Creating the Fact Table in BigQuery
  - BigQuery
  - MAGE
  - SSH in Browser
  - MAGE
  - BigQuery
  - MAGE
  - BigQuery
- Developing the Dashboard in Looker
  - Connect Data to the Dashboard
Finalised Dashboard
https://lookerstudio.google.com/s/gJ2pMR8EdWY
Gathering all of the Necessary Information
Data
Whilst thinking about the various types of data that are freely available on the internet, I considered how different countries track their information and, not only that, how they display it publicly. Given the amount of transaction information generated in a city that is overrun by taxis, I set out to find the data for this sector in New York City. The dataset was accessed on 6 November 2023.
https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
Data Dictionary
Before I build this pipeline, I need to build the model, and I can do that using the data dictionary located on the website.
https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf
From the available information, I knew it would be beneficial to create a data warehouse, as I can continually add to the model to support quick, efficient, business-oriented analysis and decisions. This means I will need to convert the flat table format (.csv) into a dimensional modelling approach: a data warehouse built from Fact and Dimension tables.
Creating these models is an iterative process. Some of the understanding comes over time from modelling a variety of projects, and some comes from positive and negative modelling experiences: events that were caught or missed in production because of the type of modelling that was planned. This current iteration, though, is the sum of the decisions made while comparing the flat table data (the .csv file) against the available data dictionary.
Planning the Model
Jupyter Notebook (Python) & Lucidchart
To best manipulate the data, I am going to use a Jupyter Notebook (Python). Firstly, I will install the numpy library, whose mathematical functions I will use later. Loading the data and inspecting the first few rows (using the head method) will allow me to plan the approach for the model’s dimensions.
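As a rough sketch of this step (assuming the TLC extract has been saved locally as uber_data.csv, the filename used for the Cloud Storage upload later in this post):

import numpy as np  # imported now for the mathematical functions used later
import pandas as pd

# Load the flat TLC trip extract
df = pd.read_csv('uber_data.csv')

# Inspect the first few rows to plan the model's dimensions
df.head()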
From this output, I can see that I will be constructing a Fact table using the VendorID. I will introduce a datetime dimension (table) and integrate the available information from columns 3 and 4.
I will be using Lucidchart to model this pipeline as a means to think out the logic and address any holes before implementing the plan.
Datetime Dimension
For the datetime, I could choose to create separate pickup and dropoff dimensions, but at this point I will integrate them into a single dimension and monitor it later to determine whether I change the normal form level.
Passenger Count and Trip Distance Dimensions
I will next create dimensions for both the passenger count and trip distance columns. I could include these in the fact table, as they are part of each trip’s transaction; however, I prefer to break them out into their own dimensions.
Pickup and Dropoff Dimensions
The next dimensions to be added will be the pickup and dropoff locations (latitude and longitude).
Ratecode Dimension
The next dimension that I’ll add is the rate code (the final rate code in effect at the end of the trip). Within the data dictionary, the values that can be assigned to the RateCodeID column are 1-6.
However, when developing this model, I will include additional information, i.e., a readable name for each rate (mostly location names). This makes the model a bit easier for us, and anyone else, to understand.
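As a sketch of what this could look like in the notebook (the labels come from the TLC data dictionary; the exact column spelling, e.g. RatecodeID, should be checked against the CSV header):

# RateCodeID labels (values 1-6) from the TLC data dictionary
rate_code_type = {
    1: 'Standard rate',
    2: 'JFK',
    3: 'Newark',
    4: 'Nassau or Westchester',
    5: 'Negotiated fare',
    6: 'Group ride',
}

# Sketch: build the dimension from the trip data and attach the readable name
rate_code_dim = df[['RatecodeID']].reset_index(drop=True)
rate_code_dim['rate_code_id'] = rate_code_dim.index
rate_code_dim['rate_code_name'] = rate_code_dim['RatecodeID'].map(rate_code_type)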
Payment Type Dimension
I will also follow the same concept for the payment type.
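A similar mapping sketch for the payment type, with the labels again taken from the data dictionary; it is applied to the payment type dimension in the same way as the rate code mapping above:

# payment_type labels from the TLC data dictionary
payment_type_name = {
    1: 'Credit card',
    2: 'Cash',
    3: 'No charge',
    4: 'Dispute',
    5: 'Unknown',
    6: 'Voided trip',
}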
Within the Fact table, we are interested in the transactions for each trip. This means that we can create attributes in the table using the following columns:
- Fare_amount
- Extra (miscellaneous extras and surcharges)
- MTA_tax
- Improvement_surcharge
- Tip_amount
- Tolls_amount
- Total_amount
Final Dimension Model
Reviewing the Data
Preparing the Data
Before I convert this flat file into the dimension model, I want to check that I have the correct data types for each of the attributes.
I can see that both the pickup_ and dropoff_ datetimes are in the ‘object’ format, so I will need to convert them. I can use pandas to convert the object format to datetime.
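Roughly, the conversion and a first cut of the datetime dimension look like this (a sketch; the column names match those used in the BigQuery query later in the post):

# Convert the pickup and dropoff columns from 'object' (strings) to datetimes
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

# First attempt at the datetime dimension: the two columns with duplicates removed
datetime_dim = df[['tpep_pickup_datetime', 'tpep_dropoff_datetime']].drop_duplicates().reset_index(drop=True)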
This line of code outputs what will become the datetime dimension, without any duplicate values in the two columns.
After running this code, I realised that I would need to run .drop_duplicates().reset_index(drop=True) each time I created a new dimension. So, I decided to run these methods once on the entire dataset and then index the rows against a new column, ‘trip_id’.
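Roughly, that tidy-up step looks like this:

# De-duplicate the whole dataset once, rebuild the index, and keep it as an
# explicit surrogate key for each trip
df = df.drop_duplicates().reset_index(drop=True)
df['trip_id'] = df.index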
Creating the Dimensions
Datetime Dimension
Now, to create the datetime dimension. I am comparing the model that I created earlier, using Lucidchart, with the code that I am generating in Python.
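A sketch of how the datetime dimension might be built; the broken-out attribute names (e.g. pick_hour, drop_weekday) are my assumptions based on the Lucidchart model, while datetime_id matches the key used in the BigQuery joins later:

datetime_dim = df[['tpep_pickup_datetime', 'tpep_dropoff_datetime']].reset_index(drop=True)

# Break the pickup timestamp into its parts
datetime_dim['pick_hour'] = datetime_dim['tpep_pickup_datetime'].dt.hour
datetime_dim['pick_day'] = datetime_dim['tpep_pickup_datetime'].dt.day
datetime_dim['pick_month'] = datetime_dim['tpep_pickup_datetime'].dt.month
datetime_dim['pick_year'] = datetime_dim['tpep_pickup_datetime'].dt.year
datetime_dim['pick_weekday'] = datetime_dim['tpep_pickup_datetime'].dt.weekday

# And the same for the dropoff timestamp
datetime_dim['drop_hour'] = datetime_dim['tpep_dropoff_datetime'].dt.hour
datetime_dim['drop_day'] = datetime_dim['tpep_dropoff_datetime'].dt.day
datetime_dim['drop_month'] = datetime_dim['tpep_dropoff_datetime'].dt.month
datetime_dim['drop_year'] = datetime_dim['tpep_dropoff_datetime'].dt.year
datetime_dim['drop_weekday'] = datetime_dim['tpep_dropoff_datetime'].dt.weekday

# Surrogate key used to join back to the fact table
datetime_dim['datetime_id'] = datetime_dim.index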
Passenger Count and Trip Distance Dimensions
Pickup and Dropoff Dimensions
Ratecode Dimension
Payment Type Dimension
Fact Table
To create the fact table, I need to join all of the newly created dimensions. From the Lucidchart visual plan, I need to include the fact table information (attributes), e.g., fare_amount, extra, mta_tax, etc., along with the *_id column from each dimension.
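A hedged sketch of that join, assuming each dimension was built one row per trip so that its surrogate key lines up with trip_id; the key and measure names match the dimension model and the BigQuery query later in this post:

fact_table = (
    df.merge(datetime_dim, left_on='trip_id', right_on='datetime_id')
      .merge(passenger_count_dim, left_on='trip_id', right_on='passenger_count_id')
      .merge(trip_distance_dim, left_on='trip_id', right_on='trip_distance_id')
      .merge(rate_code_dim, left_on='trip_id', right_on='rate_code_id')
      .merge(pickup_location_dim, left_on='trip_id', right_on='pickup_location_id')
      .merge(dropoff_location_dim, left_on='trip_id', right_on='dropoff_location_id')
      .merge(payment_type_dim, left_on='trip_id', right_on='payment_type_id')
    [['trip_id', 'VendorID', 'datetime_id', 'passenger_count_id', 'trip_distance_id',
      'rate_code_id', 'pickup_location_id', 'dropoff_location_id', 'payment_type_id',
      'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
      'improvement_surcharge', 'total_amount']]
)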
Migrating the Local Data to Google Cloud Storage
Using this image from last week, I can now move from step 0 (preparing the data) to step 1 and upload this model to Google Cloud Storage.
I started by creating a new project in the Google Cloud Console.
Next I created a bucket for me to upload the .csv file.
Then I uploaded the data into this newly created bucket, and granted it public (reader) access.
Construct the ETL Components
Next is to deploy MAGE onto Compute Engine.
Compute Engine
First, I need to create and deploy a VM instance on Compute Engine.
After this has been created, I want to be able to connect to it, and I can do this through SSH.
This will open a window, allowing me to communicate to this newly created VM using SSH.
These are the commands that I ran to update the OS and install all of the latest files, Python, and pip:
- sudo apt-get update
- sudo apt-get install python3-distutils
- sudo apt-get install python3-apt
- sudo apt-get install wget
- wget https://bootstrap.pypa.io/get-pip.py
- sudo python3 get-pip.py
MAGE
I navigated to the MAGE GitHub repository and used the pip install method to make MAGE available on the Google VM. https://github.com/mage-ai/mage-ai#using-pip-or-conda
Next is to create a new MAGE project on the VM by using the command:
mage start uber-data-analysis-jamesmiller-cv
This screenshot shows that MAGE is running over port 6789.
Open the Necessary Port(s)
Before I can publicly access this information, I need to make sure that this port is open.
After opening my VM, I scroll down and click on the nic0 network interface:
Checking Availability
To check that this is working, I can go to my VM, find its public (external) IP, copy and paste this IP into a new browser tab, and append the port (6789) to the end:
Working in MAGE
This is where I will be creating the pipeline. So, I am going to create a new Standard (batch) pipeline.
I’m going to create one from scratch, but first I need to load my data into it. Since I have made the data publicly available, I am going to load it via the API option:
This will create template Python code for loading the data:
I will need to paste in the URL of the .csv file that I uploaded to Google Cloud Storage. So, I can navigate back to that tab, drop the URL in, and run the code: https://storage.googleapis.com/uber-data-analysis-jamesmiller-cv/uber_data.csv
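For reference, this is roughly what the generated loader block looks like once the URL has been dropped in (the exact template can vary between MAGE versions):

import io

import pandas as pd
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    # Public URL of the .csv uploaded to the Google Cloud Storage bucket
    url = 'https://storage.googleapis.com/uber-data-analysis-jamesmiller-cv/uber_data.csv'
    response = requests.get(url)
    return pd.read_csv(io.StringIO(response.text), sep=',')


@test
def test_output(output, *args) -> None:
    # MAGE runs this after the block to confirm the load produced something
    assert output is not None, 'The output is undefined'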
After this is done loading the data, it provides some tests to see if there are any issues with this portion of the pipeline. This shows a successful load of the data.
Next will be to transform the data. So, I will scroll down and select Transformer > Python > Generic (no template).
On the screenshot below, we can see that we are slowly building up the pipeline. When you select one of the blocks on the left, the middle section of the screen shows the Python script you will be adjusting. For the transformation portion of this pipeline, this is where we can copy in the code that we developed earlier in the Jupyter Notebook.
Here we can see that we have successfully passed the data through the transformation portion:
From here, we need to export the data from this pipeline to BigQuery. To do this, I will need to pass the tables through as a dictionary.
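As a sketch of the shape of that return value, using a hypothetical helper (package_tables) and assuming the dimension and fact DataFrames from the notebook code have been built earlier in the transformer block:

def package_tables(datetime_dim, passenger_count_dim, trip_distance_dim, rate_code_dim,
                   pickup_location_dim, dropoff_location_dim, payment_type_dim, fact_table):
    # Each DataFrame becomes a plain dictionary keyed by its target table name;
    # this dictionary is what the transformer returns and what the BigQuery
    # exporter block later receives as `data`.
    tables = {
        'datetime_dim': datetime_dim,
        'passenger_count_dim': passenger_count_dim,
        'trip_distance_dim': trip_distance_dim,
        'rate_code_dim': rate_code_dim,
        'pickup_location_dim': pickup_location_dim,
        'dropoff_location_dim': dropoff_location_dim,
        'payment_type_dim': payment_type_dim,
        'fact_table': fact_table,
    }
    return {name: frame.to_dict(orient='dict') for name, frame in tables.items()}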
Now that this has been created, I can create the exporter to send to BigQuery; Data exporter > Python > Google BigQuery:
This creates the data exporter block for the pipeline. However, there are some aspects of the code that I need to adjust in order for a connection to be made:
- table_id = 'your-project.your_dataset.your_table_name'
Based on the name, it will create the table.
- config_path = path.join(get_repo_path(), 'io_config.yaml')
This .yaml file is located within MAGE and provides users with various service connection types, e.g., AWS, Google, MongoDB.
This information is obtainable through the Google Cloud Platform (GCP) console. Search: API & Services > Credentials > Create Credentials > Service Account (this means that we’re going to allow permissions for the VM to communicate with the other GCP services) > Name the service > Access: Role > Search: BigQuery Admin > Done.
Then a service account will be created.
I want to create a file where all of my credentials can be stored. So, I will click onto this account > Keys > Add Key > Create new key > JSON option. This will download the key file to the local machine. I will use the credential information from this newly created .json file to update the io_config.yaml file in the MAGE utils folder.
After updating the necessary fields, you can save the pipeline then ‘View pipeline’.
Creating the Fact Table in BigQuery
BigQuery
Now I’m going to create a connection to BigQuery. In the GCP console, I will search for it and open it.
Next I will create the dataset:
MAGE
With this newly created dataset, I will copy the name and paste it into the Data Explorer block in MAGE:
table_id = 'your-project.your_dataset.your_table_name'
My project name is: uber-project-404402 (1),
My dataset is: uber_data_analysis (2), and
My table name will be fact_table (from the MAGE script).
Since I have converted the dataframe (df) from the transformer into a dictionary, I need to update the BigQuery data exporter function to account for this. So, I will update its parameter from 'df: DataFrame' to 'data':
I will also update the first argument of the BigQuery export call from:
df
To:
DataFrame(data['fact_table'])
After running this code, an error highlighted that the Google Cloud libraries could not be found. This means that we need to install the Google Cloud and BigQuery packages.
SSH in Browser
I can install these packages by navigating back to the Google VM instance and running another SSH connection window with the following commands:
sudo pip3 install google-cloud
sudo pip3 install google-cloud-bigquery
MAGE
Now I can try running the pipeline in MAGE again, with success:
BigQuery
I can navigate back to BigQuery, refresh the page, and see the fact table has now been loaded into the project menu:
MAGE
Since I’ve only completed this for the one table, I will now have to complete it for the others (the dimensions). I will need to create a for loop in the Python code, cycling through all of the tables and uploading each to BigQuery. Original code:
table_id = 'uber-project-404402.uber_data_analysis.fact_table'
config_path = path.join(get_repo_path(), 'io_config.yaml')
config_profile = 'default'

BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
    DataFrame(data['fact_table']),
    table_id,
    if_exists='replace',
)
For loop code:
config_path = path.join(get_repo_path(), 'io_config.yaml')
config_profile = 'default'

for key, value in data.items():
    table_id = 'uber-project-404402.uber_data_analysis.{}'.format(key)
    BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
        DataFrame(value),
        table_id,
        if_exists='replace',
    )
Running this updated block shows that the export was successful:
BigQuery
We can see these tables are now available in BigQuery:
To test the queries that I will be performing on this dataset, I will select all of the attributes in the fact table for the first 10 rows.
Since this works, I will now create the code for querying all the columns found in the initial flat file, but joining the dimensions together through the fact table:
SELECT
f.VendorID,
d.tpep_pickup_datetime,
d.tpep_dropoff_datetime,
p.passenger_count,
t.trip_distance,
r.rate_code_name,
pick.pickup_latitude,
pick.pickup_longitude,
dropo.dropoff_latitude,
dropo.dropoff_longitude,
pay.payment_type_name,
f.fare_amount,
f.extra,
f.mta_tax,
f.tip_amount,
f.tolls_amount,
f.improvement_surcharge,
f.total_amount
FROM
`uber-project-404402.uber_data_analysis.fact_table` f
JOIN `uber-project-404402.uber_data_analysis.datetime_dim` d ON f.datetime_id=d.datetime_id
JOIN `uber-project-404402.uber_data_analysis.passenger_count_dim` p ON p.passenger_count_id=f.passenger_count_id
JOIN `uber-project-404402.uber_data_analysis.trip_distance_dim` t ON t.trip_distance_id=f.trip_distance_id
JOIN `uber-project-404402.uber_data_analysis.rate_code_dim` r ON r.rate_code_id=f.rate_code_id
JOIN `uber-project-404402.uber_data_analysis.pickup_location_dim` pick ON pick.pickup_location_id=f.pickup_location_id
JOIN `uber-project-404402.uber_data_analysis.dropoff_location_dim` dropo ON dropo.dropoff_location_id=f.dropoff_location_id
JOIN `uber-project-404402.uber_data_analysis.payment_type_dim` pay ON pay.payment_type_id=f.payment_type_id;
I will then create this final analysis table, to use for the dashboard, by entering the following code:
CREATE OR REPLACE TABLE `uber-project-404402.uber_data_analysis.tbl_analytics` AS (
[<query above>]
)
Developing the Dashboard in Looker
Connect Data to the Dashboard
I’ll start by opening Looker Studio, creating a blank report, and connecting to BigQuery to retrieve the data:
Then I can initiate the connection for the Uber dataset:
Starting with this blank canvas, I can slowly build up the dashboard:

