Customer Status Overview
The Customer Status Overview Report is designed to identify inconsistencies or anomalies in customer records across multiple systems, ensuring data integrity and synchronization. This report helps monitor discrepancies between different sources of customer information and flags records that do not align, enabling proactive resolution and better data management across the entire customer lifecycle.
Data Sources:
- NFTP: abfss://nftpapp@dlsazewpdatalakelob.dfs.core.windows.net
- CW1: abfss://tmff@dlsazewpdatalakecleansed.dfs.core.windows.net
- CMD: /dlsazewpdatalakelob/cmd/Replica/
- SAP MDG: storage_account_name='syx1t0b100sgtilmpwezagts', container_name="nftp-bods-prod"
- S4 MDG: storage_account_name='syx1t0b100sgtilmpwezagts', container_name="nftp-bods-prod"
- DMDM, SCP MDM: Data is present in blob storage.
- TMFF: Create the team folder and share it with Summer Xia (the TMFF point of contact) so that the data can be sent.
- Raise a request using oto-mate for maestro path
Report Pages:
- Customer Status Overview: Identifies inconsistencies or anomalies in customer records across systems like DMDM, SCP MDM, S4 MDG, S4 HANA, SAP MDG, and CargoWise.
- CW1 Summary: Highlights customer status discrepancies between CMD and CW1 by region and status, also showing the overall percentage of matching and mismatched records.
- CW1 Volume: Provides transactional data for the last 12 months from TMFF and CW1, representing LCL and FCL volumes in CBM, and air volumes in kilograms (KG).
- Mismatch Snapshot: A daily snapshot tracking progress in resolving customer data exceptions, monitoring outstanding issues, and measuring improvements over time.
Detailed Data Source and Filter Conditions:
| System | Metric | Status | Filters | Lookup | Point of Contact | Data Refresh Scheduled |
|---|---|---|---|---|---|---|
| TMFF | Total Distinct Customer Codes | Active when isblock = 0 and isactive = 1; otherwise Restricted | Filters global records (ownerid="*") | Fact code | Summer Xia | Daily at 12:30 PM IST |
| SCP MDM | Total Distinct Fact Codes Count | Active, Inactive | N/A | Fact code | Mahendra Parihar | Daily at 12:30 PM IST |
| SAP MDG | Total Distinct Fact Codes (KUNNR) Count | IF AUFSD = '01' THEN "Inactive", else "Active" | N/A | KUNNR | Khushbu Pandey | Daily at 12:30 PM IST |
| S4 MDG | Total Distinct Fact Codes Count | Active, Inactive, Suspended | N/A | Fact code | Khushbu Pandey | Daily at 12:30 PM IST |
| S4 HANA | Total Distinct Fact Codes Count | Active, Inactive, Suspended | N/A | Fact code | Phani Raj Kallur | Daily at 12:30 PM IST |
| DMDM | Total Distinct Fact Codes Count | Active, Inactive | N/A | Fact code | Mahendra Parihar | Daily at 12:30 PM IST |
| CMD | Total Distinct Customer Codes | Active, Inactive, Suspended | N/A | Customer Code | Gopala Krishnan Rajendran | Daily at 12:30 PM IST |
| Cargo Wise | KFF - Kewill | N/A | N/A | N/A | Bobby Wan/Santosh Birje | Daily at 12:30 PM IST |
| Cargo Wise | Pdel - Place of Delivery | N/A | N/A | N/A | Bobby Wan/Santosh Birje | Daily at 12:30 PM IST |
| Cargo Wise | Por - Place of Receipt | N/A | N/A | N/A | Bobby Wan/Santosh Birje | Daily at 12:30 PM IST |
| Cargo Wise | Total Distinct OKCustomsRegNo Count | Active: OHIsActive = 'true' and (OHIsConsignee = 'true' or OHIsConsignor = 'true'); Suspended: OHIsActive = 'true' and OHIsConsignee = 'false' and OHIsConsignor = 'false'; Inactive: all other records | okcodetype in ('GCR') and deleted = 'false' and OKRnNKCodeCountry = 'XX' | Fact code | Bobby Wan/Santosh Birje | Daily at 12:30 PM IST |
| Cargo Wise | Volume | N/A | Considered last 6 months shipments | N/A | Bobby Wan/Santosh Birje | Daily at 12:30 PM IST |
| Cargo Wise | Volume unit - KGs, CBM | N/A | N/A | N/A | Bobby Wan/Santosh Birje | Daily at 12:30 PM IST |
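The TMFF and SAP MDG status rules in the table can be written as small functions. This is a minimal sketch, not the report's own code: the function names `tmff_status` and `sap_mdg_status` are hypothetical, and comparing the TMFF flags as strings is an assumption made so that 0 and "0" behave alike.

```python
def tmff_status(isblock, isactive):
    """TMFF rule from the table: Active when isblock = 0 and isactive = 1, otherwise Restricted."""
    # Assumption: flags may arrive as ints or strings, so compare their string form.
    if str(isblock) == "0" and str(isactive) == "1":
        return "Active"
    return "Restricted"


def sap_mdg_status(aufsd):
    """SAP MDG rule from the table: AUFSD = '01' is Inactive, anything else is Active."""
    return "Inactive" if aufsd == "01" else "Active"


if __name__ == "__main__":
    print(tmff_status(0, 1))
    print(sap_mdg_status("01"))
```

Keeping each rule in one place like this makes it easier to check that the table and the queries stay in agreement.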
--CW1 Query
-- Maestro Path- dlsazewpdatalakecleansed.dfs.core.windows.net
-- Container -tmff
select
B.OKCustomsRegNo as FACT_CODE,
a.OHCode as orgaCode,
OHIsActive,
OHIsConsignee,
OHIsConsignor,
CASE
WHEN OHIsActive = 'true' AND (OHIsConsignee = 'true' OR OHIsConsignor = 'true') THEN 'Active'
WHEN OHIsActive = 'true' AND (OHIsConsignee = 'false' AND OHIsConsignor = 'false') THEN 'Suspended'
ELSE 'Inactive'
END CW1_STATUS
from delta.`/mnt/tmff_dlsazewpdatalakecleansed/OrgCusCode` b
left join DELTA.`/mnt/tmff_dlsazewpdatalakecleansed/OrgHeader` a on A.OHPK = B.OKOh and a.deleted='false'
where
okcodetype in ('GCR') and b.deleted='false' and OKRnNKCodeCountry='XX'
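The CASE expression in the CW1 query can be mirrored in plain Python to check individual records by hand. This is a sketch under the assumption that the three flags are the strings 'true' / 'false' (or missing); the function name `cw1_status` is hypothetical.

```python
def cw1_status(oh_is_active, oh_is_consignee, oh_is_consignor):
    """Mirror of the CW1_STATUS CASE expression; flags are 'true'/'false' strings or None."""
    if oh_is_active == "true" and (oh_is_consignee == "true" or oh_is_consignor == "true"):
        return "Active"
    if oh_is_active == "true" and oh_is_consignee == "false" and oh_is_consignor == "false":
        return "Suspended"
    # Everything else, including missing flags, falls through to Inactive (the ELSE branch).
    return "Inactive"


if __name__ == "__main__":
    print(cw1_status("true", "false", "true"))
    print(cw1_status("true", "false", "false"))
    print(cw1_status("false", "true", "true"))
```

Note that an organisation only counts as Suspended when it is active and is neither a consignee nor a consignor.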
--DMDM and TMFF Dataset
blobAccount = ''  # test env
blobKey = ''  # test env
Blob_ConnStr = "fs.azure.account.key." + blobAccount + ".blob.core.windows.net"
spark.conf.set(Blob_ConnStr, blobKey)

Blob_container = "mdm"
Blob_schema_file = "dmdm/CutomerData.csv"
Blob_url = "wasbs://" + Blob_container + '@' + blobAccount + ".blob.core.windows.net/"
df = spark.read.option("header", "true").csv(Blob_url + Blob_schema_file)
df.createOrReplaceTempView("PKG_CUST_MINIMAL_SRCH")

Blob_schema_file = "dmdm/EXTENDED_BECODE.csv"
Blob_url = "wasbs://" + Blob_container + '@' + blobAccount + ".blob.core.windows.net/"
df = spark.read.option("header", "true").csv(Blob_url + Blob_schema_file)
df.createOrReplaceTempView("Extended_Customers")

Blob_container = "tmff"
Blob_url = "wasbs://" + Blob_container + '@' + blobAccount + ".blob.core.windows.net/"
df = spark.read.option("header", "true").csv(Blob_url)
dffilter = df.filter(df["ownerid"] == '*').distinct()
dffilter.createOrReplaceTempView("tmff")
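The same blob URL is assembled several times in the cell above, and the TMFF data is restricted to global records. The sketch below shows both steps in plain Python, without Spark. The helper names `wasbs_url` and `global_records` and the account name `exampleaccount` are hypothetical and used only for illustration.

```python
def wasbs_url(container, account, path=""):
    """Build a wasbs:// URL in the same shape as Blob_url + Blob_schema_file."""
    return f"wasbs://{container}@{account}.blob.core.windows.net/{path}"


def global_records(rows):
    """Keep only global TMFF rows (ownerid == '*') and drop exact duplicates.

    rows is a list of dicts with string values; this stands in for
    df.filter(df["ownerid"] == '*').distinct().
    """
    seen = set()
    result = []
    for row in rows:
        key = tuple(sorted(row.items()))
        if row.get("ownerid") == "*" and key not in seen:
            seen.add(key)
            result.append(row)
    return result


if __name__ == "__main__":
    # "exampleaccount" is a placeholder, not a real storage account.
    print(wasbs_url("mdm", "exampleaccount", "dmdm/EXTENDED_BECODE.csv"))
    print(global_records([{"ownerid": "*", "code": "A"}, {"ownerid": "X1", "code": "B"}]))
```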
--SCP MDM DataSet
blobAccount = ''  # test env
blobKey = ''  # test env
Blob_ConnStr = "fs.azure.account.key." + blobAccount + ".blob.core.windows.net"
spark.conf.set(Blob_ConnStr, blobKey)
Blob_container = "mdm"

Blob_schema_file = "scpmdmfull/"
Blob_url = "wasbs://" + Blob_container + '@' + blobAccount + ".blob.core.windows.net/"
dfscpmdmfull = spark.read.option("header", "true").parquet(Blob_url + Blob_schema_file)
dfscpmdmfullfilter = dfscpmdmfull.select("AlternativeCode", "Status").filter(dfscpmdmfull["AlternativeCodeType"] == 'FACT').distinct()

# delta load
Blob_schema_file = "scpmdmdelta/"
Blob_url = "wasbs://" + Blob_container + '@' + blobAccount + ".blob.core.windows.net/"
scpdelta = spark.read.option("header", "true").parquet(Blob_url + Blob_schema_file)
dfdeltafilter = scpdelta.select("AlternativeCode", "Status").filter(scpdelta["AlternativeCodeType"] == 'FACT').distinct()

# Find the records in dfscpmdmfullfilter that do not exist in dfdeltafilter
df_difference = dfscpmdmfullfilter.join(dfdeltafilter, on=["AlternativeCode"], how="left_anti")

# Merge df_difference with dfdeltafilter
df_merged = df_difference.union(dfdeltafilter)
df_merged.createOrReplaceTempView("scpmdm")
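The effect of the left_anti join followed by the union is that a delta record replaces the full-load record with the same AlternativeCode, while full-load records without a delta counterpart are kept. The sketch below reproduces that logic on plain tuples, with no Spark required. The function name `merge_full_with_delta` and the sample codes are hypothetical.

```python
def merge_full_with_delta(full_rows, delta_rows):
    """Each row is an (AlternativeCode, Status) tuple, like the two filtered DataFrames."""
    delta_codes = {code for code, _ in delta_rows}
    # left_anti equivalent: full-load rows whose code is absent from the delta load
    difference = [row for row in full_rows if row[0] not in delta_codes]
    # union equivalent: append every delta row
    return difference + list(delta_rows)


if __name__ == "__main__":
    full = [("F1", "Active"), ("F2", "Active")]
    delta = [("F2", "Inactive"), ("F3", "Active")]
    # F2 takes its status from the delta load; F1 is kept; F3 is new.
    print(merge_full_with_delta(full, delta))
```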
--S4 MDG
from datetime import datetime
from datetime import timedelta
from pyspark.sql.utils import AnalysisException

strdate = datetime.today().strftime('%Y%m%d')
str2date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
storage_account_name = 'syx1t0b100sgtilmpwezagts'  # test env
container_name = "nftp-bods-prod"
sas_token = ""

# data_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/kpa293/KNA1/FF_KNA1_DUMP_" + strdate + ".txt"
data_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/kpa293/KNA1/FF_KNA1_DUMP_20241001.txt"
spark.conf.set(f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net", sas_token)

try:
    print(data_path)
    df = spark.read.option("header", "true").option("delimiter", "|").csv(data_path)
except AnalysisException:
    # Fall back to the previous day's dump if the first file cannot be read
    try:
        data_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/kpa293/KNA1/FF_KNA1_DUMP_" + str2date + ".txt"
        print(data_path)
        df = spark.read.option("header", "true").option("delimiter", "|").csv(data_path)
    except AnalysisException:
        df = None

# Fail with a clear message instead of an AttributeError on None
if df is None:
    raise RuntimeError("No KNA1 dump could be read for S4 MDG")
mdgdf = df.select("SORTL", "KATR1").distinct()
mdgdf.createOrReplaceTempView("mdgds")
--DMDM -MDG
from datetime import datetime
from datetime import timedelta
from pyspark.sql.utils import AnalysisException

strdate = datetime.today().strftime('%Y%m%d')
str2date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
storage_account_name = 'syx1t0b100sgtilmpwezagts'  # test env
container_name = "nftp-bods-prod"
sas_token = ""

# data_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/kpa293/KNA1_SAP_MDG/KNA1_DUMP_MP1_" + strdate + ".txt"
data_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/kpa293/KNA1_SAP_MDG/KNA1_DUMP_MP1_20241001.txt"
print(data_path)
spark.conf.set(f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net", sas_token)

try:
    df = spark.read.option("header", "true").option("delimiter", "|").csv(data_path)
except AnalysisException:
    # Fall back to the previous day's dump if the first file cannot be read
    try:
        data_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/kpa293/KNA1_SAP_MDG/KNA1_DUMP_MP1_" + str2date + ".txt"
        print(data_path)
        df = spark.read.option("header", "true").option("delimiter", "|").csv(data_path)
    except AnalysisException:
        df = None

# Fail with a clear message instead of an AttributeError on None
if df is None:
    raise RuntimeError("No KNA1 dump could be read for SAP MDG")
dmdmdgdf = df.select("SORTL", "KUNNR", "AUFSD").distinct()
dmdmdgdf.createOrReplaceTempView("dmdmdgdf")
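Both MDG cells follow the same pattern: try the dump for one date, then fall back to the previous day's dump. The sketch below shows that pattern once, in plain Python. It is an illustration under stated assumptions: `candidate_paths` and `read_first_available` are hypothetical names, `reader` stands in for the Spark read call, and `FileNotFoundError` stands in for the `AnalysisException` that Spark raises for a missing path.

```python
from datetime import date, timedelta


def candidate_paths(prefix, today):
    """Return the dump paths for today and for yesterday, in the order they are tried."""
    yesterday = today - timedelta(days=1)
    return [f"{prefix}{d.strftime('%Y%m%d')}.txt" for d in (today, yesterday)]


def read_first_available(paths, reader):
    """Try reader(path) for each path; return (path, data) for the first that succeeds.

    Raises FileNotFoundError if no path can be read, rather than continuing with None.
    """
    for path in paths:
        try:
            return path, reader(path)
        except FileNotFoundError:
            continue
    raise FileNotFoundError(f"No dump found among: {paths}")


if __name__ == "__main__":
    paths = candidate_paths("kpa293/KNA1/FF_KNA1_DUMP_", date(2024, 10, 1))
    print(paths)
```

Raising when neither file exists surfaces the missing dump at the read step instead of at the later `select`.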
--Customer Status Dashboard Query
--CMD DataLake- dlsazewpdatalakelob/cmd/Replica/
%sql
select
distinct A.customer_cmd_code as CMD_CODE,
A.trading_name as CUSTOMER_NAME,
A.BE_CODE,
concat(
"https://cmd.maersk.com/customer/",
A.customer_cmd_code
) cmd_url,
case when A.customer_status_cd = 'A' then 'Active' when A.customer_status_cd = 'I' then 'Inactive' when A.customer_status_cd = 'S' then 'Suspended' end CMD_STATUS,
A.STATUS_REASON,
COALESCE(cust.FACT_CODE,ext.FACT_CODE) DMDM_CODE,
COALESCE(cust.SCV_CODE,ext.SCV_CODE) DAMCOSCVCODE,
COALESCE(cust.STAT_NM,ext.STAT_NM) DMDM_STATUS,
COALESCE(cust.MODS_BE_CODE, ext.MODS_BE_CODE) MODS_BE_CODE,
B.FACT_CODE as CW1_CODE,
B.orgaCode,
B.CW1_STATUS,
B.OHIsActive,
B.OHIsConsignee,
B.OHIsConsignor,
sap.SAP_code as S4HANA_CODE,
case when sap.SAP_CODE_Status = 'A' then 'Active' when sap.SAP_CODE_Status = 'I' then 'Inactive' when sap.SAP_CODE_Status = 'S' then 'Suspended' when sap.SAP_CODE_Status = '' then 'Not exist' end S4HANA_STATUS,
case when mdg.KATR1 = 'A' then 'Active' when mdg.KATR1 = 'I' then 'Inactive' when mdg.KATR1 = 'S' then 'Suspended' else '' end MDG_Status,
mdg.SORTL MDG_CODE,
A.customer_brand_cd,
A.COUNTRY_Name as COUNTRY,
A.state_code,
A.state_name,
CURRENT_TIMESTAMP() as LastRefreshed
from
(
select
distinct A1.customer_cmd_code,
a1.customer_status_cd,
a1.trading_name,
status_reason,
a1.account_group_type,
BE_CODE,
customer_brand_cd,
country_name,
A1.state_code,
A1.state_name
from
(
select
DISTINCT a.customer_cmd_code,
a.trading_name,
a.customer_status_cd,
sr.status_reason,
account_group_type,
b.external_system_identifier BE_CODE,
c.customer_brand_cd,
concat(
a.country_code, b.external_system_identifier
) COUNTRY_BE_CODE,
a.country_name,
a.country_code,
a.city,
a.city_code,
a.postal_code,
a.state_code,
a.state_name
from
parquet.`dbfs:/mnt/lob_cmd/Replica/CUSTOMER_INFORMATION/` a
left outer join (
SELECT
CUSTOMER_CMD_CODE,
array_join(
collect_list(DISTINCT REASON_NAME),
','
) STATUS_REASON
FROM
parquet.`dbfs:/mnt/lob_cmd/Replica/CUSTOMER_STATUS_REASONS/`
WHERE
date_part = current_date
AND IS_DELETED = 'N'
GROUP BY
CUSTOMER_CMD_CODE
) sr on a.CUSTOMER_CMD_CODE = sr.CUSTOMER_CMD_CODE
left outer join parquet.`dbfs:/mnt/lob_cmd/Replica/CUSTOMER_EXTERNAL_IDENTIFIERS/` b on a.customer_cmd_code = b.customer_cmd_code
and b.date_part = current_date
and b.IS_DELETED = 'N'
AND b.external_source_system = 'PTR_ALT_CODES.CSTBE'
left outer join parquet.`dbfs:/mnt/lob_cmd/Replica/CUSTOMER_BRANDS/` c on a.customer_cmd_code = c.customer_cmd_code
and c.date_part = current_date
and c.IS_DELETED = 'N'
where
a.date_part = current_date
) A1
) A
LEFT JOIN (
select
B.OKCustomsRegNo as FACT_CODE,
a.OHCode as orgaCode,
OHIsActive,
OHIsConsignee,
OHIsConsignor,
CASE
WHEN OHIsActive = 'true' AND (OHIsConsignee = 'true' OR OHIsConsignor = 'true') THEN 'Active'
WHEN OHIsActive = 'true' AND (OHIsConsignee = 'false' AND OHIsConsignor = 'false') THEN 'Suspended'
ELSE 'Inactive'
END CW1_STATUS
from delta.`/mnt/tmff_dlsazewpdatalakecleansed/OrgCusCode` b
left join DELTA.`/mnt/tmff_dlsazewpdatalakecleansed/OrgHeader` a on A.OHPK = B.OKOh and a.deleted='false'
where
okcodetype in ('GCR') and b.deleted='false' and OKRnNKCodeCountry='XX'
) B ON A.customer_cmd_code = B.FACT_CODE
LEFT JOIN mdgds mdg ON A.customer_cmd_code = mdg.SORTL
LEFT JOIN PKG_CUST_MINIMAL_SRCH cust on cust.FACT_CODE=A.customer_cmd_code
LEFT JOIN (select
cust.STATUS
,cust.STAT_NM
,cust.FACT_CODE
,cust.SCV_CODE
,cust.MODS_BE_CODE
,ext.CMDFACTCODE
from PKG_CUST_MINIMAL_SRCH cust
INNER JOIN Extended_Customers ext ON cust.FACT_CODE=ext.FACTCODE) ext ON A.customer_cmd_code=ext.CMDFACTCODE
LEFT JOIN (
SELECT distinct
A.BP_ID_NUM as SAP_code,
B.ZFCATTR1 as SAP_CODE_Status
from
DELTA.`/mnt/nftpapplob/ZFO2AO017/` A
INNER JOIN DELTA.`/mnt/nftpapplob/ZFO2AO023/` B ON A.BPARTNER = B.BPARTNER
where BP_ID_TYPE='ZCMD00' and FLGDELETED is null
) sap on A.customer_cmd_code = sap.SAP_code
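The dashboard query returns one status column per system (CMD_STATUS, CW1_STATUS, DMDM_STATUS and so on), which the report then compares. The sketch below shows one way to flag mismatches against CMD and compute matching and mismatched percentages. It is not the report's actual logic: the function names are hypothetical, and treating a missing status as a mismatch is an assumption.

```python
def statuses_match(cmd_status, other_status):
    """True when both statuses are present and equal, ignoring case and surrounding spaces."""
    # Assumption: a record missing in either system counts as a mismatch.
    if cmd_status is None or other_status is None:
        return False
    return cmd_status.strip().lower() == other_status.strip().lower()


def match_percentages(rows, other_key):
    """Return (matching %, mismatched %) comparing CMD_STATUS with rows[other_key]."""
    if not rows:
        return 0.0, 0.0
    matched = sum(statuses_match(r.get("CMD_STATUS"), r.get(other_key)) for r in rows)
    pct = 100.0 * matched / len(rows)
    return pct, 100.0 - pct


if __name__ == "__main__":
    sample = [
        {"CMD_STATUS": "Active", "CW1_STATUS": "Active"},
        {"CMD_STATUS": "Active", "CW1_STATUS": "Suspended"},
    ]
    print(match_percentages(sample, "CW1_STATUS"))
```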
--Volume Report
%sql
select
year(activity_date) As Year,
date_format(activity_date, 'MMMM') AS Month_Name,
subsystem_cd,
productfamily_cd,
fact_code,
shipper,
Consignee,
Por,
Pdel,
count(nvl(hbl,0)) Shipment,
sum(nvl(LCL_CBM,0)) Volume
from delta.`dbfs:/mnt/ffwocedev/tW_FFW_Summary_AIRLCL_Cust_Vols_LCL`
where activity_date >= date_add(current_date(), -365)
group by
subsystem_cd, productfamily_cd, fact_code, shipper, Consignee, por, pdel, date_format(activity_date, 'MMMM'), year(activity_date)
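The volume query keeps the last 365 days of activity, counts shipments, and sums LCL_CBM with nulls treated as zero. The sketch below reproduces that aggregation in plain Python on a simplified grouping key (year and month name only; the real query also groups by subsystem, product family, fact code, shipper, consignee, Por and Pdel). The function name `monthly_volumes` and the sample rows are hypothetical.

```python
from collections import defaultdict
from datetime import date, timedelta


def monthly_volumes(rows, today):
    """rows: dicts with activity_date (date) and LCL_CBM (float or None).

    Returns {(year, month name): (shipment count, total volume)}.
    """
    cutoff = today - timedelta(days=365)  # same window as date_add(current_date(), -365)
    totals = defaultdict(lambda: [0, 0.0])
    for row in rows:
        if row["activity_date"] >= cutoff:
            key = (row["activity_date"].year, row["activity_date"].strftime("%B"))
            totals[key][0] += 1                      # count(nvl(hbl, 0)) counts every row
            totals[key][1] += row["LCL_CBM"] or 0.0  # nvl(LCL_CBM, 0)
    return {key: tuple(value) for key, value in totals.items()}


if __name__ == "__main__":
    sample = [
        {"activity_date": date(2024, 12, 1), "LCL_CBM": 2.5},
        {"activity_date": date(2024, 12, 20), "LCL_CBM": None},
        {"activity_date": date(2023, 12, 1), "LCL_CBM": 9.0},  # outside the window
    ]
    print(monthly_volumes(sample, date(2025, 1, 15)))
```

Because `nvl(hbl, 0)` never yields null, the Shipment column counts every row in the group, including rows without a house bill.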