LinkedIn DataHub Metadata Ingestion Scripts: An Unofficial Guide
When you first get to know LinkedIn's DataHub open source metadata project, I think you should practice with the ETL scripts under its `metadata-ingestion` directory. They will help you understand what problem DataHub is solving and how it solves it.
However, there is a small barrier if you don't have much experience with Hive, MySQL, Kafka, or LDAP, or don't have those tools at hand, but still want to try it out and see whether DataHub works for you. So I wrote this guide. I also sent a PR to the project, but only part of its content was accepted, so I decided to post all of it here.
Under the `metadata-ingestion` directory, there are a few Python scripts that help us get started by ingesting metadata from sources such as Hive tables (under `hive-etl`), Kafka (under `kafka-etl`), MySQL (under `mysql-etl`), and any supported relational database (under `rdbms-etl`).
The `README.md` under the `metadata-ingestion` directory explains how to configure those scripts to connect to the different sources. This guide and the related scripts will walk you through how to:
1. Set up local Docker environments for Hive, Kafka, MySQL, and OpenLDAP
2. Configure each ingestion script, modifying it where necessary, so it works with Hive, Kafka, MySQL, and OpenLDAP
3. Validate that metadata from those sources is ingested into DataHub successfully
Let's get started.
1. Python 3.7 Environment
Only the `requirements.txt` under `mce-cli` highlights that we should use Python 3.7 for the `mce-cli` script. I had to modify all the scripts under `metadata-ingestion` to be compatible with Python 3.7.
*Optional*
The easiest way to get a Python 3.7 environment on macOS is to use [Anaconda](https://www.anaconda.com/).
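If you want a quick sanity check before running anything, a tiny snippet like this (my own addition, not part of the repo) confirms you are on the expected interpreter:
```
import sys

# The ETL scripts were adjusted for Python 3.7, so fail fast on anything else.
if sys.version_info[:2] != (3, 7):
    raise SystemExit(f"Expected Python 3.7, got {sys.version.split()[0]}")
```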
2. MySQL
If you have already run DataHub successfully with the Docker images it provides, you have a MySQL container up and running. There is a database named `datahub` in it, and you can use
```
docker exec -it mysql mysql -u datahub -pdatahub datahub
```
to access the `mysql` container and find the table `metadata_aspect`, which has the following schema:
| Field | Type | Null | Key | Default | Extra |
| --- | --- | --- | --- | --- | --- |
| urn | varchar(500) | NO | PRI | NULL | |
| aspect | varchar(200) | NO | PRI | NULL | |
| version | bigint(20) | NO | PRI | NULL | |
| metadata | longtext | NO | | NULL | |
| createdon | datetime(6) | NO | | NULL | |
| createdby | varchar(255) | NO | | NULL | |
| createdfor | varchar(255) | YES | | NULL | |
Our goal is to ingest the schema of `metadata_aspect` into DataHub.
Open `mysql_etl.py` and fill in `HOST`, `DATABASE`, `USER`, and `PASSWORD` with the following:
```
HOST = '127.0.0.1'
DATABASE = 'datahub'
USER = 'datahub'
PASSWORD = 'datahub'
```
then run
```
pip install --user -r requirements.txt
```
Once installation is finished, run
```
python mysql_etl.py
```
You should see output like the following in the console:
```
{'auditHeader': None, 'proposedSnapshot': ('com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot', {'urn': 'urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect,PROD)', 'aspects': [{'owners': [{'owner': 'urn:li:corpuser:datahub', 'type': 'DATAOWNER'}], 'lastModified': {'time': 0, 'actor': 'urn:li:corpuser:datahub'}}, {'schemaName': 'datahub.metadata_aspect', 'platform': 'urn:li:dataPlatform:mysql', 'version': 10, 'created': {'time': 1587320913, 'actor': 'urn:li:corpuser:datahub'}, 'lastModified': {'time': 1587320913, 'actor': 'urn:li:corpuser:datahub'}, 'hash': '', 'platformSchema': {'tableSchema': "[('urn', 'varchar(500)', 'NO', 'PRI', None, ''), ('aspect', 'varchar(200)', 'NO', 'PRI', None, ''), ('version', 'bigint(20)', 'NO', 'PRI', None, ''), ('metadata', 'longtext', 'NO', '', None, ''), ('createdon', 'datetime(6)', 'NO', '', None, ''), ('createdby', 'varchar(255)', 'NO', '', None, ''), ('createdfor', 'varchar(255)', 'YES', '', None, '')]"}, 'fields': [{'fieldPath': 'urn', 'nativeDataType': 'varchar(500)', 'type': {'type': {'com.linkedin.pegasus2avro.schema.StringType': {}}}}, {'fieldPath': 'aspect', 'nativeDataType': 'varchar(200)', 'type': {'type': {'com.linkedin.pegasus2avro.schema.StringType': {}}}}, {'fieldPath': 'version', 'nativeDataType': 'bigint(20)', 'type': {'type': {'com.linkedin.pegasus2avro.schema.StringType': {}}}}, {'fieldPath': 'metadata', 'nativeDataType': 'longtext', 'type': {'type': {'com.linkedin.pegasus2avro.schema.StringType': {}}}}, {'fieldPath': 'createdon', 'nativeDataType': 'datetime(6)', 'type': {'type': {'com.linkedin.pegasus2avro.schema.StringType': {}}}}, {'fieldPath': 'createdby', 'nativeDataType': 'varchar(255)', 'type': {'type': {'com.linkedin.pegasus2avro.schema.StringType': {}}}}, {'fieldPath': 'createdfor', 'nativeDataType': 'varchar(255)', 'type': {'type': {'com.linkedin.pegasus2avro.schema.StringType': {}}}}]}]}), 'proposedDelta': None} has been successfully produced!
```
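Under the hood, the interesting part is just a schema query against MySQL. Here is a minimal sketch of that query, assuming a driver such as `mysql-connector-python` is installed and reusing the credentials above; the real `mysql_etl.py` additionally wraps the result into the MCE shown in the output:
```
import mysql.connector  # assumption: mysql-connector-python is available

# Same credentials as configured in mysql_etl.py above.
conn = mysql.connector.connect(
    host="127.0.0.1", database="datahub", user="datahub", password="datahub"
)
cursor = conn.cursor()

# DESCRIBE returns (Field, Type, Null, Key, Default, Extra) per column,
# which is exactly the tuple list shown under 'tableSchema' in the output.
cursor.execute("DESCRIBE metadata_aspect")
for field, col_type, nullable, key, default, extra in cursor:
    print(field, col_type, nullable, key, default, extra)

cursor.close()
conn.close()
```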
3. Kafka
Again, this assumes you have all the Docker images used by the DataHub quickstart up and running. Head to `localhost:8000` in your favorite browser and you will see the Kafka schema registry UI. There are two schemas available, `MetadataAuditEvent-value` and `MetadataChangeEvent-value`.
The `kafka-etl` script will publish these two schemas to DataHub.
Run `pip install --user -r requirements.txt`, then run `python kafka_etl.py`.
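If you want to peek at the same schemas the script reads, the schema registry's REST API is enough. A minimal sketch, assuming the registry itself listens on `localhost:8081` (the standard Confluent port; `localhost:8000` from the quickstart is only the UI in front of it):
```
import json
import urllib.request

# Assumption: the schema registry REST endpoint is on port 8081.
REGISTRY = "http://localhost:8081"

def fetch_latest(subject):
    # Standard Confluent endpoint: /subjects/<subject>/versions/latest
    with urllib.request.urlopen(f"{REGISTRY}/subjects/{subject}/versions/latest") as resp:
        return json.load(resp)

for subject in ("MetadataAuditEvent-value", "MetadataChangeEvent-value"):
    print(subject, "-> version", fetch_latest(subject)["version"])
```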
4. Hive
To ingest a Hive table into DataHub, we need to set up a Hive source. Use the [docker-hive](https://github.com/big-data-europe/docker-hive) repo, and make a few modifications to its `docker-compose.yml`:
1. On line 47, the port mapping of `prestodb` conflicts with one of DataHub's containers; changing it to `7081:8080` should fix the problem
2. Add a `networks` section at the end of the file so it joins the same network DataHub is using
```
networks:
  default:
    name: datahub_network
```
3. Seed a Hive table
```
$ docker-compose exec hive-server bash
# /opt/hive/bin/beeline -u jdbc:hive2://localhost:10000
> CREATE TABLE pokes (foo INT, bar STRING);
> LOAD DATA LOCAL INPATH '/opt/hive/examples/files/kv1.txt' OVERWRITE INTO TABLE pokes;
> SELECT * FROM pokes;
```
We create a `pokes` table with two columns: `foo` and `bar`.
4. Run it
```
docker-compose up -d
```
Wait for the related containers to come up successfully.
5. Run `hive_etl.py`
We only need to set `HIVESTORE=localhost` on line 7. The script will query all tables, find each table's schema, assemble a `MetadataChangeEvent` (MCE) message, and publish it to Kafka.
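For reference, that query flow boils down to something like the sketch below. I use PyHive here purely for illustration, assuming HiveServer2 is reachable on port 10000 as in the docker-hive setup; whether or not `hive_etl.py` uses the exact same client, the idea is the same:
```
from pyhive import hive  # assumption: PyHive is installed

HIVESTORE = "localhost"

# HiveServer2 is exposed on port 10000 by the docker-hive compose file.
conn = hive.connect(host=HIVESTORE, port=10000, username="root")
cursor = conn.cursor()

cursor.execute("SHOW TABLES")
for (table,) in cursor.fetchall():
    # DESCRIBE yields (col_name, data_type, comment) rows, matching the
    # tuples shown under 'OtherSchema' in the output below.
    cursor.execute(f"DESCRIBE default.{table}")
    print(f"default.{table}", cursor.fetchall())
```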
In the end, you will see the following output
```
executing database query!
default.pokes dataset name!
{'auditHeader': None, 'proposedSnapshot': ('com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot', {'urn': 'urn:li:dataset:(urn:li:dataPlatform:hive,default.pokes,PROD)', 'aspects': [{'owners': [{'owner': 'urn:li:corpuser:root', 'type': 'DATAOWNER'}], 'lastModified': {'time': 1587325355, 'actor': 'urn:li:corpuser:root'}}, {'upstreams': [{'auditStamp': {'time': 1587325355, 'actor': 'urn:li:corpuser:root'}, 'dataset': 'urn:li:dataset:(urn:li:dataPlatform:hive,ma(name:bar,PROD)', 'type': 'bled:false'}]}, {'elements': [{'url': 'localhost', 'description': 'sample doc to describe upstreams', 'createStamp': {'time': 1587325355, 'actor': 'urn:li:corpuser:root'}}]}, {'schemaName': 'default.pokes', 'platform': 'urn:li:dataPlatform:hive', 'version': 0, 'created': {'time': 1587050099, 'actor': 'urn:li:corpuser:root'}, 'lastModified': {'time': 1587325355, 'actor': 'urn:li:corpuser:root'}, 'hash': '', 'platformSchema': {'OtherSchema': "[('foo', 'int', ''), ('bar', 'string', '')]"}, 'fields': [{'fieldPath': '', 'description': '', 'nativeDataType': 'string', 'type': {'type': {'com.linkedin.pegasus2avro.schema.StringType': {}}}}]}]}), 'proposedDelta': None} has been successfully produced!
```
5. OpenLDAP-etl
LDAP integration is an important part of DataHub, in my opinion. It helps us find detailed information about the owner of a dataset. We will use an OpenLDAP Docker image to set up our LDAP source.
You can find an OpenLDAP docker-compose file [here](https://gist.github.com/thomasdarimont/d22a616a74b45964106461efb948df9c). It gives us an OpenLDAP server and a phpLDAPadmin portal. I had to modify it to make the `docker-compose.yml` work, so I include my version here.
1. Start OpenLDAP and Php LDAP admin
```
docker-compose up -d
```
2. Head to `localhost:7080` in your browser and enter the following credentials to log in
```
username: cn=admin,dc=example,dc=org
password: admin
```
3. Import `sample-ldif.txt` to set up your organization. `sample-ldif.txt` contains:
* a group: we set up a `people` group
* people under the `people` group: here, the Simpsons family members
In the end, your phpLDAPadmin tree should show the Simpsons entries under the `people` group.

Once we finish setting up our organization, we are ready to run the `openldap-etl.py` script.
This script is mostly based on `ldap-etl.py`. However, it relies on an important attribute, `sAMAccountName`, which does not exist in OpenLDAP, so we have to modify it a little.
In our script, we query a user by his given name, Homer, and filter the result attributes down to a few. We also look up Homer's manager, if there is one.
Once we find Homer, we assemble his information and his manager's name into `corp_user_info` and publish it as a message on the `MetadataChangeEvent` topic.
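The lookup itself amounts to something like the sketch below. I use the `ldap3` package here just for illustration (the script's LDAP client may differ), with the admin credentials from the docker-compose setup above; the attribute names match those you can see in the output underneath, and the standard port 389 is an assumption about how the compose file exposes OpenLDAP:
```
from ldap3 import ALL, Connection, Server  # illustrative; the script's client may differ

# Assumption: the OpenLDAP container exposes the standard port 389.
server = Server("ldap://localhost:389", get_info=ALL)
conn = Connection(server, "cn=admin,dc=example,dc=org", "admin", auto_bind=True)

# Query Homer by given name and only pull the attributes the ETL cares about.
conn.search(
    "dc=example,dc=org",
    "(givenName=Homer)",
    attributes=["cn", "givenName", "sn", "mail", "departmentNumber", "title", "manager"],
)
for entry in conn.entries:
    print(entry.entry_dn, entry.entry_attributes_as_dict)
```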
Run `pip install --user -r requirements.txt`, then run `python openldap-etl.py`, and you are expected to see
```
{'auditHeader': None, 'proposedSnapshot': ('com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot', {'urn': "urn:li:corpuser:b'Homer Simpson'", 'aspects': [{'active': True, 'email': b'hsimpson', 'fullName': "b'Homer Simpson'", 'firstName': "b'Homer", 'lastName': "Simpson'", 'departmentNumber': b'1001', 'displayName': b'Homer Simpson', 'title': b'Mr. Everything', 'managerUrn': "urn:li:corpuser:b'Bart Simpson'"}]}), 'proposedDelta': None} has been successfully produced!
```