
Initial Data Load (IDL) & M&A Data Integration

The diagram below shows how the M&A data will be migrated to the CMD system via the TAMR system. Multiple M&A systems use the same approach to provide their data.

  1. The M&A system provides a flat file, which is verified manually; Python code then loads it into TAMR for deduplication.
  2. Once deduplication is done, Python code performs data translation, validates the data using the CMD validation service, and creates a data report in the Datalake.
  3. From the Datalake, ADF loads the data into the CMD system.

CMD Concern Recommendation

On-Prem data migration to Modern Platform

As part of the CMD modernization on the Azure cloud platform, the on-prem CMD data must be migrated to a Postgres database on Azure. We used the approach below to migrate the Customer, Concern, Contact, Customer Facility and Collection Business Unit (CBU) entity data.

Initial Data Load (IDL) from On-Prem to Cloud Postgres

For the one-time IDL from on-prem to cloud Postgres, we used Azure Data Factory (ADF) data pipelines.

The Copy ADF pipeline executes SQL statements against the on-prem Oracle database and dumps the retrieved data into Azure Data Lake Storage.

The Load ADF pipeline reads the data from the data lake, connects to Postgres and inserts it for the Customer, Contact, Concern, CustomerFacility and CBU entities.
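The two ADF stages hand data to each other through the data lake. The Python sketch below illustrates only that copy-then-load pattern; it is not the ADF pipeline definition. In-memory sqlite3 databases stand in for the on-prem Oracle source and the Postgres target, a CSV string stands in for the data lake file, and the table and column names (customer, code, name) as well as the copy_stage/load_stage helpers are hypothetical.

```python
import csv
import io
import sqlite3

def copy_stage(source: sqlite3.Connection, query: str) -> str:
    """Run a SQL extract against the source and write the result as CSV (data lake stand-in)."""
    cur = source.execute(query)
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow([col[0] for col in cur.description])  # header row with column names
    writer.writerows(cur.fetchall())
    return buf.getvalue()

def load_stage(target: sqlite3.Connection, table: str, csv_text: str) -> int:
    """Read the staged CSV and insert every row into the target table; return the row count."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    placeholders = ", ".join("?" for _ in header)
    sql = f"INSERT INTO {table} ({', '.join(header)}) VALUES ({placeholders})"
    rows = list(reader)
    with target:  # commits the inserts if no exception is raised
        target.executemany(sql, rows)
    return len(rows)

# Hypothetical source and target with the same table layout
src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE customer (code TEXT, name TEXT)")
src.executemany("INSERT INTO customer VALUES (?, ?)", [("CN001", "Acme"), ("CN002", "Globex")])

dst = sqlite3.connect(":memory:")
dst.execute("CREATE TABLE customer (code TEXT, name TEXT)")

staged = copy_stage(src, "SELECT code, name FROM customer")
loaded = load_stage(dst, "customer", staged)
print(loaded)  # 2
```

Keeping the extract and the load as separate steps means the staged file can be inspected or reloaded into SIT, PREPROD or PROD without querying the source again.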

These pipelines will be used during the IDL process when data needs to be loaded into SIT, PREPROD and PROD environments.

Real-time data sync from Postgres to ElasticSearch

After the IDL of the Customer, Contact, Concern, CustomerFacility and CBU entity data, every create and update event on these entities in Postgres must be synced to the Elasticsearch engine, which serves read, search and retrieval requests.

Database triggers on CUSTOMER_INFORMATION, CONTACT_INFORMATION, FACILITY_INFORMATION and ref_collection_business_units are configured to push the changed entity's code to the corresponding database channel with pg_notify.

An Azure Databricks notebook is created for each channel. The notebook listens on the channel, receives the entity code (for example, the customer code), runs a SQL query that converts the row to JSON, and inserts or updates the document in the corresponding Elasticsearch index.
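The routing from channel to index can be sketched without a live database or cluster. The channel and index names below come from the triggers and notebooks in this document; the Notification tuple is a stand-in for psycopg2's notification object (which exposes .channel and .payload), and route_notifications is a hypothetical helper. The document uses one notebook per channel; a single dispatcher is shown here only to make the mapping explicit. Because each upsert re-reads the current row from Postgres, repeated notifications for the same code within one batch can be collapsed into one upsert.

```python
from collections import namedtuple

# Stand-in for psycopg2's notification object
Notification = namedtuple("Notification", ["channel", "payload"])

# Channel -> Elasticsearch index, as used by the triggers and notebooks in this document
CHANNEL_TO_INDEX = {
    "cust_upserts": "customerentity",
    "concern_upserts": "concernentity",
    "cont_upserts": "contactentity",
}

def route_notifications(notifications):
    """Return (index_name, entity_code) pairs in arrival order, without duplicates.

    Notifications on channels not in CHANNEL_TO_INDEX are skipped.
    """
    seen = set()
    work = []
    for n in notifications:
        index_name = CHANNEL_TO_INDEX.get(n.channel)
        if index_name is None:
            continue
        key = (index_name, n.payload)
        if key not in seen:
            seen.add(key)
            work.append(key)
    return work

batch = [
    Notification("cust_upserts", "CN50136572"),
    Notification("cust_upserts", "CN50136572"),  # same row changed twice
    Notification("cont_upserts", "DK00069117"),
    Notification("other_channel", "X1"),
]
print(route_notifications(batch))
# [('customerentity', 'CN50136572'), ('contactentity', 'DK00069117')]
```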

Trigger on CUSTOMER_INFORMATION table (Customer & Concern Entity)

CREATE TRIGGER after_upsert_cust_concern AFTER UPDATE or INSERT ON mdm_smds.customer_information FOR EACH ROW EXECUTE PROCEDURE mdm_smds.notify_cust_concern_upserts();

Below is the notify_cust_concern_upserts() function:

CREATE OR REPLACE FUNCTION mdm_smds.notify_cust_concern_upserts() RETURNS trigger AS $BODY$
declare
    vId TEXT;
begin
    -- Publish the CMD code on the channel that matches the row's role type
    if tg_op in ('INSERT', 'UPDATE') then
        vId := NEW.CUSTOMER_CMD_CODE;
        if NEW.CUSTOMER_ROLE_TYPE = 'CUST' then
            perform pg_notify('cust_upserts', vId);
        elsif NEW.CUSTOMER_ROLE_TYPE = 'CONCRN' then
            perform pg_notify('concern_upserts', vId);
        end if;
    end if;
    -- The return value of an AFTER ... FOR EACH ROW trigger is ignored
    return NEW;
end
$BODY$ LANGUAGE plpgsql;
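The branching in notify_cust_concern_upserts() reduces to a small routing rule: CUSTOMER_ROLE_TYPE selects the channel and CUSTOMER_CMD_CODE is the payload. The Python function below restates that rule so it can be unit-tested outside the database. It is an illustrative model, not code that runs in Postgres; the channel_for name and the lower-case dict keys are assumptions.

```python
from typing import Optional, Tuple

# Role type -> notification channel, as in notify_cust_concern_upserts()
ROLE_TO_CHANNEL = {"CUST": "cust_upserts", "CONCRN": "concern_upserts"}

def channel_for(tg_op: str, row: dict) -> Optional[Tuple[str, str]]:
    """Return (channel, payload) for an INSERT/UPDATE on customer_information, or None.

    Rows whose role type is neither CUST nor CONCRN produce no notification.
    """
    if tg_op not in ("INSERT", "UPDATE"):
        return None
    channel = ROLE_TO_CHANNEL.get(row.get("customer_role_type"))
    if channel is None:
        return None
    return channel, row["customer_cmd_code"]

print(channel_for("UPDATE", {"customer_role_type": "CONCRN", "customer_cmd_code": "CN50136572"}))
# ('concern_upserts', 'CN50136572')
```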

Query to convert Customer to JSON

Refer to Query1 in the file JSON_conversion_queries.sql.

Query to convert Concern to JSON

Refer to Query2 in the file JSON_conversion_queries.sql.
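The customer and concern JSON queries embedded in the notebook code in this section share two flag conventions: deleteFlag is true only when IS_DELETED = 'Y', and relationshipStatus is true only when VALID_THRU_DATE is after the current date. The helpers below restate those two CASE expressions in Python, which can help when checking Elasticsearch documents against source rows. The function names are hypothetical; the NULL handling follows SQL semantics, where a NULL comparison makes the CASE fall through to false.

```python
import datetime
from typing import Optional

def delete_flag(is_deleted: Optional[str]) -> bool:
    """CASE WHEN IS_DELETED = 'Y' THEN true ELSE false END (NULL maps to false)."""
    return is_deleted == "Y"

def relationship_status(valid_thru: Optional[datetime.date], today: datetime.date) -> bool:
    """CASE WHEN VALID_THRU_DATE > current_date THEN true ELSE false END.

    A NULL VALID_THRU_DATE makes the comparison NULL, so the CASE yields false.
    A relationship that ends today is already reported as inactive (strict >).
    """
    return valid_thru is not None and valid_thru > today

today = datetime.date(2024, 1, 15)
print(delete_flag("Y"), delete_flag("N"), delete_flag(None))  # True False False
print(relationship_status(datetime.date(2024, 1, 16), today),
      relationship_status(today, today),
      relationship_status(None, today))  # True False False
```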

Databricks notebook code:

  1. Customer bulk data load into the Elasticsearch index: refer to Prgm1 in the file ES_dataload_queries.py.
  2. Customer real-time data sync between the cloud DB and the Elasticsearch index:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Sync data from the write DB (Postgres) to Elasticsearch in real time

from elasticsearch import Elasticsearch, helpers
import psycopg2
import json
import select
import datetime

# Connection settings. Passwords are read from a Databricks secret scope instead of being
# hard-coded; "<scope>", "<pg-password-key>" and "<es-password-key>" are placeholders.
PG_HOST = "cmd-postgres-dev01.postgres.database.azure.com"
PG_DB = "cmd_pg_dev01"
PG_USER = "admin_pg_dev01@cmd-postgres-dev01"
PG_PASSWORD = dbutils.secrets.get(scope="<scope>", key="<pg-password-key>")
ES_PASSWORD = dbutils.secrets.get(scope="<scope>", key="<es-password-key>")
JDBC_URL = f"jdbc:postgresql://{PG_HOST}:5432/{PG_DB}?sslmode=require"

def get_df(cust_code):
    print("query execution started")
    print(datetime.datetime.now())
    # cust_code comes from the notification payload; double any quotes before
    # concatenating it into the SQL text
    safe_code = cust_code.replace("'", "''")
    query = """ select row_to_json(CustomerEnty) Customer_Root from ( select row_to_json(CustomerDetail) CustomerEntity from ( select row_to_json(Cust_Details) as CustomerDetails from ( select ( select row_to_json(Cust_core) from ( select customer_cmd_code customerCode, ( select array_to_json( array_agg( row_to_json(Ext_Identifiers) ) ) from ( select SOURCE_SYSTEM_NAME AS externalSystemName, EXTERNAL_SYSTEM_IDENTIFIER as externalSystemReference, case when EXT_ID.IS_DELETED = 'Y' THEN true else false end as deleteFlag FROM mdm_smds.CUSTOMER_EXTERNAL_IDENTIFIERS EXT_ID JOIN mdm_smds.ref_alternate_codes ALT_CODE ON EXT_ID.EXTERNAL_SOURCE_SYSTEM = ALT_CODE.CODE where CUSTOMER_CMD_CODE = CUST.customer_cmd_code order by EXTERNAL_SOURCE_SYSTEM asc ) Ext_Identifiers ) as ExternalSystemIdentifiers, legal_name legalName, trading_name tradingName, ACCOUNT_GROUP_TYPE customerType, customer_website url, is_sole_proprietor soleProprietor, ( select row_to_json(Cust_stat) from ( select CA.CUSTOMER_STATUS_CD as code, STATUS.Status_Name as name, ( select array_to_json( array_agg( row_to_json(Cust_stat_reason) ) ) from ( select CSA.REASON_CD as reasonCode, CSA.REASON_NAME as reasonName, CSA.REASON_DESC as reasonDescription from mdm_smds.CUSTOMER_STATUS_REASONS CSA where CSA.customer_cmd_code = CUST.customer_cmd_code ) Cust_stat_reason ) as CustomerStatusReasons from mdm_smds.customer_information CA join mdm_smds.Ref_Customer_Status STATUS on STATUS.Status_CODE = CA.CUSTOMER_STATUS_CD where CA.customer_cmd_code = CUST.customer_cmd_code ) Cust_stat ) as CustomerStatus, ( select row_to_json(Cust_Phone) from ( select CUST_PH.PHONE_TYPE as phoneNumberType, to_json( ( SELECT d FROM ( SELECT DIAPRE.COUNTRY_NAME as countryName, CUST_PH.ISD_COUNTRY_CODE as isoCountryCode ) d ) ) AS phoneCountry, 
CUST_PH.ISD_DIALING_CODE as internationalDialingCode, '' extensionNumber, CUST_PH.PHONE_NUM as number, false deleteFlag FROM mdm_smds.CUSTOMER_INFORMATION CUST_PH left outer join mdm_smds.Ref_International_Dialing_Prefix DIAPRE on DiaPre.COUNTRY_NAME = CUST_PH.ISD_COUNTRY_CODE where CUST_PH.CUSTOMER_CMD_CODE = CUST.customer_cmd_code ) Cust_Phone ) as TelecommunicationNumber, ( select array_to_json( array_agg( row_to_json(Cust_Brand) ) ) from ( select RB.brand_name as brandName, CB.CUSTOMER_BRAND_CD as brandCode, case when CB.IS_DELETED = 'Y' THEN true else false end as deleteFlag FROM mdm_smds.CUSTOMER_BRANDS AS CB JOIN mdm_smds.REF_BRANDS AS RB ON CB.customer_brand_cd = RB.brand_lra_code where CUSTOMER_CMD_CODE = CUST.CUSTOMER_CMD_CODE order by CUSTOMER_BRAND_CD asc ) Cust_Brand ) as CustomerBrandIdentifier, ( select row_to_json(Cust_Address) from ( select street_no streetNumber, address_line1 streetName, pobox poBox, address_line2 apartmentOrFloor, address_line3 subArea, district, city, to_json( ( SELECT d FROM ( SELECT state_code as regionCode, state_name regionName ) d ) ) AS region, postal_code, INVOICE_LNG_CD as language, to_json( ( SELECT d FROM ( SELECT country_code isoCountryCode, country_name countryName ) d ) ) AS country, latitude, longitude, trim(concat_ws(' ',pobox,street_no,address_line1,address_line2,address_line3)) Single_line_address from mdm_smds.customer_information CA where CA.customer_cmd_code = CUST.customer_cmd_code ) Cust_Address ) as CustomerAddress from mdm_smds.customer_information CC where CC.customer_cmd_code = CUST.customer_cmd_code ) Cust_core ) as CustomerCoreInformation, ( select row_to_json(Ext_Info) from ( select customer_group_type customerGroupType, invoice_lng_cd invoiceLanguagePreference, brok brokerage, fofmc forwardersCompensation from mdm_smds.customer_information CA where CA.customer_cmd_code = CUST.customer_cmd_code ) Ext_Info ) as CustomerExtendedInformation, ( select array_to_json( array_agg( row_to_json(cust_ref) ) ) 
from ( select to_json( ( SELECT d FROM ( SELECT '' as isoCountryCode, '' as countryName ) d ) ) AS identifierCountry, REF_TYPE as identifierType, REF_TYPE_CODE as identifierCode, REFERENCE_VALUE as identifierValue, case when IS_DELETED = 'Y' THEN true else false end as deleteFlag FROM mdm_smds.CUSTOMER_TAX_REF_INFO where CUSTOMER_CMD_CODE = CUST.customer_cmd_code order by REF_TYPE asc ) cust_ref ) as CustomerIdentifiers, ( select row_to_json(bvd_info) from ( select MARKET_CAP, BVD_MAJOR_SECTOR, OPERATING_REVENUE_TURN_OVER, DATE_OF_ACCOUNT, PROFIT_MARGIN_PERCENT, CREDIT_RATING, RECOMMENDED_CREDIT_LIMIT from mdm_smds.CUSTOMER_BVD_INFORMATION bvd where bvd.customer_cmd_code = CUST.customer_cmd_code ) bvd_info ) as CustomerEnrichedInformation, ( select array_to_json( array_agg( row_to_json(hier_info) ) ) from ( select GUO_BVD_ID, GUO_BVD_NAME, HQ_BVD_ID, HQ_BVD_NAME from mdm_smds.CUSTOMER_BVD_INFORMATION hier where hier.customer_cmd_code = CUST.customer_cmd_code ) hier_info ) as CustomerHierarchyInformations, ( select array_to_json( array_agg( row_to_json(Cust_seg) ) ) from ( select to_json( ( SELECT d FROM ( SELECT CSEG.BRAND_CODE as brandCode, BRANDS.BRAND_NAME as brandName ) d ) ) AS segmentBrandInformation, json_agg( row_to_json( ( SELECT t FROM ( SELECT CSEG.SEGMENT_TYPE_CODE as segmentTypeCode, CSEG.SEGMENT_VALUE_CODE as segmentValueCode, CSEG.SEGMENT_VALUE_NAME as segmentValueName, CSEG.SEGMENT_VALUE_DESC as segmentValueDescription, case when CSEG.IS_DELETED = 'Y' THEN true else false end as deleteFlag ) t ) ) ) AS SegmentClassifications from mdm_smds.CUSTOMER_SEGMENT_DETAIL CSEG join mdm_smds.REF_BRANDS BRANDS on BRANDS.BRAND_LRA_CODE = CSEG.brand_code where CSEG.CUSTOMER_CMD_CODE = CUST.customer_cmd_code group by CSEG.BRAND_CODE, BRANDS.BRAND_NAME ) as Cust_seg ) as CustomerSegments, ( select array_to_json( array_agg( row_to_json(Cust_Rel) ) ) from ( select REL.PARENT_CUSTOMER_CODE as parentCustomerCode, CUSTP.TRADING_NAME as parentCustomerName, 
CHILD_CUSTOMER_CODE as childCustomerCode, CUSTC.TRADING_NAME as childCustomerName, REL.REL_TYPE_CODE as relationshipType, rel_type.REL_TYPE_NAME as relationshipTypeName, case when REL.VALID_THRU_DATE > current_date then true else false end as relationshipStatus FROM mdm_smds.CUSTOMER_RELATIONSHIPS REL join mdm_smds.CUSTOMER_INFORMATION CUSTP on CUSTP.CUSTOMER_CMD_CODE = REL.PARENT_CUSTOMER_CODE join mdm_smds.CUSTOMER_INFORMATION CUSTC on CUSTC.CUSTOMER_CMD_CODE = REL.CHILD_CUSTOMER_CODE join mdm_smds.Ref_Relationship_Types rel_type on rel.rel_type_code = rel_type.rel_type_code where REL.PARENT_CUSTOMER_CODE = CUST.customer_cmd_code order by REL.REL_TYPE_CODE asc ) Cust_Rel ) as CustomerRelationships, ( select array_to_json( array_agg( row_to_json(Web_Bill) ) ) from ( select to_json( ( SELECT d FROM ( SELECT WEBBIL.BRAND_CODE as brandCode, BRAND.BRAND_NAME as brandName ) d ) ) AS webBillBrandInformation, WEBBIL.NEGOTIABLE_BL as negotiableBL, WEBBIL.NON_NEGOTIABLE_WAYBILL as nonNegotiableBL, case when WEBBIL.IS_DELETED = 'Y' THEN true else false end as deleteFlag FROM mdm_smds.CUSTOMER_WEB_BL_DETAIL WEBBIL join mdm_smds.ref_brands BRAND on WEBBIL.BRAND_CODE = BRAND.BRAND_LRA_CODE where WEBBIL.CUSTOMER_CMD_CODE = CUST.CUSTOMER_CMD_CODE ) Web_Bill ) as CustomerWebBillLadings, ( select array_to_json( array_agg( row_to_json(Cust_CBU) ) ) from ( SELECT to_json( ( SELECT d FROM ( SELECT CBUREL.BRAND_CODE as brandCode, BRAND.BRAND_NAME as brandName ) d ) ) AS CbuBrandInformation, json_agg( row_to_json( ( SELECT t FROM ( SELECT REFCBU.CBU_NAME as cbuName, CBUREL.CBU_ID as cbuId, CBUREL.CBU_TYPE as cbuType, case when CBUREL.VALID_THRU_DATE > current_date then true else false end as cbuRelationshipStatus ) t ) ) ) AS CbuInformation FROM mdm_smds.CUSTOMER_CBU_DETAIL CBUREL join mdm_smds.ref_brands BRAND on CBUREL.BRAND_CODE = BRAND.BRAND_LRA_CODE join mdm_smds.ref_collection_business_units REFCBU ON CBUREL.CBU_ID = REFCBU.CBU_ID where CBUREL.CUSTOMER_CMD_CODE = 
CUST.CUSTOMER_CMD_CODE group by CBUREL.BRAND_CODE, BRAND.BRAND_NAME ) Cust_CBU ) as CollectionBusinessUnits, to_json( ( SELECT d FROM ( SELECT CUST.CREATE_USER as creationUser, CUST.CREATE_TIME as creationDate, CUST.UPDATE_USER as lastUpdateUser, CUST.UPDATE_TIME as lastUpdateDate, CUST.SOURCE_OF_LAST_UPDATE as lastUpdateSourceSystem ) d ) ) AS customerAuditData from mdm_smds.CUSTOMER_INFORMATION CUST where cust.customer_cmd_code = '""" + safe_code + """' ) as Cust_Details ) as CustomerDetail ) as CustomerEnty """
    #print(query)
    # Run the query through Spark JDBC; credentials are passed as options, not in the URL
    df = (spark.read.format("jdbc")
          .option("driver", "org.postgresql.Driver")
          .option("url", JDBC_URL)
          .option("user", PG_USER)
          .option("password", PG_PASSWORD)
          .option("query", query)
          .load())
    df.createOrReplaceTempView("CUST_JSON")
    print("query execution completed")
    print(datetime.datetime.now())
    return df

#get_df("CN50136572")

# Fetch the generated JSON for a code (via get_df) and create or update the matching
# Elasticsearch document
def upsert(index_name, primary_key, es):
    present = es.exists(index=index_name, id=primary_key)
    df = get_df(primary_key)
    rows = df.toJSON().collect()  # one JSON string per Spark row
    if not rows:
        print(f"no row found for {primary_key}")
        return
    line = json.loads(rows[0])
    body = json.loads(line['customer_root'])  # the root column holds the document as a JSON string
    #print(body)
    if not present:
        es.index(index=index_name, document=body, id=primary_key)
    else:
        print("update started")
        print(datetime.datetime.now())
        es.update(index=index_name, id=primary_key, doc=body)
        print("update successful")
        print(datetime.datetime.now())

# declare connection to database
connection = psycopg2.connect(dbname=PG_DB, user=PG_USER, host=PG_HOST, port="5432", password=PG_PASSWORD)
print("PG connection successful")

# Elasticsearch connection
es = Elasticsearch(
    "https://20.4.1.212:9200",
    ca_certs='dbfs:/FileStore/shared_uploads/harshit.c@maersk.com/http_ca.crt',
    basic_auth=('elastic', ES_PASSWORD),
    #use_ssl=True,
    verify_certs=False,
    ssl_show_warn=False
)

# Listen to the channel in autocommit mode, so LISTEN takes effect immediately and
# notifications are not held back by an open transaction
connection.set_isolation_level(0)  # 0 = ISOLATION_LEVEL_AUTOCOMMIT
cur = connection.cursor()
cur.execute("LISTEN cust_upserts;")
print("listen time")
print(datetime.datetime.now())
while True:
    select.select([connection], [], [])  # sleep until there is some data
    connection.poll()  # move pending notifications into connection.notifies
    while connection.notifies:
        notification = connection.notifies.pop(0)  # oldest notification first
        payload = notification.payload
        print(f"channel: {notification.channel}")
        print(f"message: {payload}")
        upsert("customerentity", payload, es)

  3. Concern bulk data load into the Elasticsearch index: refer to Prgm2 in the file ES_dataload_queries.py.
  4. Concern real-time data sync between the cloud DB and the Elasticsearch index:

#!/usr/bin/env python
# Sync data from the write DB (Postgres) to Elasticsearch in real time

# -*- coding: utf-8 -*-

from elasticsearch import Elasticsearch, helpers
import psycopg2
import json
import select
import datetime

# Connection settings. Passwords are read from a Databricks secret scope instead of being
# hard-coded; "<scope>", "<pg-password-key>" and "<es-password-key>" are placeholders.
PG_HOST = "cmd-postgres-dev01.postgres.database.azure.com"
PG_DB = "cmd_pg_dev01"
PG_USER = "admin_pg_dev01@cmd-postgres-dev01"
PG_PASSWORD = dbutils.secrets.get(scope="<scope>", key="<pg-password-key>")
ES_PASSWORD = dbutils.secrets.get(scope="<scope>", key="<es-password-key>")
JDBC_URL = f"jdbc:postgresql://{PG_HOST}:5432/{PG_DB}?sslmode=require"

def get_df(cncrn_code):
    print("query execution started")
    print(datetime.datetime.now())
    # cncrn_code comes from the notification payload; double any quotes before
    # concatenating it into the SQL text
    safe_code = cncrn_code.replace("'", "''")
    query = """select row_to_json(ConcernEnty) Concern_Root from( select row_to_json(ConcernDetail) ConcernEntity from ( select customer_cmd_code concernCode, ( select array_to_json(array_agg(row_to_json(Ext_Identifiers))) from ( select SOURCE_SYSTEM_NAME AS externalSystemName,EXTERNAL_SYSTEM_IDENTIFIER as externalSystemReference, case when EXT_ID.IS_DELETED = 'Y' THEN true else false end as deleteFlag FROM mdm_smds.CUSTOMER_EXTERNAL_IDENTIFIERS EXT_ID JOIN mdm_smds.ref_alternate_codes ALT_CODE ON EXT_ID.EXTERNAL_SOURCE_SYSTEM = ALT_CODE.CODE where CUSTOMER_CMD_CODE=CON.customer_cmd_code order by EXTERNAL_SOURCE_SYSTEM asc ) Ext_Identifiers ) as ExternalSystemIdentifiers, trading_name as name, CUSTOMER_STATUS_CD as status, COUNTRY_CODE as isoCountryCode, ( select array_to_json(array_agg(row_to_json(Cust_Rel))) from ( select distinct CUST.CUSTOMER_CMD_CODE as customerCode, CUST.TRADING_NAME as customerName, CUST.CUSTOMER_STATUS_CD as customerStatus, CUST.COUNTRY_CODE as customerIsoCountryCode, REL.REL_TYPE_CODE as relationshipType, rel_type.REL_TYPE_NAME as relationshipTypeName, case when REL.VALID_THRU_DATE > current_date then true else false end as relationshipStatus FROM mdm_smds.CUSTOMER_RELATIONSHIPS REL join mdm_smds.CUSTOMER_INFORMATION CUST on CUST.CUSTOMER_CMD_CODE=REL.PARENT_CUSTOMER_CODE join mdm_smds.Ref_Relationship_Types rel_type on rel.rel_type_code = rel_type.rel_type_code where REL.CHILD_CUSTOMER_CODE=CON.customer_cmd_code and REL.REL_TYPE_CODE='CONCRN_MEM' order by REL.REL_TYPE_CODE asc ) Cust_Rel ) as concernMembers, to_json((SELECT d FROM (SELECT CON.CREATE_USER as creationUser, CON.CREATE_TIME as creationDate, CON.UPDATE_USER as lastUpdateUser, CON.UPDATE_TIME as lastUpdateDate, CON.SOURCE_OF_LAST_UPDATE as lastUpdateSourceSystem) d)) AS concernAuditData from 
mdm_smds.CUSTOMER_INFORMATION CON where CON.CUSTOMER_ROLE_TYPE='CONCRN' and CON.customer_cmd_code= '""" + safe_code + """' ) as ConcernDetail ) as ConcernEnty"""
    #print(query)
    # Run the query through Spark JDBC; credentials are passed as options, not in the URL
    df = (spark.read.format("jdbc")
          .option("driver", "org.postgresql.Driver")
          .option("url", JDBC_URL)
          .option("user", PG_USER)
          .option("password", PG_PASSWORD)
          .option("query", query)
          .load())
    df.createOrReplaceTempView("CUST_JSON")
    print("query execution completed")
    print(datetime.datetime.now())
    return df

#get_df("CN50136572")

# Fetch the generated JSON for a code (via get_df) and create or update the matching
# Elasticsearch document
def upsert(index_name, primary_key, es):
    present = es.exists(index=index_name, id=primary_key)
    df = get_df(primary_key)
    rows = df.toJSON().collect()  # one JSON string per Spark row
    if not rows:
        print(f"no row found for {primary_key}")
        return
    line = json.loads(rows[0])
    body = json.loads(line['concern_root'])  # the root column holds the document as a JSON string
    #print(body)
    if not present:
        es.index(index=index_name, document=body, id=primary_key)
    else:
        print("update started")
        print(datetime.datetime.now())
        es.update(index=index_name, id=primary_key, doc=body)
        print("update successful")
        print(datetime.datetime.now())

# declare connection to database
connection = psycopg2.connect(dbname=PG_DB, user=PG_USER, host=PG_HOST, port="5432", password=PG_PASSWORD)
print("PG connection successful")

# Elasticsearch connection
es = Elasticsearch(
    "https://20.4.1.212:9200",
    ca_certs='dbfs:/FileStore/shared_uploads/harshit.c@maersk.com/http_ca.crt',
    basic_auth=('elastic', ES_PASSWORD),
    #use_ssl=True,
    verify_certs=False,
    ssl_show_warn=False
)

# Listen to the channel in autocommit mode, so LISTEN takes effect immediately and
# notifications are not held back by an open transaction
connection.set_isolation_level(0)  # 0 = ISOLATION_LEVEL_AUTOCOMMIT
cur = connection.cursor()
cur.execute("LISTEN concern_upserts;")
print("listen time")
print(datetime.datetime.now())
while True:
    select.select([connection], [], [])  # sleep until there is some data
    connection.poll()  # move pending notifications into connection.notifies
    while connection.notifies:
        notification = connection.notifies.pop(0)  # oldest notification first
        payload = notification.payload
        print(f"channel: {notification.channel}")
        print(f"message: {payload}")
        upsert("concernentity", payload, es)
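In upsert(), the document arrives double-encoded: Spark returns the Postgres json column as a string, and df.toJSON() wraps each row in another JSON object keyed by the lower-cased column alias (customer_root, concern_root, contact_root), which is why the notebooks call json.loads twice. The sketch below isolates that decoding step and the exists/index/update decision. FakeES is a hypothetical in-memory stand-in that only mimics the three client calls the notebooks make; its update does a top-level merge, whereas Elasticsearch merges object fields recursively. The sample row values are made up.

```python
import json

def extract_body(row_json: str, root_column: str) -> dict:
    """Decode one element of df.toJSON().collect() into the Elasticsearch document body.

    The outer JSON is the Spark row; the root column holds the document as a JSON string.
    """
    row = json.loads(row_json)
    return json.loads(row[root_column])

class FakeES:
    """Hypothetical in-memory stand-in for es.exists, es.index and es.update."""

    def __init__(self):
        self.docs = {}

    def exists(self, index, id):
        return (index, id) in self.docs

    def index(self, index, document, id):
        self.docs[(index, id)] = dict(document)  # create or replace the whole document

    def update(self, index, id, doc):
        self.docs[(index, id)].update(doc)  # top-level merge only

def upsert_body(es, index_name: str, primary_key: str, body: dict) -> str:
    """Same decision as the notebooks' upsert(): index if absent, otherwise update."""
    if not es.exists(index=index_name, id=primary_key):
        es.index(index=index_name, document=body, id=primary_key)
        return "indexed"
    es.update(index=index_name, id=primary_key, doc=body)
    return "updated"

# A row shaped like df.toJSON() output for the concern query (values are made up)
row = json.dumps({"concern_root": json.dumps({"concernentity": {"concerncode": "CN1", "name": "A"}})})
body = extract_body(row, "concern_root")
es = FakeES()
print(upsert_body(es, "concernentity", "CN1", body))  # indexed
print(upsert_body(es, "concernentity", "CN1", body))  # updated
```

Because the Elasticsearch index API with an explicit id already creates or replaces a document, the exists check is only needed when the partial-merge behaviour of update is wanted.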

ElasticSearch Index & pipeline details:

  • For the customerentity index, an ingest pipeline in Elasticsearch sets the _id field to the customer code value.
    - Pipeline creation: create the pipeline in Kibana Dev Tools as follows:
PUT _ingest/pipeline/idToCustcode
{
"processors": [
{
"set": {
"description": "Set _id field to customer_code value",
"field": "_id",
"value": "{{{_source.customerentity.customerdetails.customercoreinformation.customercode}}}"
}
}
]
}

    - The pipeline name is idToCustcode. Run it through a reindex in Kibana Dev Tools as follows:
POST _reindex?slices=20&refresh
{
"source": {
"index": "customerentity_stg"
},
"dest": {
"index": "customerentity",
"pipeline": "idToCustcode"
}
}

source names the staging index; dest names the destination index and the ingest pipeline to apply.

  • For the concernentity index, an ingest pipeline in Elasticsearch sets the _id field to the concern code value.
    - Pipeline creation: create the pipeline in Kibana Dev Tools as follows:
PUT _ingest/pipeline/idToConcernCode
{
"processors": [
{
"set": {
"description": "Set _id field to concern_code value",
"field": "_id",
"value": "{{{_source.concernentity.concerncode}}}"
}
}
]
}

    - The pipeline name is idToConcernCode. Run it through a reindex in Kibana Dev Tools as follows:
POST _reindex?slices=20&refresh
{
"source": {
"index": "concernentity_stg"
},
"dest": {
"index": "concernentity",
"pipeline": "idToConcernCode"
}
}

source names the staging index; dest names the destination index and the ingest pipeline to apply.
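The set processors read the code from a dotted path under _source. The keys are all lower case because Postgres folds the unquoted aliases in the JSON queries (for example CustomerEntity, customerCode) to lower case. The Python sketch below resolves the same paths, which can be used to check a staged document before reindexing. resolve_path is a hypothetical helper and the sample documents are made up.

```python
from typing import Any

# Paths used by the idToCustcode and idToConcernCode pipelines, without the _source prefix
CUSTOMER_ID_PATH = "customerentity.customerdetails.customercoreinformation.customercode"
CONCERN_ID_PATH = "concernentity.concerncode"

def resolve_path(source: dict, dotted: str) -> Any:
    """Walk a dotted path through nested dicts; raise KeyError if a segment is missing."""
    value: Any = source
    for key in dotted.split("."):
        value = value[key]
    return value

customer_doc = {"customerentity": {"customerdetails": {"customercoreinformation": {"customercode": "CN50136572"}}}}
concern_doc = {"concernentity": {"concerncode": "CN00000001"}}
print(resolve_path(customer_doc, CUSTOMER_ID_PATH))  # CN50136572
print(resolve_path(concern_doc, CONCERN_ID_PATH))  # CN00000001
```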

Trigger on CONTACT_INFORMATION table (Contact Entity)

CREATE TRIGGER after_upsert_cont AFTER UPDATE or INSERT ON mdm_smds.contact_information FOR EACH ROW EXECUTE PROCEDURE mdm_smds.notify_cont_upserts();

Below is the notify_cont_upserts() function

CREATE OR REPLACE FUNCTION mdm_smds.notify_cont_upserts() RETURNS trigger AS $BODY$
declare
    vId TEXT;
begin
    -- Publish the contact code for every insert and update
    if tg_op in ('INSERT', 'UPDATE') then
        vId := NEW.CONTACT_CMD_CODE;
        perform pg_notify('cont_upserts', vId);
    end if;
    -- The return value of an AFTER ... FOR EACH ROW trigger is ignored
    return NEW;
end
$BODY$ LANGUAGE plpgsql;

Query to convert Contact to JSON

Refer to Query3 in the file JSON_conversion_queries.sql.

Databricks notebook code:

  1. Contact bulk data load into the Elasticsearch index: refer to Prgm3 in the file ES_dataload_queries.py.
  2. Contact real-time data sync between the cloud DB and the Elasticsearch index:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Sync data from the write DB (Postgres) to Elasticsearch in real time

from elasticsearch import Elasticsearch, helpers
import psycopg2
import json
import select
import datetime

# Connection settings. Passwords are read from a Databricks secret scope instead of being
# hard-coded; "<scope>", "<pg-password-key>" and "<es-password-key>" are placeholders.
PG_HOST = "cmd-postgres-dev01.postgres.database.azure.com"
PG_DB = "cmd_pg_dev01"
PG_USER = "admin_pg_dev01@cmd-postgres-dev01"
PG_PASSWORD = dbutils.secrets.get(scope="<scope>", key="<pg-password-key>")
ES_PASSWORD = dbutils.secrets.get(scope="<scope>", key="<es-password-key>")
JDBC_URL = f"jdbc:postgresql://{PG_HOST}:5432/{PG_DB}?sslmode=require"

def get_df(cont_code):
    print("query execution started")
    print(datetime.datetime.now())
    # cont_code comes from the notification payload; double any quotes before
    # concatenating it into the SQL text
    safe_code = cont_code.replace("'", "''")
    query = """( select row_to_json(CustomerContact) Contact_Root from( select row_to_json(Cont_Details) as ContactEntity from ( select row_to_json(cont) as CustomerContact from (select CONTACT_CMD_CODE contactCode, ( select array_to_json(array_agg(row_to_json(Cust_Ext_Identifiers))) from ( SELECT REL.CUSTOMER_CMD_CODE as customerCode, ( select array_to_json(array_agg(row_to_json(CUST_EXT_ID))) from ( select SOURCE_SYSTEM_NAME AS externalSystemName, EXTERNAL_SYSTEM_IDENTIFIER as externalSystemReference, case when EXT_ID.IS_DELETED = 'Y' THEN true else false end as deleteFlag FROM mdm_smds.CUSTOMER_EXTERNAL_IDENTIFIERS EXT_ID JOIN mdm_smds.ref_alternate_codes ALT_CODE ON EXT_ID.EXTERNAL_SOURCE_SYSTEM = ALT_CODE.CODE WHERE REL.CUSTOMER_CMD_CODE=EXT_ID.CUSTOMER_CMD_CODE order by EXTERNAL_SOURCE_SYSTEM asc ) CUST_EXT_ID ) as CustomerExternalSystemIdentifiers FROM mdm_smds.CONTACT_CUSTOMER_RELATIONSHIP REL where REL.rel_type_code = 'CUST_CONT' and REL.CONTACT_CMD_CODE=CONT.CONTACT_CMD_CODE ) Cust_Ext_Identifiers ) as CustomerIds, ( select array_to_json(array_agg(row_to_json(Cont_Ext_Identifiers))) from ( select SOURCE_SYSTEM_NAME AS externalSystemName, EXTERNAL_SYSTEM_IDENTIFIER as externalSystemReference, case when EXT_ID.IS_DELETED = 'Y' THEN true else false end as deleteFlag FROM mdm_smds.CONTACT_EXTERNAL_IDENTIFIERS EXT_ID JOIN mdm_smds.ref_alternate_codes ALT_CODE ON EXT_ID.EXTERNAL_SOURCE_SYSTEM = ALT_CODE.CODE where CONTACT_CMD_CODE=CONT.CONTACT_CMD_CODE order by EXTERNAL_SOURCE_SYSTEM asc ) Cont_Ext_Identifiers ) as ContactExternalSystemIdentifiers, CONT.FIRST_NAME_STANDARD as firstName, CONT.LAST_NAME_STANDARD as lastName, (select distinct case when (IS_MASTER_CONTACT is null or IS_MASTER_CONTACT='N') then false else true end isMasterContact from 
mdm_smds.CONTACT_CUSTOMER_RELATIONSHIP where REL_TYPE_CODE='CUST_CONT' and CONTACT_CMD_CODE= CONT.CONTACT_CMD_CODE limit 1 ) as isMasterContact, CONT.FIRST_NAME_LOCAL_LANGUAGE as internationalFirstName, CONT.LAST_NAME_LOCAL_LANGUAGE as internationalLastName, CONT.PRIMARY_EMAIL_ADDRESS as primaryEmailId, CONT.SECONDARY_EMAIL_ADDRESS as secondaryEmailId, CONT.CONTACT_STATUS as contactStatusCode, CONT.SALUTATION_LOCAL_LANGUAGE as internationalSalutationCode, CONT.SALUTATION_STANDARD as primarySalutationCode, CONT.TEAM_CONTACT_INDICATOR as isTeamContact, CONT.JOB_TITLE as contactJobTitle, CONT.DEPARTMENT as contactDepartment, 'CONT_CONT' as contactRole, CONT.LANGUAGE_PREFERENCE as languagePreference, ( select array_to_json(array_agg(row_to_json(teleComm))) from ( select 'TELE' AS communicationNumberType, TELEPHONE_ISD_COUNTRY_CODE as isoCountryCode, TELEPHONE_DIALING_CODE as internationalDialingCode, EXTENSION_NUMBER as extensionNumber, TELEPHONE_NUMBER as number FROM mdm_smds.CONTACT_INFORMATION CONTTELINFO where CONTACT_CMD_CODE=CONT.CONTACT_CMD_CODE UNION ALL select 'MOB' AS communicationNumberType, MOBILE_ISD_COUNTRY_CODE as isoCountryCode, MOBILE_DIALING_CODE as internationalDialingCode, null as extensionNumber, MOBILE_NUMBER as number FROM mdm_smds.CONTACT_INFORMATION CONTTELINFO where CONTACT_CMD_CODE=CONT.CONTACT_CMD_CODE UNION ALL select 'FAX' AS communicationNumberType, FAX_ISD_COUNTRY_CODE as isoCountryCode, FAX_DIALING_CODE as internationalDialingCode, null as extensionNumber, FAX_NUMBER as number FROM mdm_smds.CONTACT_INFORMATION CONTTELINFO where CONTACT_CMD_CODE=CONT.CONTACT_CMD_CODE ) teleComm ) as CommunicationNumbers, ( select array_to_json(array_agg(row_to_json(Cont_Brand))) from ( select distinct RB.Contact_CLASS_NAME as brandName, RB.Contact_CLASS_CODE as brandCode, case when CONTBRAND.IS_DELETED = 'Y' THEN true else false end as deleteFlag FROM mdm_smds.CONTACT_CLASSIFICATION AS CONTBRAND JOIN mdm_smds.Ref_Contact_Class_Type AS RB ON 
CONTBRAND.CLASSIFICATION_TYPE = RB.Contact_CLASS_TYPE where CONTBRAND.CONTACT_CMD_CODE=CONT.CONTACT_CMD_CODE and CONTBRAND.CLASSIFICATION_TYPE='CONTACT_BRAND' ) Cont_Brand ) as ContactBrands, ( select array_to_json(array_agg(row_to_json(Cont_Type))) from ( select distinct RB.Contact_CLASS_NAME as typeName, RB.Contact_CLASS_CODE as typeCode, case when CONTTYPE.IS_DELETED = 'Y' THEN true else false end as deleteFlag FROM mdm_smds.CONTACT_CLASSIFICATION AS CONTTYPE JOIN mdm_smds.Ref_Contact_Class_Type AS RB ON CONTTYPE.CLASSIFICATION_TYPE = RB.Contact_CLASS_TYPE where CONTTYPE.CONTACT_CMD_CODE=CONT.CONTACT_CMD_CODE and CONTTYPE.CLASSIFICATION_TYPE='CONTACT_TYPE' ) Cont_Type ) as ContactType, ( select array_to_json(array_agg(row_to_json(contDocPref))) from ( select DOCPREF.CUSTOMER_CMD_CODE as customerCode,DOCPREF.BRAND_CODE as brandCode,DOCPREF.DOCUMENT_TYPE as documentType, (select array_to_json(array_agg(row_to_json(conCommPref))) from ( select COMPREF.COMMUNICATION_PREF_TYPE as preferenceType, COMPREF.OTH_COMM_DTL as preferenceValue, case when COMPREF.COMM_PREF_DOC_TYP_STAT = 'ACTIVE' THEN true else false end as deleteFlag FROM mdm_smds.CONTACT_DOC_TYPE_AND_COMM_PREF COMPREF where COMPREF.customer_cmd_code = DOCPREF.CUSTOMER_CMD_CODE and DOCPREF.BRAND_CODE = COMPREF.BRAND_CODE and DOCPREF.DOCUMENT_TYPE = COMPREF.DOCUMENT_TYPE ) conCommPref) as CommunicationPreferences from mdm_smds.CONTACT_DOC_TYPE_AND_COMM_PREF DOCPREF where DOCPREF.CONTACT_CMD_CODE=CONT.CONTACT_CMD_CODE group by DOCPREF.CUSTOMER_CMD_CODE,DOCPREF.BRAND_CODE,DOCPREF.DOCUMENT_TYPE ) as contDocPref ) as DocumentPreferences, ( select array_to_json(array_agg(row_to_json(Cont_Rel))) from ( select distinct CONTREL.CUSTOMER_CMD_CODE as customerCode, CONTREL.IS_MASTER_CONTACT as isMasterContact, case when CONTREL.IS_DELETED = 'Y' THEN true else false end as deleteFlag FROM mdm_smds.CONTACT_CUSTOMER_RELATIONSHIP AS CONTREL where CONTREL.CONTACT_CMD_CODE=CONT.CONTACT_CMD_CODE and 
CONTREL.REL_TYPE_CODE='ON_BEHALF_OF' ) Cont_Rel ) as OnBehalfOfRelationships,to_json((SELECT d FROM (SELECT CONT.CREATE_USER as creationUser, CONT.CREATE_TIME as creationDate, CONT.UPDATE_USER as lastUpdateUser, CONT.UPDATE_TIME as lastUpdateDate, CONT.SOURCE_OF_LAST_UPDATE as lastUpdateSourceSystem) d)) AS contactAuditData from mdm_smds.CONTACT_INFORMATION CONT where CONT.CONTACT_CMD_CODE ='""" + safe_code + """') as cont )as Cont_Details ) as CustomerContact)"""
    #print(query)
    # Run the query through Spark JDBC; credentials are passed as options, not in the URL
    df = (spark.read.format("jdbc")
          .option("driver", "org.postgresql.Driver")
          .option("url", JDBC_URL)
          .option("user", PG_USER)
          .option("password", PG_PASSWORD)
          .option("query", query)
          .load())
    df.createOrReplaceTempView("CUST_JSON")
    print("query execution completed")
    print(datetime.datetime.now())
    return df

#get_df("DK00069117")

# Fetch the generated JSON for a code (via get_df) and create or update the matching
# Elasticsearch document
def upsert(index_name, primary_key, es):
    present = es.exists(index=index_name, id=primary_key)
    df = get_df(primary_key)
    rows = df.toJSON().collect()  # one JSON string per Spark row
    if not rows:
        print(f"no row found for {primary_key}")
        return
    line = json.loads(rows[0])
    body = json.loads(line['contact_root'])  # the root column holds the document as a JSON string
    #print(body)
    if not present:
        es.index(index=index_name, document=body, id=primary_key)
    else:
        print("update started")
        print(datetime.datetime.now())
        es.update(index=index_name, id=primary_key, doc=body)
        print("update successful")
        print(datetime.datetime.now())

# declare connection to database
connection = psycopg2.connect(dbname=PG_DB, user=PG_USER, host=PG_HOST, port="5432", password=PG_PASSWORD)
print("PG connection successful")

# Elasticsearch connection
es = Elasticsearch(
    "https://20.4.1.212:9200",
    ca_certs='dbfs:/FileStore/shared_uploads/harshit.c@maersk.com/http_ca.crt',
    basic_auth=('elastic', ES_PASSWORD),
    #use_ssl=True,
    verify_certs=False,
    ssl_show_warn=False
)

# Listen to the channel in autocommit mode, so LISTEN takes effect immediately and
# notifications are not held back by an open transaction
connection.set_isolation_level(0)  # 0 = ISOLATION_LEVEL_AUTOCOMMIT
cur = connection.cursor()
cur.execute("LISTEN cont_upserts;")
print("listen time")
print(datetime.datetime.now())
while True:
    select.select([connection], [], [])  # sleep until there is some data
    connection.poll()  # move pending notifications into connection.notifies
    while connection.notifies:
        notification = connection.notifies.pop(0)  # oldest notification first
        payload = notification.payload
        print(f"channel: {notification.channel}")
        print(f"message: {payload}")
        upsert("contactentity", payload, es)
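In the contact query, CommunicationNumbers is built by a UNION ALL of three selects over the same CONTACT_INFORMATION row: telephone (TELE), mobile (MOB) and fax (FAX), with extensionNumber populated only for the telephone entry. The sketch below performs the same reshaping in Python, which can help compare a source row with the indexed document. The input keys mirror the column names in lower case, the function name is hypothetical, and the sample values are made up. Like the SQL, it emits all three entries even when a number is NULL.

```python
from typing import Optional

def communication_numbers(row: dict) -> list:
    """Reshape one CONTACT_INFORMATION row into the CommunicationNumbers array.

    Output keys are lower case, as Postgres folds the unquoted aliases in the query.
    """
    def entry(kind: str, prefix: str, extension: Optional[str]) -> dict:
        return {
            "communicationnumbertype": kind,
            "isocountrycode": row.get(f"{prefix}_isd_country_code"),
            "internationaldialingcode": row.get(f"{prefix}_dialing_code"),
            "extensionnumber": extension,
            "number": row.get(f"{prefix}_number"),
        }

    return [
        entry("TELE", "telephone", row.get("extension_number")),  # only TELE carries an extension
        entry("MOB", "mobile", None),
        entry("FAX", "fax", None),
    ]

sample = {
    "telephone_isd_country_code": "DK", "telephone_dialing_code": "+45",
    "telephone_number": "11111111", "extension_number": "12",
    "mobile_isd_country_code": "DK", "mobile_dialing_code": "+45", "mobile_number": "22222222",
    "fax_isd_country_code": None, "fax_dialing_code": None, "fax_number": None,
}
print([e["communicationnumbertype"] for e in communication_numbers(sample)])  # ['TELE', 'MOB', 'FAX']
```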

ElasticSearch Index & pipeline details:

  • For the contactentity index, an ingest pipeline in Elasticsearch sets the _id field to the contact code value.
    - Pipeline creation: create the pipeline in Kibana Dev Tools as follows:
PUT _ingest/pipeline/idToContactCode
{
"processors": [
{
"set": {
"description": "Set _id field to contact_code value",
"field": "_id",
"value": "{{{_source.contactentity.customercontact.contactcode}}}"
}
}
]
}

    - The pipeline name is idToContactCode. Run it through a reindex in Kibana Dev Tools as follows:
POST _reindex?slices=20&refresh
{
"source": {
"index": "contactentity_stg"
},
"dest": {
"index": "contactentity",
"pipeline": "idToContactCode"
}
}

source will contain the stage index and dest will be having the destination index with the pipeline name.
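Because the set processor silently depends on the exact nested path in the staging documents, it can help to check that path against a sample document before running the reindex. The Python sketch below only mimics the {{{_source...}}} lookup locally (the real template rendering happens inside Elasticsearch) and builds the same body as the Dev Tools request above; resolve_template_path(), reindex_body() and the sample contact code are hypothetical:

```python
import re
from typing import Any, Dict

# Matches templates of the form {{{_source.a.b.c}}}
_TEMPLATE = re.compile(r"\{\{\{_source\.([\w.]+)\}\}\}")


def resolve_template_path(template: str, source: Dict[str, Any]) -> str:
    """Local stand-in for the set processor's template lookup (sketch only).

    Unlike Mustache, which renders a missing field as an empty string, this
    raises KeyError so that a wrong path is caught before the reindex.
    """
    match = _TEMPLATE.fullmatch(template)
    if match is None:
        raise ValueError(f"unsupported template: {template}")
    value: Any = source
    for key in match.group(1).split("."):
        value = value[key]
    return str(value)


def reindex_body(stage_index: str, dest_index: str, pipeline: str) -> Dict[str, Any]:
    """Body for POST _reindex: copy stage_index into dest_index through pipeline."""
    return {
        "source": {"index": stage_index},
        "dest": {"index": dest_index, "pipeline": pipeline},
    }
```

For example, resolve_template_path("{{{_source.contactentity.customercontact.contactcode}}}", doc) returns the contact code that will become _id, while a mistyped path raises KeyError instead of producing documents with an empty _id value.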

Trigger on FACILITY_INFORMATION table (Customer Facility Entity)

Below is the notify_custFact_upserts() function

Query to convert CustomerFacility to JSON

Refer to Query4 in the file JSON_conversion_queries.sql.
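The query returns a single row with one column, custfact_root, that holds the whole facility document as JSON text. Postgres folds unquoted aliases such as CustFact_Root and customerFacilityCode to lower case, so every key in the resulting document is lower case, which matches the lower-case path used by the idToFactCode pipeline further down. A minimal sketch of decoding such a row into the Elasticsearch document body and its _id; facility_body_and_id() is hypothetical and the sample row is illustrative (only two fields filled in):

```python
import json
from typing import Any, Dict, Tuple


def facility_body_and_id(row: Dict[str, str]) -> Tuple[Dict[str, Any], str]:
    """Decode the custfact_root JSON text into (document body, facility code).

    A plain dict stands in for the Spark Row; Row objects support the same
    row["custfact_root"] lookup.
    """
    body = json.loads(row["custfact_root"])
    code = body["customerfacilityentity"]["customerfacilitycode"]
    return body, code


# Illustrative row: only two of the document's fields are filled in
sample_row = {
    "custfact_root": json.dumps(
        {
            "customerfacilityentity": {
                "customerfacilitycode": "FR00185989",
                "facilityinformation": {"customerfacilityname": "Sample facility"},
            }
        }
    )
}
body, code = facility_body_and_id(sample_row)
```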

Databricks notebook code:

  1. Facility bulk data load into the Elasticsearch index: refer to Prgm4 in the file ES_dataload_queries.py.
  2. Facility real-time data sync between the cloud DB (Postgres) and the Elasticsearch index:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Syncs data from the write DB (Postgres) to Elasticsearch in real time

import datetime
import json
import os
import select

import psycopg2
from elasticsearch import Elasticsearch


def get_df(fact_code):
    print("query execution started")
    print(datetime.datetime.now())
    # fact_code arrives in the NOTIFY payload; doubling single quotes keeps it
    # a valid SQL string literal
    safe_code = fact_code.replace("'", "''")
    query = f"""
select row_to_json(CustFactEnty) CustFact_Root from (
  select row_to_json(CustFactDetail) CustomerFacilityEntity from (
    select FACILITY_CMD_CODE customerFacilityCode,
    (
      select row_to_json(Fact_Info) from (
        select CUSTFC.FACILITY_NAME as customerFacilityName,
          case when CUSTFC.EXT_OWNED = 'Y' THEN true else false end as isExternalOwned,
          CUSTFC.CLASSIFICATION_CODE as classificationCode,
          CUSTFC.STATUS_CODE as statusCode,
          case when CUSTFC.EXT_EXPOSED = 'Y' THEN true else false end as isExternalExposed,
          CUSTFC.DODAAC as DoDAAC,
          CUSTFC.url,
          case when CUSTFC.IS_PENDING = 'Y' THEN true else false end as isWorkflowPending,
          case when CUSTFC.IS_DELETED = 'Y' THEN true else false end as isDeletedFlag
        from mdm_smds.FACILITY_INFORMATION CUSTFC
        where CUSTFC.FACILITY_CMD_CODE = CUSTFACT.FACILITY_CMD_CODE
      ) Fact_Info
    ) as FacilityInformation,
    (
      select array_to_json(array_agg(row_to_json(Ext_Identifiers))) from (
        select SOURCE_SYSTEM_NAME AS externalSystemName,
          EXTERNAL_SYSTEM_IDENTIFIER as externalSystemReference,
          case when EXT_ID.IS_DELETED = 'Y' THEN true else false end as isDeletedFlag
        FROM mdm_smds.FACILITY_EXTERNAL_IDENTIFIERS EXT_ID
        JOIN mdm_smds.ref_alternate_codes ALT_CODE ON EXT_ID.EXTERNAL_SOURCE_SYSTEM = ALT_CODE.CODE
        where FACILITY_CMD_CODE = CUSTFACT.FACILITY_CMD_CODE
        order by EXTERNAL_SOURCE_SYSTEM asc
      ) Ext_Identifiers
    ) as CustomerFacilityExtIdentifiers,
    (
      select row_to_json(Fact_Address) from (
        select FCTADD.POBOX as poBoxNumber,
          FCTADD.STREET_NO as streetNumber,
          FCTADD.ADDRESS_LINE1 as streetName,
          FCTADD.ADDRESS_LINE2 as apartmentOrFloor,
          FCTADD.ADDRESS_LINE3 as subAreaName,
          FCTADD.DISTRICT as district,
          to_json((SELECT d FROM (SELECT FCTADD.CITY_NAME as cityName, FCTADD.CITY_CODE as cityCode, FCTADD.CITY_ID as cityId) d)) AS city,
          to_json((SELECT d FROM (SELECT FCTADD.STATE_NAME as regionName, FCTADD.STATE_CODE as regionCode, FCTADD.STATE_ID as regionId) d)) AS region,
          FCTADD.POSTAL_CODE as postalCode,
          to_json((SELECT d FROM (SELECT FCTADD.COUNTRY_CODE as isoCountryCode, FCTADD.COUNTRY_NAME as countryName) d)) AS country,
          FCTADD.LATITUDE as latitude,
          FCTADD.LONGITUDE as longitude,
          FCTADD.ADDRESS_ACCURACY_LEVEL as addressAccuracyLevel,
          FCTADD.ADDRESS_QUALITY_CONFIRMED_BY as addressQualityConfirmedBy
        from mdm_smds.FACILITY_ADDRESS FCTADD
        where FCTADD.FACILITY_CMD_CODE = CUSTFACT.FACILITY_CMD_CODE
      ) Fact_Address
    ) as CustomerFacilityAddress,
    (
      select array_to_json(array_agg(row_to_json(Cust_Fact_Rel))) from (
        select REL.CUSTOMER_CMD_CODE as customerCode,
          REL.FACILITY_CMD_CODE as customerFacilityCode,
          REL.REL_TYPE_CODE as relationshipType,
          rel_type.REL_TYPE_NAME as relationshipTypeName,
          REL.VALID_FROM_DATE as validFromDate,
          REL.VALID_THRU_DATE as validThroughDate,
          case when REL.IS_DELETED = 'Y' THEN true else false end as isDeletedFlag
        FROM mdm_smds.FACILITY_CUSTOMER_REL REL
        join mdm_smds.Ref_Relationship_Types rel_type on rel.rel_type_code = rel_type.rel_type_code
        where REL.FACILITY_CMD_CODE = CUSTFACT.FACILITY_CMD_CODE
        order by REL.REL_TYPE_CODE asc
      ) Cust_Fact_Rel
    ) as CustomerFacilityRelationships,
    to_json((SELECT d FROM (SELECT CUSTFACT.CREATE_USER as creationUser, CUSTFACT.CREATE_TIME as creationDate, CUSTFACT.UPDATE_USER as lastUpdateUser, CUSTFACT.UPDATE_TIME as lastUpdateDate, CUSTFACT.SOURCE_OF_LAST_UPDATE as lastUpdateSourceSystem) d)) AS CustomerFacilityAuditData
    from mdm_smds.FACILITY_INFORMATION CUSTFACT
    where CUSTFACT.FACILITY_CMD_CODE = '{safe_code}'
  ) as CustFactDetail
) as CustFactEnty
"""
    # Run the query through the Spark JDBC reader (spark is the Databricks session).
    # Credentials are passed as options rather than inside the JDBC URL;
    # PG_PASSWORD and ES_PASSWORD are assumed environment variable names.
    df = (
        spark.read.format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option(
            "url",
            "jdbc:postgresql://cmd-postgres-dev01.postgres.database.azure.com:5432/cmd_pg_dev01?sslmode=require",
        )
        .option("user", "admin_pg_dev01@cmd-postgres-dev01")
        .option("password", os.environ["PG_PASSWORD"])
        .option("query", query)
        .load()
    )
    df.createOrReplaceTempView("FACT_JSON")
    print("query execution completed")
    print(datetime.datetime.now())
    return df


# get_df("FR00185989")


# Take the generated JSON from the query result and write it to Elasticsearch:
# index a new document, or update the existing one.
def upsert(index_name, primary_key, es):
    present = es.exists(index=index_name, id=primary_key)
    row = get_df(primary_key).first()
    if row is None:
        return  # no FACILITY_INFORMATION row for this code
    # Postgres folds the unquoted alias CustFact_Root to lower case
    body = json.loads(row["custfact_root"])

    if not present:
        es.index(index=index_name, document=body, id=primary_key)
    else:
        print("update started")
        print(datetime.datetime.now())
        es.update(index=index_name, id=primary_key, doc=body)
        print("update successful")
        print(datetime.datetime.now())


# Declare the connection to the database
connection = psycopg2.connect(
    dbname="cmd_pg_dev01",
    user="admin_pg_dev01@cmd-postgres-dev01",
    host="cmd-postgres-dev01.postgres.database.azure.com",
    port="5432",
    password=os.environ["PG_PASSWORD"],
)
print("PG connection successful")

# Elasticsearch connection
es = Elasticsearch(
    "https://20.4.1.212:9200",
    ca_certs="dbfs:/FileStore/shared_uploads/harshit.c@maersk.com/http_ca.crt",
    basic_auth=("elastic", os.environ["ES_PASSWORD"]),
    verify_certs=False,  # TLS certificate verification is disabled
    ssl_show_warn=False,
)

# Listen to the channel. LISTEN only takes effect when its transaction commits,
# so the connection is switched to autocommit first.
connection.autocommit = True
cur = connection.cursor()
cur.execute("LISTEN cust_fct_upserts;")
print("listen time")
print(datetime.datetime.now())

while True:
    # Sleep until the connection socket has data
    select.select([connection], [], [])
    # Read the pending messages into connection.notifies
    connection.poll()
    while connection.notifies:
        notification = connection.notifies.pop(0)  # oldest notification first
        payload = notification.payload  # facility code pushed by the trigger
        print(f"channel: {notification.channel}")
        print(f"message: {payload}")
        upsert("facilityentity", payload, es)
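The bulk data load in item 1 (Prgm4 in ES_dataload_queries.py) is not reproduced here. As a hedged illustration only: elasticsearch.helpers.bulk accepts an iterable of action dictionaries, and the sketch below builds such actions from facility JSON documents. It assumes the bulk load writes into the staging index facilityentity_stg that the reindex step below reads from; bulk_actions() and facility_json_rows are hypothetical names:

```python
import json
from typing import Any, Dict, Iterable, Iterator


def bulk_actions(
    json_docs: Iterable[str], index_name: str = "facilityentity_stg"
) -> Iterator[Dict[str, Any]]:
    """Yield one index action per facility JSON document.

    No _id is given, so Elasticsearch generates one; the idToFactCode pipeline
    later sets _id to the facility code during the reindex into facilityentity.
    """
    for text in json_docs:
        yield {"_index": index_name, "_source": json.loads(text)}


# With a live cluster (sketch):
#     from elasticsearch import helpers
#     helpers.bulk(es, bulk_actions(facility_json_rows))
```

Using a generator keeps memory flat: helpers.bulk consumes the actions lazily and sends them in chunks, so the full result set never has to be held as decoded documents at once.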

ElasticSearch Index & pipeline details:

  • For the facilityentity index, an ingest pipeline is created in Elasticsearch to set the _id field to the customer facility code value.
    - Pipeline creation: create the pipeline in Kibana Dev Tools as follows:
PUT _ingest/pipeline/idToFactCode
{
  "processors": [
    {
      "set": {
        "description": "Set _id field to customer facility code value",
        "field": "_id",
        "value": "{{{_source.customerfacilityentity.customerfacilitycode}}}"
      }
    }
  ]
}

    - The pipeline is named idToFactCode. Run it as part of a reindex from the staging index in Kibana Dev Tools:
POST _reindex?slices=20&refresh
{
  "source": {
    "index": "facilityentity_stg"
  },
  "dest": {
    "index": "facilityentity",
    "pipeline": "idToFactCode"
  }
}

The source object names the staging index to copy from; the dest object names the destination index and the ingest pipeline applied to each document.

M&A Data Migration/Integration

SQL Queries for IDL from OnPrem to Cloud

Customer Queries:

Contact Queries:

Customer Facility Queries:

Data Model and Data Migration to Higher Environments

Below are the high-level steps to migrate the following components.

  1. Core tables (Customer, Contact, Concern, Customer Facility) - 24 tables
  2. Reference tables (29 tables)
  3. Database sequences

Step1: Execute the REF_Tables&InsertScripts.sql file to create the schemas and reference tables and to insert reference data, except for 3 tables (Ref_Collection_Business_Units, GENERATED_BE_CODE, DAMCO_CMD_DUP_CUST_MAP). Data for these 3 tables is loaded by the ADF data pipelines in Step6.

Step2: Execute the CMD_DDLScripts_CustomerTables.sql file to create all customer tables.

Step3: Execute the CMD_DDLScripts_ContactTables.sql file to create all contact tables.

Step4: Execute the CMD_DDLScripts_CustomerFacility.sql file to create all customer facility tables.

Step5: Execute the CMD_DDLScripts_Sequences.sql file to create all sequences.
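Steps 1 to 5 run SQL files in a fixed order. One way to script them, sketched under the assumption that the files are run with the psql command-line client (the document does not say which client is used), is to stop as soon as one script fails so that later DDL never runs against a half-created schema. DDL_SCRIPTS, psql_command(), run_psql() and run_in_order() are hypothetical names:

```python
import subprocess
from typing import Callable, List, Sequence

# Script files from Steps 1 to 5, in execution order
DDL_SCRIPTS = [
    "REF_Tables&InsertScripts.sql",
    "CMD_DDLScripts_CustomerTables.sql",
    "CMD_DDLScripts_ContactTables.sql",
    "CMD_DDLScripts_CustomerFacility.sql",
    "CMD_DDLScripts_Sequences.sql",
]


def psql_command(script: str) -> List[str]:
    """psql call that stops at the first SQL error and exits non-zero.

    Connection details come from the standard PGHOST, PGPORT, PGDATABASE,
    PGUSER and PGPASSWORD environment variables.
    """
    return ["psql", "-v", "ON_ERROR_STOP=1", "-f", script]


def run_psql(cmd: List[str]) -> int:
    """Run one psql command and return its exit code."""
    return subprocess.run(cmd, check=False).returncode


def run_in_order(
    scripts: Sequence[str], run: Callable[[List[str]], int] = run_psql
) -> List[str]:
    """Run the scripts one by one; stop at the first failure and return those that succeeded."""
    done: List[str] = []
    for script in scripts:
        if run(psql_command(script)) != 0:
            break
        done.append(script)
    return done
```

The runner is injectable so the ordering logic can be checked without a database; against a real environment, run_in_order(DDL_SCRIPTS) uses psql directly.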

Step6: Run the ADF pipelines to load the data in the following order:

  1. Ref_Collection_Business_Units
  2. GENERATED_BE_CODE
  3. DAMCO_CMD_DUP_CUST_MAP
  4. Customer Data
  5. Concern Data
  6. Contact Data
  7. Customer Facility Data
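Because each load in Step6 should only start once the previous one has finished, a driver script has to wait for every pipeline run to reach a terminal state. A minimal sketch, assuming two injected callables: start_run(name) returning a run ID and get_status(run_id) returning an ADF run status string. With the azure-mgmt-datafactory package these could wrap DataFactoryManagementClient.pipelines.create_run(...).run_id and pipeline_runs.get(...).status, but the real pipeline names are not given here, so LOAD_ORDER holds labels only:

```python
import time
from typing import Callable, List, Sequence

# Load order from Step6; labels only, not the actual ADF pipeline names
LOAD_ORDER = [
    "Ref_Collection_Business_Units",
    "GENERATED_BE_CODE",
    "DAMCO_CMD_DUP_CUST_MAP",
    "Customer Data",
    "Concern Data",
    "Contact Data",
    "Customer Facility Data",
]

# ADF run statuses after which a run will not change any more
TERMINAL = {"Succeeded", "Failed", "Cancelled"}


def run_pipelines_in_order(
    pipelines: Sequence[str],
    start_run: Callable[[str], str],
    get_status: Callable[[str], str],
    poll_seconds: float = 30.0,
) -> List[str]:
    """Start each pipeline only after the previous run has Succeeded.

    Returns the pipelines that completed successfully; stops at the first run
    that ends in any other terminal state.
    """
    succeeded: List[str] = []
    for name in pipelines:
        run_id = start_run(name)
        status = get_status(run_id)
        while status not in TERMINAL:  # e.g. Queued, InProgress, Canceling
            time.sleep(poll_seconds)
            status = get_status(run_id)
        if status != "Succeeded":
            break
        succeeded.append(name)
    return succeeded
```

Stopping at the first failure matters here because later loads (for example Customer Data) are listed after the reference loads they run on top of.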