SCRIPT_ForceSales

Technical Description of SCRIPT_ForceSales within Prod OMNIA Partners EDW at 25-Mar-2025 12:24:48

Purpose:

SCRIPT_ForceSales

Date Created:

2021-10-21 13:11:51

Date Last Updated:

Script Type:

PowerShell (64-bit) script

Notes:

#==============================================================================
# DBMS Name        :    SNOWFLAKE Custom*
# Template         :    wsl_snowflake_pscript_load
# Template Version :    8.5.1.0
# Description      :    Load ForceSales
# Generated by     :    WhereScape RED Version 9.0.2.1 (build 240801-225410)
# Generated for    :    Omnia Partners
# Generated on     :    Monday, February 03, 2025 at 10:50:17
# Author           :    Amrit Nandrey
# Source Schema    :    dbo
# Source Table     :    vwPrsDetail
#==============================================================================
# Notes / History
#
# Load the WslPowershellCommon module. -DisableNameChecking suppresses the
# warning Import-Module emits for commands with unapproved verbs.
# NOTE(review): helpers called later in this script but not defined in it
# (e.g. Get-ExtendedProperty, WsWrkAudit, WsWrkTask, Get-OdbcDumpSource) are
# presumably exported by this module - confirm.
Import-module -Name WslPowershellCommon -DisableNameChecking
# NOTE(review): Hide-Window is not defined in this file; presumably it comes
# from WslPowershellCommon and hides the console window - confirm.
Hide-Window



function Print-Log {
    # Closes the log writer and echoes the audit file to the console.
    #
    # Takes no parameters and returns nothing. Reads two variables from the
    # calling scope:
    #   $logStream - writer the rest of the script logs to; disposed here.
    #   $fileAud   - path of the file whose lines are written to the console.
    # NOTE(review): presumably $logStream writes to $fileAud, which would be why
    # it is disposed before the file is re-opened for reading - confirm where
    # both are created.

    # Tolerate a missing writer so that logging problems cannot mask the
    # failure that led to this call.
    if ($null -ne $logStream) {
        $logStream.Dispose()
    }

    # Start from a known-null local so a same-named variable in a parent scope
    # is never picked up by accident.
    $logReader = $null
    try {
        $logReader = New-Object IO.StreamReader($fileAud)

        while (-not $logReader.EndOfStream) {
            [Console]::WriteLine($logReader.ReadLine())
        }
    }
    catch {
        # Previously a failed open left $logReader null; "! $null.EndOfStream"
        # is always true, so the loop never terminated. Report and carry on.
        [Console]::WriteLine("Unable to read log file '$fileAud': $($_.Exception.Message)")
    }
    finally {
        # Release the file handle even when a read fails part-way through.
        if ($null -ne $logReader) {
            $logReader.Dispose()
        }
    }
}

Function Replace-WslTags($stuff) {
    # Expands $...$ tags inside $stuff and returns the trimmed result.
    #
    #   $SEQUENCE$ -> value of the WSL_SEQUENCE environment variable.
    #   $<format>$ -> current date/time. YY, DD, HH, MI and SS inside <format>
    #                 are rewritten to yy, dd, hh, mm and ss, and the result is
    #                 handed to Get-Date -Format.
    #
    # Any '$' still present afterwards is reported through $logStream (read
    # from the calling scope), env:warn is set, and the character is stripped.
    # Null or blank input is returned as-is, without trimming.
    # NOTE(review): in .NET format strings 'hh' is the 12-hour clock, so an
    # HH tag does not produce a 24-hour value - confirm this is intended.

    if ([string]::IsNullOrWhitespace($stuff)) { return $stuff }

    # String.Replace leaves the text untouched when the tag is absent, so no
    # separate presence check is needed.
    $stuff = $stuff.Replace('$SEQUENCE$', ${env:WSL_SEQUENCE})

    # Ordered tag-token -> .NET format-token substitutions.
    $tokenMap = @(
        @('YY', 'yy'),
        @('DD', 'dd'),
        @('HH', 'hh'),
        @('MI', 'mm'),
        @('SS', 'ss')
    )

    # With $SEQUENCE$ already handled, every remaining $...$ pair is treated
    # as a date format. Each pass consumes the pair opened by the first '$'.
    $tagPattern = '\$.+\$'
    while ([regex]::IsMatch($stuff, $tagPattern)) {
        $openAt  = $stuff.IndexOf('$')
        $closeAt = $stuff.IndexOf('$', $openAt + 1)
        $suppliedFormat = $stuff.Substring($openAt + 1, $closeAt - $openAt - 1)

        $dateFormat = $suppliedFormat
        foreach ($pair in $tokenMap) {
            $dateFormat = $dateFormat.Replace($pair[0], $pair[1])
        }

        $stamp = Get-Date -Format $dateFormat
        $stuff = $stuff.Replace('$' + $suppliedFormat + '$', $stamp)
    }

    # A lone '$' cannot be a tag: warn, then drop it.
    if ($stuff.IndexOf('$') -ge 0) {
        ${env:warn} = $true
        $logStream.WriteLine("Unclosed '$' tag in '$stuff'")
        $logStream.WriteLine("Unclosed '$' will be removed")
        $stuff = $stuff.Replace('$','')
    }

    return $stuff.Trim()
}

Function Gzip-File {
    # Compresses a file with gzip and returns the full path of the archive.
    #
    #   -inFile          File to compress. Mandatory: omitting it throws
    #                    "No input file specified".
    #   -outFile         Archive to write; defaults to "<inFile>.gz". An
    #                    existing file at that path is overwritten.
    #   -removeOriginal  Delete $inFile once the archive is completely written.
    #
    # Any failure while opening, copying or finalising the archive is rethrown
    # after all streams are released, so $inFile is never deleted when the
    # archive is missing or incomplete.
    param(
        [string]$inFile = $(throw "No input file specified"),
        [string]$outFile = $inFile + ".gz",
        [switch]$removeOriginal = $false
    )

    # Deliberately not named $input: that is a PowerShell automatic variable.
    $sourceStream = $null
    $targetStream = $null
    $gzipStream   = $null

    try {
        $sourceStream = New-Object IO.FileStream $inFile, ([IO.FileMode]::Open), ([IO.FileAccess]::Read), ([IO.FileShare]::Read)
        $targetStream = New-Object IO.FileStream $outFile, ([IO.FileMode]::Create), ([IO.FileAccess]::Write), ([IO.FileShare]::None)
        $gzipStream   = New-Object IO.Compression.GzipStream $targetStream, ([IO.Compression.CompressionLevel]::Fastest)

        $sourceStream.CopyTo($gzipStream)

        # Disposing the gzip stream writes the remaining compressed data. Do it
        # inside the try so that a failure here also prevents the delete below.
        $gzipStream.Dispose()
    }
    catch {
        # Rethrow explicitly so execution cannot fall through to Remove-Item.
        throw
    }
    finally {
        # Dispose is safe to repeat; order is wrapper first, then raw streams.
        foreach ($stream in @($gzipStream, $targetStream, $sourceStream)) {
            if ($null -ne $stream) {
                $stream.Dispose()
            }
        }
    }

    # -LiteralPath: names containing [ ] * ? must not be treated as wildcards.
    if ($removeOriginal) {
        Remove-Item -LiteralPath $inFile
    }

    return (Get-Item -LiteralPath $outFile).FullName
}

function Load-Data {
    # Get extended property values
    ${env:DEBUG}              = Get-ExtendedProperty -PropertyName "SF_DEBUG_MODE" -TableName ${env:WSL_LOAD_TABLE}
    ${env:PURGE}              = Get-ExtendedProperty -PropertyName "SF_PURGE" -TableName ${env:WSL_LOAD_TABLE}
    ${env:ACCESS_KEY}         = Get-ExtendedProperty -PropertyName "SF_ACCESS_KEY" -TableName ${env:WSL_LOAD_TABLE}
    ${env:SECRET_KEY}         = Get-ExtendedProperty -PropertyName "SF_SECRET_KEY" -TableName ${env:WSL_LOAD_TABLE}
    ${env:SEND_FILES_ZIPPED}  = Get-ExtendedProperty -PropertyName "SF_SEND_FILES_ZIPPED" -TableName ${env:WSL_LOAD_TABLE}
    ${env:FILE_FORMAT}        = Get-ExtendedProperty -PropertyName "SF_FILE_FORMAT" -TableName ${env:WSL_LOAD_TABLE}
    ${env:SNOWSQL_ACCOUNT}    = Get-ExtendedProperty -PropertyName "SF_SNOWSQL_ACCOUNT" -TableName ${env:WSL_LOAD_TABLE}
    ${env:SNOWSQL_DATABASE}   = Get-ExtendedProperty -PropertyName "SF_SNOWSQL_DATABASE" -TableName ${env:WSL_LOAD_TABLE}
    ${env:SNOWSQL_SCHEMA}     = Get-ExtendedProperty -PropertyName "SF_SNOWSQL_SCHEMA" -TableName ${env:WSL_LOAD_TABLE}
    ${env:SNOWSQL_WAREHOUSE}  = Get-ExtendedProperty -PropertyName "SF_SNOWSQL_WAREHOUSE" -TableName ${env:WSL_LOAD_TABLE}
    ${env:UNICODE_SUPPORT}    = Get-ExtendedProperty -PropertyName "SF_UNICODE_SUPPORT" -TableName ${env:WSL_LOAD_TABLE}
    ${env:UNLOAD_DELIM}       = Get-ExtendedProperty -PropertyName "SF_UNLOAD_DELIMITER" -TableName ${env:WSL_LOAD_TABLE}
    ${env:UNLOAD_ENC}         = Get-ExtendedProperty -PropertyName "SF_UNLOAD_ENCLOSED_BY" -TableName ${env:WSL_LOAD_TABLE}
    ${env:UNLOAD_ESC}         = Get-ExtendedProperty -PropertyName "SF_UNLOAD_ESCAPE_CHAR" -TableName ${env:WSL_LOAD_TABLE}
    ${env:SPLIT_THRESHOLD}    = Get-ExtendedProperty -PropertyName "SF_SPLIT_THRESHOLD" -TableName ${env:WSL_LOAD_TABLE}
    ${env:FILE_COUNT}         = Get-ExtendedProperty -PropertyName "SF_SPLIT_COUNT" -TableName ${env:WSL_LOAD_TABLE}
    ${env:TIMEZONE}           = Get-ExtendedProperty -PropertyName "SF_TIMEZONE" -TableName ${env:WSL_LOAD_TABLE}
    ${env:FILE_TYPE}          = Get-ExtendedProperty -PropertyName "SF_FILE_TYPE" -TableName ${env:WSL_LOAD_TABLE}
    ${env:EXTERNAL_STAGE}     = Get-ExtendedProperty -PropertyName "SF_EXTERNAL_STAGE" -TableName ${env:WSL_LOAD_TABLE}
    ${env:AZ_SAS_TOKEN}       = Get-ExtendedProperty -PropertyName "SF_AZURE_SAS_TOKEN" -TableName ${env:WSL_LOAD_TABLE}
    ${env:AZ_ENCRYPTION_TYPE} = Get-ExtendedProperty -PropertyName "SF_AZURE_ENCRYPTION_TYPE" -TableName ${env:WSL_LOAD_TABLE}
    ${env:AZ_ENCRYPTION_KEY}  = Get-ExtendedProperty -PropertyName "SF_AZURE_ENCRYPTION_KEY" -TableName ${env:WSL_LOAD_TABLE}
    ${env:GCS_STORAGE_INTEGRATION} = Get-ExtendedProperty -PropertyName "SF_GCS_STORAGE_INTEGRATION" -TableName ${env:WSL_LOAD_TABLE}
    ${env:GCS_STAGE_AREA_NAME} = Get-ExtendedProperty -PropertyName "SF_GCS_STAGE_AREA_NAME" -TableName ${env:WSL_LOAD_TABLE}

    if(${env:UNICODE_SUPPORT} -ne "TRUE") {
        ${env:UNICODE_SUPPORT} = "FALSE"
    }
    if(${env:SEND_FILES_ZIPPED} -ne "TRUE") {
        ${env:SEND_FILES_ZIPPED} = "FALSE"
    }
    if(${env:FILE_COUNT} -lt 1) {
        ${env:FILE_COUNT} = 1
    }
    if([string]::IsNullOrEmpty(${env:UNLOAD_DELIM})) {
        ${env:UNLOAD_DELIM} = "|"
    }
    if([string]::IsNullOrEmpty(${env:UNLOAD_ENC})) {
        ${env:UNLOAD_ENC} = '"'
    }
    if([string]::IsNullOrEmpty(${env:UNLOAD_ESC})) {
        ${env:UNLOAD_ESC} = "#"
    }
    if([string]::IsNullOrEmpty(${env:FILE_TYPE})) {
        ${env:FILE_TYPE} = "CSV"
    }

    ${env:SNOWSQL_USER} = ${env:WSL_TGT_USER}
    ${env:SNOWSQL_PWD}  = ${env:WSL_TGT_PWD}

    if ( ![string]::IsNullOrWhiteSpace(${env:EXTERNAL_STAGE})){
     $PURGE = "FALSE"
		 if ( ![string]::IsNullOrWhiteSpace(${env:PURGE})){
        $PURGE = ${env:PURGE}
	  }
    }
    else{
     $PURGE = "TRUE"
		 if ( ![string]::IsNullOrWhiteSpace(${env:PURGE})){
        $PURGE = ${env:PURGE}
     }
		}
    # Work out full file format name
    Add-Type -Path $(Join-Path -Path ${env:WSL_BINDIR} -ChildPath 'WslMetadataServiceClient.dll')
    $metaDbType = [WslMetadataServiceClient.MetaDatabaseType]::SqlServer
    # Metadata DSN, Meta DB Type enum, Metadata User, Metadata Password, Metadata schema and lastly maybe 64bit
    if ([System.Environment]::Is64BitProcess) {
        $metaArch = [WslMetadataServiceClient.Architecture]::_64bit
        $repo = New-Object WslMetadataServiceClient.Repo(${env:WSL_META_DSN},$metaDbType,${env:WSL_META_USER},${env:WSL_META_PWD},"dbo.",$metaArch)
    }
    else {
        $repo = New-Object WslMetadataServiceClient.Repo(${env:WSL_META_DSN},$metaDbType,${env:WSL_META_USER},${env:WSL_META_PWD},"dbo.")
    }
    $root = $repo.objectsByName
    $fileFormatObject = $root[${env:FILE_FORMAT}]
		${env:fileFormatDatabase} = $fileFormatObject.target.database.GetValue()
    ${env:fileFormatSchema} = $fileFormatObject.target.schema.GetValue()
    ${env:fileFormatFullName} = ""
    if( ! [string]::IsNullOrEmpty(${env:fileFormatDatabase})) { ${env:fileFormatFullName} += ${env:fileFormatDatabase} + "." }
    if( ! [string]::IsNullOrEmpty(${env:fileFormatSchema})) { ${env:fileFormatFullName} += ${env:fileFormatSchema} + "." }
    ${env:fileFormatFullName} += ${env:FILE_FORMAT}

    if(${env:DEBUG} -eq "TRUE") {
        $logStream.WriteLine("=================== LOAD OPTIONS ===================")
        $logStream.WriteLine("Specified Load Table:        " + ${env:WSL_LOAD_TABLE})
        $logStream.WriteLine("Specified Work Dir:          " + ${env:WSL_WORKDIR})
        $logStream.WriteLine("Specified Sequence:          " + ${env:WSL_SEQUENCE})
        $logStream.WriteLine("Specified Metadata ODBC DSN: " + ${env:WSL_META_DSN})
        $logStream.WriteLine("Specified Metadata Username: " + ${env:WSL_META_USER})
        $logStream.WriteLine("Specified Metadata Password: " + (New-Object string ('*', ${env:WSL_META_PWD}.Length)))
        $logStream.WriteLine("")
        if($runMode -eq 'S3') {
            $logStream.WriteLine("=================== CONNECTION OPTIONS ===================")
            $logStream.WriteLine("Access Key:                  " + (New-Object string ('*', ${env:ACCESS_KEY}.Length)))
            $logStream.WriteLine("Secret Key:                  " + (New-Object string ('*', ${env:SECRET_KEY}.Length)))
            $logStream.WriteLine("")
        }
        elseif($runmode -eq "AZ") {
            $logStream.WriteLine("=================== CONNECTION OPTIONS ===================")
            $logStream.WriteLine("SAS Token:                          " + (New-Object string ('*', ${env:AZ_SAS_TOKEN}.Length)))
            $logStream.WriteLine("Encryption Method:                  " + ${env:AZ_ENCRYPTION_TYPE})
            $logStream.WriteLine("Encryption Key:                     " + (New-Object string ('*', ${env:AZ_ENCRYPTION_KEY}.Length)))
            $logStream.WriteLine("")
        }
        elseif($runmode -eq "GCS") {
            $logStream.WriteLine("=================== CONNECTION OPTIONS ===================")
            $logStream.WriteLine("GCS STORAGE INTEGRATION:                  " + ${env:GCS_STORAGE_INTEGRATION})
            $logStream.WriteLine("GCS STAGE AREA NAME:                     " + ${env:GCS_STAGE_AREA_NAME})
            $logStream.WriteLine("")
        }
        $logStream.WriteLine("=================== MODES ===================")
        $logStream.WriteLine("Specified Debug Mode:        " + ${env:DEBUG})
        $logStream.WriteLine("Specified Run Mode:          " + $runMode)
        $logStream.WriteLine("Unicode Extract:             " + ${env:UNICODE_SUPPORT})
        $logStream.WriteLine("")
        if($runMode -eq 'Database') {
            $logStream.WriteLine("=================== SOURCE TABLE INFO ===================")
            $logStream.WriteLine("Source Schema:               " + ${env:WSL_SRC_SCHEMA})
            $logStream.WriteLine("Source Tables:               " + @"
vwPrsDetail
"@)
            $logStream.WriteLine("Source Where:                " + @"

"@)
            $logStream.WriteLine("")
            $logStream.WriteLine("=================== SOURCE DB INFO ===================")
            $logStream.WriteLine("ODBC Source DSN:             " + ${env:WSL_SRC_DSN})
            $logStream.WriteLine("ODBC Source Username:        " + ${env:WSL_SRC_USER})
            $logStream.WriteLine("ODBC Source Password:        " + (New-Object string ('*', ${env:WSL_SRC_PWD}.Length)))
            $logStream.WriteLine("")
        }
        $logStream.WriteLine("=================== EXTENDED PROPERTIES ===================")
        $logStream.WriteLine("SF_SNOWSQL_ACCOUNT:       " + ${env:SNOWSQL_ACCOUNT})
        $logStream.WriteLine("SF_SNOWSQL_WAREHOUSE:     " + ${env:SNOWSQL_WAREHOUSE})
        $logStream.WriteLine("SF_SNOWSQL_DATABASE:      " + ${env:SNOWSQL_DATABASE})
        $logStream.WriteLine("SF_SNOWSQL_SCHEMA:        " + ${env:SNOWSQL_SCHEMA})
        $logStream.WriteLine("SF_FILE_FORMAT:           " + ${env:FILE_FORMAT})
        $logStream.WriteLine("fileFormatFullName:       " + ${env:fileFormatFullName})
        $logStream.WriteLine("SF_SEND_FILES_ZIPPED:     " + ${env:SEND_FILES_ZIPPED})
        $logStream.WriteLine("SF_UNLOAD_DELIMITER:      " + ${env:UNLOAD_DELIM})
        $logStream.WriteLine("SF_UNLOAD_ENCLOSED_BY:    " + ${env:UNLOAD_ENC})
        $logStream.WriteLine("SF_UNLOAD_ESCAPE_CHAR:    " + ${env:UNLOAD_ESC})
        $logStream.WriteLine("SF_SPLIT_THRESHOLD:       " + ${env:SPLIT_THRESHOLD})
        $logStream.WriteLine("SF_SPLIT_COUNT:           " + ${env:FILE_COUNT})
        $logStream.WriteLine("SF_TIMEZONE:              " + ${env:TIMEZONE})
        $logStream.WriteLine("SF_FILE_TYPE:             " + ${env:FILE_TYPE})
        $logStream.WriteLine("SF_EXTERNAL_STAGE:        " + ${env:EXTERNAL_STAGE})
        $logStream.WriteLine("SF_AZURE_SAS_TOKEN:       " + ${env:AZ_SAS_TOKEN})
        $logStream.WriteLine("SF_AZURE_ENCRYPTION_TYPE: " + ${env:AZ_ENCRYPTION_TYPE})
        $logStream.WriteLine("SF_AZURE_ENCRYPTION_KEY:  " + ${env:AZ_ENCRYPTION_KEY})
				$logStream.WriteLine("SF_PURGE:  " + ${env:PURGE})
        $logStream.WriteLine("")
    }
    if (![string]::IsNullOrWhiteSpace($env:WSL_META_CONSTRING)) {
        $logStream.WriteLine("         META DATA CONNECTED USING ADVANCED CONNECT           ")
    }
    $filePath = ${env:WSL_WORKDIR}
    $fileDat = "wsl${env:WSL_LOAD_TABLE}${env:WSL_SEQUENCE}"
    $logStream.WriteLine("================= CLEANUP FILES ===================")
    foreach($df in $((Get-ChildItem "${filePath}${fileDat}*").FullName)){
        $logStream.WriteLine("Cleaning file $df")
        Remove-Item -Path $df
    }
    $logStream.WriteLine("Cleaning ${fileDat} from SnowFlake stage")
    $snowsqlDelete = "snowsql -q ""rm @~ pattern='.*$fileDat.*'""  -o friendly=false  -o remove_comments=true -o timing=false"
    $putRes = & $([ScriptBlock]::Create($snowsqlDelete))
    $logStream.WriteLine("================= EXTRACT SQL =====================")
    $extractSql = @"
    SELECT vwPrsDetail.Id
    , vwPrsDetail.CustomerId
    , vwPrsDetail.RegistrationId
    , vwPrsDetail.ReportingCustomerId
    , vwPrsDetail.ContractId
    , vwPrsDetail.SupplierId
    , convert(varchar(19),ReceiptDate,120)
    , convert(varchar(19),TransactionDate,120)
    , vwPrsDetail.SalesAmount
    , vwPrsDetail.RevenueAmount
    , convert(varchar(19),CreateDate,120)
    , convert(varchar(19),ModifiedDate,120)
    , vwPrsDetail.OrganizationName
    , vwPrsDetail.OrganizationId
    , vwPrsDetail.UnitsSold
    , vwPrsDetail.PricePerUnit
    , vwPrsDetail.RebateName
    , vwPrsDetail.ContractRebateDetailsId
    , vwPrsDetail.TransactionClassId
    , vwPrsDetail.Note
    , vwPrsDetail.ChannelPartnerId
    , vwPrsDetail.ChannelPartnerFeesharePartnerId
    , vwPrsDetail.Level1ForceId
    , vwPrsDetail.ContractExternalCategoryId
    , vwPrsDetail.CustomerIdFromExternalSystem
    , vwPrsDetail.CustomerLevel1IdFromExternalSystem
FROM ${env:WSL_SRC_SCHEMA}.vwPrsDetail vwPrsDetail
"@

    $logStream.WriteLine($extractSql)
    if (![string]::IsNullOrWhiteSpace($env:WSL_SRC_CONSTRING)) {
        $logStream.WriteLine("    SOURCE DATABASE CONNECTED USING ADVANCED CONNECT       ")
    }
    $logStream.WriteLine("")

    if(${env:DEBUG} -eq "TRUE") {
        $logStream.WriteLine("BEGIN create of data file from source system: $(Get-Date)")
    }
    $unicode = $false
    if( ${env:UNICODE_SUPPORT} -eq "TRUE") {
        $unicode = $true
    }
    $OdbcDump = Get-OdbcDumpSource
    Add-Type -TypeDefinition $OdbcDump -Language CSharp -ReferencedAssemblies "System.Data"
    $wslOdbc = New-Object WhereScape.OdbcDump
    #GetDataToFile(string query, string dsn, string username, string password, string dataFile, string delimiter, int fileCount, int splitThreshold, bool addQuotes, bool unicode, string enclosedBy, string escapeChar)


    try {
        $rowCount = $wslOdbc.GetDataToFile($extractSQL,${env:WSL_SRC_DSN},${env:WSL_SRC_USER},${env:WSL_SRC_PWD},"${filePath}${fileDat}",${env:UNLOAD_DELIM}, ${env:FILE_COUNT}, ${env:SPLIT_THRESHOLD}, $true, $unicode, ${env:UNLOAD_ENC}, ${env:UNLOAD_ESC})
    }
    catch {
        $null = WsWrkAudit -Status "E" -Message "An error has occurred: $_.Exception.Message"
        [Console]::WriteLine("-2")
        [Console]::WriteLine("Load failed")
        $logStream.WriteLine($_.Exception.Message)
        $logStream.WriteLine($_.InvocationInfo.PositionMessage)
        Print-Log
    }


    if(${env:DEBUG} -eq "TRUE") {
        $logStream.WriteLine("END create of data file from source system: $(Get-Date)")
    }
    if($rowCount -lt 1) {
        $logStream.WriteLine("Source query returned 0 rows")
        [Console]::WriteLine("1")
        [Console]::WriteLine("Source query returned 0 rows")
        Print-Log
        exit
    }
    $dataFiles = New-Object System.Collections.Generic.List[string]
    foreach($df in $((Get-ChildItem "${filePath}${fileDat}*").FullName)){
        $null = $dataFiles.Add($df)
    }    if(($runMode -ne "S3") -and ($runmode -ne "AZ") -and ($runmode -ne "GCS") -and [string]::IsNullOrWhiteSpace(${env:EXTERNAL_STAGE})) {
        $sourcePath = $filePath
        for($x = 0; $x -lt $dataFiles.Count; $x++) {
            $originalName = $dataFiles[$x]
            if(${env:SEND_FILES_ZIPPED} -eq "TRUE") { 
                $dataFiles[$x] = Gzip-File -inFile $originalName -RemoveOriginal
                $logStream.WriteLine("File '$originalName' GZipped to '$($dataFiles[$x])' prior to upload")            }
        }
    }
    $logStream.WriteLine("================= LOAD =================")
    $sfOdbc = New-Object System.Data.Odbc.OdbcConnection
    if ([string]::IsNullOrEmpty($env:WSL_TGT_CONSTRING)) {
        $sfOdbc.ConnectionString = "DSN=$env:WSL_TGT_DSN"
        if (! [string]::IsNullOrEmpty($env:WSL_TGT_USER)) { $sfOdbc.ConnectionString += ";UID=$env:WSL_TGT_USER" }
        if (! [string]::IsNullOrEmpty($env:WSL_TGT_PWD))  { $sfOdbc.ConnectionString += ";PWD=$env:WSL_TGT_PWD" }
    } else {
        $sfOdbc.ConnectionString = $env:WSL_TGT_CONSTRING
    }
    $sfOdbc.Open()
    if((($runMode -ne "S3") -and ($runmode -ne "AZ") -and ($runmode -ne "GCS")) -or ( ! [string]::IsNullOrEmpty(${env:EXTERNAL_STAGE}))) {
        if(($runMode -ne "S3") -and ($runmode -ne "AZ") -and ($runmode -ne "GCS") -and [string]::IsNullOrEmpty(${env:EXTERNAL_STAGE})) {
            $remSQL = "remove @~/WSL-${env:WSL_SEQUENCE}-${env:WSL_TASK_KEY}*;"
            $null = (New-Object System.Data.Odbc.OdbcCommand($remSQL,$sfOdbc)).ExecuteNonQuery()
            $fullFilePath=Join-Path -Path ${filePath} -ChildPath ${fileDat}

            if(${env:SEND_FILES_ZIPPED} -eq "TRUE") {
                $putSQL = "PUT 'file://$($fullFilePath.Replace("\","/"))*.gz' @~/WSL-${env:WSL_SEQUENCE}-${env:WSL_TASK_KEY};"
            }
            else {
                $putSQL = "PUT 'file://$($fullFilePath.Replace("\","/"))*' @~/WSL-${env:WSL_SEQUENCE}-${env:WSL_TASK_KEY};"
            }
            $snowsqlPut = "snowsql -q ""$putsql"" -o friendly=false  -o remove_comments=true -o output_format=csv -o timing=false"
            if(${env:DEBUG} -eq "TRUE") {
                $logStream.WriteLine("BEGIN PUT of files matching ${filePath}${fileDat} : $(Get-Date)")
            }
            $logStream.WriteLine("PUT:")
            $logStream.WriteLine($putSQL)
            $logStream.WriteLine("")
            $putRes = & $([ScriptBlock]::Create($snowsqlPut))
            if(${env:DEBUG} -eq "TRUE") {
                $logStream.WriteLine("END PUT of files matching ${env:filePath}${fileDat} : $(Get-Date)")
            }
        }
        if( ! [string]::IsNullOrEmpty(${env:TIMEZONE})) {
            $tzStmt = "ALTER SESSION SET TIMEZONE = '${env:TIMEZONE}';`r`n"
            $logStream.WriteLine("SET TIMEZONE:")
            $logStream.WriteLine($tzStmt)
            $tzCmd = New-Object System.Data.Odbc.OdbcCommand($tzStmt,$sfOdbc)
            $null = $tzCmd.ExecuteNonQuery()
        }


        if(${env:FILE_TYPE} -eq "CSV") {
            $copyStmt = @"
            COPY INTO ${env:WSL_LOAD_FULLNAME}
            ( Id
            , CustomerId
            , RegistrationId
            , ReportingCustomerId
            , ContractId
            , SupplierId
            , ReceiptDate
            , TransactionDate
            , SalesAmount
            , RevenueAmount
            , CreateDate
            , ModifiedDate
            , OrganizationName
            , OrganizationId
            , UnitsSold
            , PricePerUnit
            , RebateName
            , ContractRebateDetailsId
            , TransactionClassId
            , Note
            , ChannelPartnerId
            , ChannelPartnerFeesharePartnerId
            , ChannelPartnerLevel1ForceId
            , ContractExternalCategoryId
            , CustomerIdFromExternalSystem
            , CustomerLevel1IdFromExternalSystem
            , dss_record_source
            , dss_load_date
            )
            FROM (
              SELECT t.`$1
                   , t.`$2
                   , t.`$3
                   , t.`$4
                   , t.`$5
                   , t.`$6
                   , t.`$7
                   , t.`$8
                   , t.`$9
                   , t.`$10
                   , t.`$11
                   , t.`$12
                   , t.`$13
                   , t.`$14
                   , t.`$15
                   , t.`$16
                   , t.`$17
                   , t.`$18
                   , t.`$19
                   , t.`$20
                   , t.`$21
                   , t.`$22
                   , t.`$23
                   , t.`$24
                   , t.`$25
                   , t.`$26
                   ,  'FORCE.dbo.vwPrsDetail'
                   ,  CAST(CURRENT_TIMESTAMP AS TIMESTAMP)
              FROM $(if ( [string]::IsNullOrWhiteSpace(${env:EXTERNAL_STAGE})) { "@~/WSL-${env:WSL_SEQUENCE}-${env:WSL_TASK_KEY}/ t" }
                     else {if ( [string]::IsNullOrWhiteSpace(${fileDat}.Trim())){"@${env:EXTERNAL_STAGE}/ t"} else{ "@${env:EXTERNAL_STAGE}/${fileDat}/ t"}})
            )
            FILE_FORMAT = '${env:fileFormatFullName}'
            PURGE = $PURGE
"@
        }
        else {
            $copyStmt = @"
            COPY INTO ${env:WSL_LOAD_FULLNAME}
            FROM @~/WSL-${env:WSL_SEQUENCE}-${env:WSL_TASK_KEY}/
            FILE_FORMAT = '${env:fileFormatFullName}'
            PURGE = $PURGE
"@
        }
        $maskedLoad = $copyStmt

        $logStream.WriteLine("COPY:")
        $logStream.WriteLine($maskedLoad)
        if(${env:DEBUG} -eq "TRUE") {
            $logStream.WriteLine("BEGIN COPY INTO '${env:WSL_LOAD_FULLNAME}' : $(Get-Date)")
        }
        try {
                $copyCmd = New-Object System.Data.Odbc.OdbcCommand($copyStmt,$sfOdbc)
                $copyCmd.CommandTimeout = 0
                $loadRes = New-Object Data.DataTable
                $null = (New-Object Data.odbc.odbcDataAdapter($copyCmd)).fill($loadRes)
        }
				catch {
				        
                if(${env:DEBUG} -ne "TRUE"){
                $logStream.WriteLine("================= CLEANUP FILES ===================")
                $logStream.WriteLine("Cleaning ${fileDat} from SnowFlake stage")
                $snowsqlDelete = "snowsql -q ""rm @~ pattern='.*$fileDat.*'""  -o friendly=false  -o remove_comments=true -o timing=false"
                $putRes = & $([ScriptBlock]::Create($snowsqlDelete))

								foreach($df in $((Get-ChildItem "${filePath}${fileDat}*").FullName)){
                        $logStream.WriteLine("Cleaning file $df")
                        Remove-Item -Path $df
                }}
								$logStream.WriteLine($_.Exception.InnerException.Message)
								[Console]::WriteLine("-2")
                [Console]::WriteLine("Load failed")
                Print-Log
                exit
        }

        if(${env:DEBUG} -eq "TRUE") {
            $logStream.WriteLine("END COPY INTO '${env:WSL_LOAD_FULLNAME}' : $(Get-Date)")
        }
        if(($loadRes.errors_seen | Measure-Object -Sum).Sum -eq 0) {
            $rowsLoaded = ($loadRes.rows_loaded | Measure-Object -Sum).Sum
            $logStream.WriteLine(($loadRes | Format-Table -Property file,status,rows_parsed,rows_loaded -AutoSize | Out-String))
            if ([string]::IsNullOrWhiteSpace($env:WSL_TGT_CONSTRING)) {
                $logStream.Write("$rowsLoaded rows loaded.")
            } else {
                $logStream.Write("$rowsLoaded rows loaded using Advanced Connect.")
            }
            try {
                $null = WsWrkTask -Inserted $rowsLoaded
            } catch { $logStream.WriteLine("Failed to update task row count"); $logStream.WriteLine($_.Exception.InnerException.Message)}
  	        if($rowCount -eq $rowsLoaded) {
              [Console]::WriteLine("1")
              [Console]::WriteLine("Load successful. Source rowcount $rowCount. $rowsLoaded rows loaded.")
                            if ([string]::IsNullOrWhiteSpace($env:WSL_TGT_CONSTRING)) {
                                $logStream.WriteLine("Source rowcount $rowCount. $rowsLoaded rows loaded.")
                            } else {
                                $logStream.WriteLine("Source rowcount $rowCount. $rowsLoaded rows loaded using Advanced connect.")
                            }
             }
            else {
              [Console]::WriteLine("-2")
              [Console]::WriteLine("Source and Target rowcount does not match. Source rowcount $rowCount. Target rowcount $rowsLoaded.")
             }
            if(${env:DEBUG} -ne "TRUE") {
                foreach($file in $dataFiles) {
                    Remove-Item $file
                }
    						$null = (New-Object System.Data.Odbc.OdbcCommand($remSQL,$sfOdbc)).ExecuteNonQuery()
            }
            else {
                $logStream.WriteLine("Temporary data files not removed as debug mode is enabled")
            }
        }
        else {
            $logStream.WriteLine(($loadRes | Format-Table -Property file,status,rows_parsed,rows_loaded -AutoSize | Out-String))
            [Console]::WriteLine("-2")
            [Console]::WriteLine("Load failed")
            Print-Log
            exit
        }
        $sfOdbc.Close()
    }
    elseif($runmode -eq "S3") {
		    $sourcefilename = ''
            if( ! [string]::IsNullOrEmpty(${env:TIMEZONE})) {
                $tzStmt = "ALTER SESSION SET TIMEZONE = '${env:TIMEZONE}';`r`n"
                $logStream.WriteLine("SET TIMEZONE:")
                $logStream.WriteLine($tzStmt)
                $tzCmd = New-Object System.Data.Odbc.OdbcCommand($tzStmt,$sfOdbc)
                $null = $tzCmd.ExecuteNonQuery()
            }

			      if ("false" -eq "true") {
                $skipheadervalue = 1
            } else {
                $skipheadervalue = 0
            }
			      $sourceFilePath="".Trim()
            if ($sourceFilePath[-1] -eq "/") {
              $sourceFilePath = $sourceFilePath.TrimEnd('/')
			      }
            $sourceFileFieldDelimiter='|'
            $sourceFileRecordDelimiter="".Trim()
            $sourceFileFieldEnclosure='"'
						if ($sourceFileFieldEnclosure.Trim() -eq ""){
							$sourceFileFieldEnclosure='"'
						}
            if ($sourceFileRecordDelimiter.Trim() -eq "") {
              $sourceFileRecordDelimiter = "\n"
			      }
		      	$awsAccessKey=${env:ACCESS_KEY}
            $fileDetails=$sourceFilePath.Trim()+"/"+$sourcefilename
            if (($awsAccessKey -replace '\s', '') -eq "") {
               $awsAccessKey='$WSL_SRCCFG_s3AccessKey$'
            }
            $awsSecretKey=${env:SECRET_KEY}
            if (($awsSecretKey -replace '\s', '') -eq "") {
              $awsSecretKey=${env:WSL_SRCCFG_s3SecretKey}
            }
			
			


            $s3Prefix  = Get-ExtendedProperty -PropertyName "SF_S3_BUCKET_PREFIX" -TableName ${env:WSL_LOAD_TABLE}
			if(($s3Prefix.StartsWith("s3://"))){
			$copyStmt = @"
            COPY INTO ${env:WSL_LOAD_FULLNAME}
            FROM ${filePath}${fileDat}
            CREDENTIALS = (AWS_KEY_ID='${env:ACCESS_KEY}' AWS_SECRET_KEY='${env:SECRET_KEY}')
            FILE_FORMAT = '${env:fileFormatFullName}'
            PURGE = $PURGE
"@
			}else{
            $copyStmt = @"
            COPY INTO ${env:WSL_LOAD_FULLNAME}
            FROM $fileDetails
            CREDENTIALS = (AWS_KEY_ID='$awsAccessKey' AWS_SECRET_KEY='$awsSecretKey')
            FILE_FORMAT = ( TYPE = csv FIELD_DELIMITER = '$sourceFileFieldDelimiter' SKIP_HEADER = $skipheadervalue RECORD_DELIMITER = '$sourceFileRecordDelimiter' FIELD_OPTIONALLY_ENCLOSED_BY = '$sourceFileFieldEnclosure' ERROR_ON_COLUMN_COUNT_MISMATCH = 
"@
            if ($sourcefilename.ToUpper().EndsWith('JSON')){
               $copyStmt = @"
               COPY INTO ${env:WSL_LOAD_FULLNAME}
               FROM '$fileDetails'
               CREDENTIALS = (AWS_KEY_ID='$awsAccessKey' AWS_SECRET_KEY='$awsSecretKey')
               FILE_FORMAT = (TYPE = 'JSON'  STRIP_OUTER_ARRAY = true )
"@
}

            if ($sourcefilename.ToUpper().EndsWith('XML')){
               $copyStmt = @"
               COPY INTO ${env:WSL_LOAD_FULLNAME}
               FROM $fileDetails
               CREDENTIALS = (AWS_KEY_ID='$awsAccessKey' AWS_SECRET_KEY='$awsSecretKey')

               FILE_FORMAT = (TYPE = 'XML')
"@
}
			}
            $maskedLoad = $copyStmt
            if( ! [string]::IsNullOrWhiteSpace("$awsAccessKey")) {
                $maskedLoad = $maskedLoad.Replace("$awsAccessKey",(New-Object String ('*', "$awsAccessKey".Length)))
            }
            if( ! [string]::IsNullOrWhiteSpace($awsSecretKey)) {
                $maskedLoad = $maskedLoad.Replace($awsSecretKey,(New-Object String ('*', $awsSecretKey.Length)))
            }

            $logStream.WriteLine("COPY:")
            $logStream.WriteLine($maskedLoad)
            $logStream.WriteLine("")
            if(${env:DEBUG} -eq "TRUE") {
                $logStream.WriteLine("BEGIN COPY INTO '${env:WSL_LOAD_FULLNAME}' : $(Get-Date)")
            }
            $copyCmd = New-Object System.Data.Odbc.OdbcCommand($copyStmt,$sfOdbc)
            $copyCmd.CommandTimeout = 0
            $loadRes = New-Object Data.DataTable
            $null = (New-Object Data.odbc.odbcDataAdapter($copyCmd)).fill($loadRes)
            if(${env:DEBUG} -eq "TRUE") {
                $logStream.WriteLine("END COPY INTO '${env:WSL_LOAD_FULLNAME}' : $(Get-Date)")
            }
            if(($loadRes.errors_seen | Measure-Object -Sum).Sum -eq 0) {
                $rowsLoaded = ($loadRes.rows_loaded | Measure-Object -Sum).Sum
                $logStream.WriteLine(($loadRes | Where { $_.file -ne "" } | Format-Table -Property file,status,rows_parsed,rows_loaded -AutoSize | Out-String))
                if ([string]::IsNullOrWhiteSpace($env:WSL_TGT_CONSTRING)) {
                    $logStream.Write("$rowsLoaded rows loaded.")
                } else {
                    $logStream.Write("$rowsLoaded rows loaded using Advanced Connect.")
                }

                try {
                    $null = WsWrkTask -Inserted $rowsLoaded
                } catch { $logStream.WriteLine("Failed to update task row count"); $logStream.WriteLine($_.Exception.InnerException.Message)}
                [Console]::WriteLine("1")
                [Console]::WriteLine("Load successful. $rowsLoaded rows loaded.")
               

								
            }
            else {
                $logStream.WriteLine(($loadRes | Format-Table -Property file,status,rows_parsed,rows_loaded -AutoSize | Out-String))
                [Console]::WriteLine("-2")
                [Console]::WriteLine("Load failed")
                Print-Log
                exit
            }
            $sfOdbc.Close()
        }
	
    elseif($runmode -eq "AZ") {
            if( ! [string]::IsNullOrEmpty(${env:TIMEZONE})) {
                $tzStmt = "ALTER SESSION SET TIMEZONE = '${env:TIMEZONE}';`r`n"
                $logStream.WriteLine("SET TIMEZONE:")
                $logStream.WriteLine($tzStmt)
                $tzCmd = New-Object System.Data.Odbc.OdbcCommand($tzStmt,$sfOdbc)
                $null = $tzCmd.ExecuteNonQuery()
            }

			      if ("false" -eq "true") {
                $skipheadervalue = 1
            } else {
                $skipheadervalue = 0
            }

           $azureSasToken=${env:AZ_SAS_TOKEN}
           $sourceFileRecordDelimiter="".Trim()
           $sourceFileFieldDelimiter='|'
           $sourceFileFieldEnclosure='"'
		   if ($sourceFileFieldEnclosure.Trim() -eq ""){
				$sourceFileFieldEnclosure='"'
		   }
           if ($sourceFileRecordDelimiter.Trim() -eq "") {
                $sourceFileRecordDelimiter = "\n"
		   }

           if ($azureSasToken -eq $null){
               $azureSasToken= ${env:WSL_SRCCFG_azureSASToken}}

           if ($sourceFileRecordDelimiter.Trim() -eq "") {
              $sourceFileRecordDelimiter = "\n"
			      }

            $sourceFilePath = ${filePath}.Replace("https://","azure://").Replace("dfs.core.windows.net","blob.core.windows.net")
						$fileName = $fileDat.ToUpper()
            $copyStmt = @"
            COPY INTO ${env:WSL_LOAD_FULLNAME}
            FROM '$sourceFilePath$fileDat'
            CREDENTIALS = (AZURE_SAS_TOKEN='$azureSasToken')
            FILE_FORMAT = ( TYPE = csv ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE FIELD_DELIMITER = '$sourceFileFieldDelimiter' SKIP_HEADER = $skipheadervalue RECORD_DELIMITER = '$sourceFileRecordDelimiter' FIELD_OPTIONALLY_ENCLOSED_BY = '$sourceFileFieldEnclo
"@

            if ($fileName.EndsWith('JSON')){
               $copyStmt = @"
               COPY INTO ${env:WSL_LOAD_FULLNAME}
               FROM '$sourceFilePath$fileDat'
               CREDENTIALS = (AZURE_SAS_TOKEN='$azureSasToken')
               FILE_FORMAT = (TYPE = 'JSON'  STRIP_OUTER_ARRAY = true )
"@
}


            if ($fileName.EndsWith('XML')){
               $copyStmt = @"
               COPY INTO ${env:WSL_LOAD_FULLNAME}
               FROM '$sourceFilePath$fileDat'
               CREDENTIALS = (AZURE_SAS_TOKEN='$azureSasToken')
               FILE_FORMAT = (TYPE = 'XML')
"@
}

            $maskedLoad = $copyStmt
            if( ! [string]::IsNullOrWhiteSpace($azureSasToken)) {
                $maskedLoad = $maskedLoad.Replace($azureSasToken,(New-Object String ('*', $azureSasToken.Length)))
            }
            if( ! [string]::IsNullOrWhiteSpace(${env:AZ_ENCRYPTION_KEY})) {
                $maskedLoad = $maskedLoad.Replace(${env:AZ_ENCRYPTION_KEY},(New-Object String ('*', ${env:AZ_ENCRYPTION_KEY}.Length)))
            }

            $logStream.WriteLine("COPY:")
            $logStream.WriteLine($maskedLoad)
            $logStream.WriteLine("")
            if(${env:DEBUG} -eq "TRUE") {
                $logStream.WriteLine("BEGIN COPY INTO '${env:WSL_LOAD_FULLNAME}' : $(Get-Date)")
            }
            $copyCmd = New-Object System.Data.Odbc.OdbcCommand($copyStmt,$sfOdbc)
            $copyCmd.CommandTimeout = 0
            $loadRes = New-Object Data.DataTable
            $null = (New-Object Data.odbc.odbcDataAdapter($copyCmd)).fill($loadRes)
            if(${env:DEBUG} -eq "TRUE") {
                $logStream.WriteLine("END COPY INTO '${env:WSL_LOAD_FULLNAME}' : $(Get-Date)")
            }
            if(($loadRes.errors_seen | Measure-Object -Sum).Sum -eq 0) {
                $rowsLoaded = ($loadRes.rows_loaded | Measure-Object -Sum).Sum
                $logStream.WriteLine(($loadRes | Where { $_.file -ne "" } | Format-Table -Property file,status,rows_parsed,rows_loaded   | Out-String -width 500 ))
                if ([string]::IsNullOrWhiteSpace($env:WSL_TGT_CONSTRING)) {
                    $logStream.Write("$rowsLoaded rows loaded.")
                } else {
                    $logStream.Write("$rowsLoaded rows loaded using Advanced Connect.")
                }
                try {
                    $null = WsWrkTask -Inserted $rowsLoaded
                } catch { $logStream.WriteLine("Failed to update task row count"); $logStream.WriteLine($_.Exception.InnerException.Message)}
                [Console]::WriteLine("1")
                [Console]::WriteLine("Load successful.$rowsLoaded rows loaded.")
            }
            else {
                $logStream.WriteLine(($loadRes | Format-Table -Property file,status,rows_parsed,rows_loaded -AutoSize | Out-String))
                [Console]::WriteLine("-2")
                [Console]::WriteLine("Load failed")
                Print-Log
                exit
            }
            $sfOdbc.Close()
    }
}

# Script entry point.
# Opens the audit log file, loads the WhereScape metadata client assembly,
# runs Load-Data and then echoes the audit log to the console via Print-Log.
# On any terminating error it reports "-2" / "Load failed" on the console,
# records the exception details and echoes whatever audit log exists.
try {
    # Audit log lives in the work directory and is named <load table>_<sequence>.txt
    $fileAud = Join-Path -Path ${env:WSL_WORKDIR} -ChildPath "${env:WSL_LOAD_TABLE}_${env:WSL_SEQUENCE}.txt"
    $logStream = New-Object IO.StreamWriter($fileAud,$false)
    # Flush on every write so the file is complete when Print-Log re-reads it
    $logStream.AutoFlush = $true

    Add-Type -Path $(Join-Path -Path ${env:WSL_BINDIR} -ChildPath 'WslMetadataServiceClient.dll')
    # Not read in this block; presumably consumed by functions defined elsewhere in the script
    $metaDbType = [WslMetadataServiceClient.MetaDatabaseType]::SqlServer
    # Normalise the work directory so it always ends with a backslash
    if( ! ${env:WSL_WORKDIR}.EndsWith('\')) {
        ${env:WSL_WORKDIR} += '\'
    }
    $runMode = "Database"
    ${env:warn} = $false

    Load-Data

    Print-Log
}
catch {
    # Keep a reference to the failure before any nested handler runs
    $loadError = $_

    # Best-effort cleanup; $repo may never have been created
    try {
        $repo.Dispose()
    }
    catch {}

    [Console]::WriteLine("-2")
    [Console]::WriteLine("Load failed")

    # Record the failure in the audit log when it is usable. The writer can be
    # missing (the error happened before it was opened) or already disposed
    # (Print-Log disposes it), in which case the details go to the console so
    # the original error is not lost behind a secondary one.
    $loggedToFile = $false
    if ($null -ne $logStream) {
        try {
            $logStream.WriteLine($loadError.Exception.Message)
            $logStream.WriteLine($loadError.InvocationInfo.PositionMessage)
            $loggedToFile = $true
        }
        catch {}
    }
    if ( ! $loggedToFile) {
        [Console]::WriteLine($loadError.Exception.Message)
        [Console]::WriteLine($loadError.InvocationInfo.PositionMessage)
    }

    # Print-Log disposes $logStream and re-reads $fileAud, so only call it when
    # the audit log was actually opened; otherwise there is nothing to echo.
    if ($null -ne $logStream) {
        Print-Log
    }
}


Return to Top