SCRIPT_ForceSales
Technical Description of SCRIPT_ForceSales within Prod OMNIA Partners EDW at 25-Mar-2025 12:24:48
Purpose: | SCRIPT_ForceSales |
Date Created: | 2021-10-21 13:11:51 |
Date Last Updated: | |
Script Type: | PowerShell (64-bit) script |
Notes: | |
#==============================================================================
# DBMS Name : SNOWFLAKE Custom*
# Template : wsl_snowflake_pscript_load
# Template Version : 8.5.1.0
# Description : Load ForceSales
# Generated by : WhereScape RED Version 9.0.2.1 (build 240801-225410)
# Generated for : Omnia Partners
# Generated on : Monday, February 03, 2025 at 10:50:17
# Author : Amrit Nandrey
# Source Schema : dbo
# Source Table : vwPrsDetail
#==============================================================================
# Notes / History
#
Import-module -Name WslPowershellCommon -DisableNameChecking
Hide-Window
function Print-Log {
$logStream.Dispose()
$logReader = New-Object IO.StreamReader($fileAud)
while( ! $logReader.EndOfStream) {
[Console]::WriteLine($logReader.ReadLine())
}
$logReader.Dispose()
}
Function Replace-WslTags($stuff) {
if([string]::IsNullOrWhitespace($stuff)) {
return $stuff
}
if($stuff.Contains('$SEQUENCE$')) {
$stuff = $stuff.Replace('$SEQUENCE$',${env:WSL_SEQUENCE})
}
if([regex]::IsMatch($stuff,'\$.+\$')) {
# If $stuff contains two or more $s and the $SEQUENCE$ string is not detected
# or has already been replaced then we assume a date
while([regex]::IsMatch($stuff,'\$.+\$')) {
$startPos = $stuff.IndexOf('$')
$work = $stuff.SubString($startPos + 1)
$endPos = $work.IndexOf('$')
$suppliedFormat = $work.SubString(0, $endPos)
$dateFormat = $suppliedFormat.Replace('YY','yy').Replace('DD','dd').Replace('HH','hh').Replace('MI','mm').Replace('SS','ss')
$dateString = (Get-Date -f $dateFormat)
$replaceString = '$' + $suppliedFormat + '$'
$stuff = $stuff.Replace($replaceString,$dateString)
}
}
if($stuff.indexOf('$') -ne -1) {
${env:warn} = $true
$logStream.WriteLine("Unclosed '$' tag in '$stuff'")
$logStream.WriteLine("Unclosed '$' will be removed")
$stuff = $stuff.Replace('$','')
}
return $stuff.Trim()
}
Function Gzip-File {
param(
[string]$inFile = $(throw "No input file specified"),
[string]$outFile = $inFile + ".gz",
[switch]$removeOriginal = $false
)
$input = New-Object IO.FileStream $InFile, ([IO.FileMode]::Open), ([IO.FileAccess]::Read), ([IO.FileShare]::Read)
$output = New-Object IO.FileStream $OutFile, ([IO.FileMode]::Create), ([IO.FileAccess]::Write), ([IO.FileShare]::None)
$gzipStream = New-Object IO.Compression.GzipStream $Output, ([IO.Compression.CompressionLevel]::Fastest)
$input.CopyTo($gzipStream)
$gzipStream.Dispose()
$output.Dispose()
$input.Dispose()
if($removeOriginal) {
Remove-Item -Path $inFile
}
return (gci $outFile).FullName
}
function Load-Data {
# Get extended property values
${env:DEBUG} = Get-ExtendedProperty -PropertyName "SF_DEBUG_MODE" -TableName ${env:WSL_LOAD_TABLE}
${env:PURGE} = Get-ExtendedProperty -PropertyName "SF_PURGE" -TableName ${env:WSL_LOAD_TABLE}
${env:ACCESS_KEY} = Get-ExtendedProperty -PropertyName "SF_ACCESS_KEY" -TableName ${env:WSL_LOAD_TABLE}
${env:SECRET_KEY} = Get-ExtendedProperty -PropertyName "SF_SECRET_KEY" -TableName ${env:WSL_LOAD_TABLE}
${env:SEND_FILES_ZIPPED} = Get-ExtendedProperty -PropertyName "SF_SEND_FILES_ZIPPED" -TableName ${env:WSL_LOAD_TABLE}
${env:FILE_FORMAT} = Get-ExtendedProperty -PropertyName "SF_FILE_FORMAT" -TableName ${env:WSL_LOAD_TABLE}
${env:SNOWSQL_ACCOUNT} = Get-ExtendedProperty -PropertyName "SF_SNOWSQL_ACCOUNT" -TableName ${env:WSL_LOAD_TABLE}
${env:SNOWSQL_DATABASE} = Get-ExtendedProperty -PropertyName "SF_SNOWSQL_DATABASE" -TableName ${env:WSL_LOAD_TABLE}
${env:SNOWSQL_SCHEMA} = Get-ExtendedProperty -PropertyName "SF_SNOWSQL_SCHEMA" -TableName ${env:WSL_LOAD_TABLE}
${env:SNOWSQL_WAREHOUSE} = Get-ExtendedProperty -PropertyName "SF_SNOWSQL_WAREHOUSE" -TableName ${env:WSL_LOAD_TABLE}
${env:UNICODE_SUPPORT} = Get-ExtendedProperty -PropertyName "SF_UNICODE_SUPPORT" -TableName ${env:WSL_LOAD_TABLE}
${env:UNLOAD_DELIM} = Get-ExtendedProperty -PropertyName "SF_UNLOAD_DELIMITER" -TableName ${env:WSL_LOAD_TABLE}
${env:UNLOAD_ENC} = Get-ExtendedProperty -PropertyName "SF_UNLOAD_ENCLOSED_BY" -TableName ${env:WSL_LOAD_TABLE}
${env:UNLOAD_ESC} = Get-ExtendedProperty -PropertyName "SF_UNLOAD_ESCAPE_CHAR" -TableName ${env:WSL_LOAD_TABLE}
${env:SPLIT_THRESHOLD} = Get-ExtendedProperty -PropertyName "SF_SPLIT_THRESHOLD" -TableName ${env:WSL_LOAD_TABLE}
${env:FILE_COUNT} = Get-ExtendedProperty -PropertyName "SF_SPLIT_COUNT" -TableName ${env:WSL_LOAD_TABLE}
${env:TIMEZONE} = Get-ExtendedProperty -PropertyName "SF_TIMEZONE" -TableName ${env:WSL_LOAD_TABLE}
${env:FILE_TYPE} = Get-ExtendedProperty -PropertyName "SF_FILE_TYPE" -TableName ${env:WSL_LOAD_TABLE}
${env:EXTERNAL_STAGE} = Get-ExtendedProperty -PropertyName "SF_EXTERNAL_STAGE" -TableName ${env:WSL_LOAD_TABLE}
${env:AZ_SAS_TOKEN} = Get-ExtendedProperty -PropertyName "SF_AZURE_SAS_TOKEN" -TableName ${env:WSL_LOAD_TABLE}
${env:AZ_ENCRYPTION_TYPE} = Get-ExtendedProperty -PropertyName "SF_AZURE_ENCRYPTION_TYPE" -TableName ${env:WSL_LOAD_TABLE}
${env:AZ_ENCRYPTION_KEY} = Get-ExtendedProperty -PropertyName "SF_AZURE_ENCRYPTION_KEY" -TableName ${env:WSL_LOAD_TABLE}
${env:GCS_STORAGE_INTEGRATION} = Get-ExtendedProperty -PropertyName "SF_GCS_STORAGE_INTEGRATION" -TableName ${env:WSL_LOAD_TABLE}
${env:GCS_STAGE_AREA_NAME} = Get-ExtendedProperty -PropertyName "SF_GCS_STAGE_AREA_NAME" -TableName ${env:WSL_LOAD_TABLE}
if(${env:UNICODE_SUPPORT} -ne "TRUE") {
${env:UNICODE_SUPPORT} = "FALSE"
}
if(${env:SEND_FILES_ZIPPED} -ne "TRUE") {
${env:SEND_FILES_ZIPPED} = "FALSE"
}
if(${env:FILE_COUNT} -lt 1) {
${env:FILE_COUNT} = 1
}
if([string]::IsNullOrEmpty(${env:UNLOAD_DELIM})) {
${env:UNLOAD_DELIM} = "|"
}
if([string]::IsNullOrEmpty(${env:UNLOAD_ENC})) {
${env:UNLOAD_ENC} = '"'
}
if([string]::IsNullOrEmpty(${env:UNLOAD_ESC})) {
${env:UNLOAD_ESC} = "#"
}
if([string]::IsNullOrEmpty(${env:FILE_TYPE})) {
${env:FILE_TYPE} = "CSV"
}
${env:SNOWSQL_USER} = ${env:WSL_TGT_USER}
${env:SNOWSQL_PWD} = ${env:WSL_TGT_PWD}
if ( ![string]::IsNullOrWhiteSpace(${env:EXTERNAL_STAGE})){
$PURGE = "FALSE"
if ( ![string]::IsNullOrWhiteSpace(${env:PURGE})){
$PURGE = ${env:PURGE}
}
}
else{
$PURGE = "TRUE"
if ( ![string]::IsNullOrWhiteSpace(${env:PURGE})){
$PURGE = ${env:PURGE}
}
}
# Work out full file format name
Add-Type -Path $(Join-Path -Path ${env:WSL_BINDIR} -ChildPath 'WslMetadataServiceClient.dll')
$metaDbType = [WslMetadataServiceClient.MetaDatabaseType]::SqlServer
# Metadata DSN, Meta DB Type enum, Metadata User, Metadata Password, Metadata schema and lastly maybe 64bit
if ([System.Environment]::Is64BitProcess) {
$metaArch = [WslMetadataServiceClient.Architecture]::_64bit
$repo = New-Object WslMetadataServiceClient.Repo(${env:WSL_META_DSN},$metaDbType,${env:WSL_META_USER},${env:WSL_META_PWD},"dbo.",$metaArch)
}
else {
$repo = New-Object WslMetadataServiceClient.Repo(${env:WSL_META_DSN},$metaDbType,${env:WSL_META_USER},${env:WSL_META_PWD},"dbo.")
}
$root = $repo.objectsByName
$fileFormatObject = $root[${env:FILE_FORMAT}]
${env:fileFormatDatabase} = $fileFormatObject.target.database.GetValue()
${env:fileFormatSchema} = $fileFormatObject.target.schema.GetValue()
${env:fileFormatFullName} = ""
if( ! [string]::IsNullOrEmpty(${env:fileFormatDatabase})) { ${env:fileFormatFullName} += ${env:fileFormatDatabase} + "." }
if( ! [string]::IsNullOrEmpty(${env:fileFormatSchema})) { ${env:fileFormatFullName} += ${env:fileFormatSchema} + "." }
${env:fileFormatFullName} += ${env:FILE_FORMAT}
if(${env:DEBUG} -eq "TRUE") {
$logStream.WriteLine("=================== LOAD OPTIONS ===================")
$logStream.WriteLine("Specified Load Table: " + ${env:WSL_LOAD_TABLE})
$logStream.WriteLine("Specified Work Dir: " + ${env:WSL_WORKDIR})
$logStream.WriteLine("Specified Sequence: " + ${env:WSL_SEQUENCE})
$logStream.WriteLine("Specified Metadata ODBC DSN: " + ${env:WSL_META_DSN})
$logStream.WriteLine("Specified Metadata Username: " + ${env:WSL_META_USER})
$logStream.WriteLine("Specified Metadata Password: " + (New-Object string ('*', ${env:WSL_META_PWD}.Length)))
$logStream.WriteLine("")
if($runMode -eq 'S3') {
$logStream.WriteLine("=================== CONNECTION OPTIONS ===================")
$logStream.WriteLine("Access Key: " + (New-Object string ('*', ${env:ACCESS_KEY}.Length)))
$logStream.WriteLine("Secret Key: " + (New-Object string ('*', ${env:SECRET_KEY}.Length)))
$logStream.WriteLine("")
}
elseif($runmode -eq "AZ") {
$logStream.WriteLine("=================== CONNECTION OPTIONS ===================")
$logStream.WriteLine("SAS Token: " + (New-Object string ('*', ${env:AZ_SAS_TOKEN}.Length)))
$logStream.WriteLine("Encryption Method: " + ${env:AZ_ENCRYPTION_TYPE})
$logStream.WriteLine("Encryption Key: " + (New-Object string ('*', ${env:AZ_ENCRYPTION_KEY}.Length)))
$logStream.WriteLine("")
}
elseif($runmode -eq "GCS") {
$logStream.WriteLine("=================== CONNECTION OPTIONS ===================")
$logStream.WriteLine("GCS STORAGE INTEGRATION: " + ${env:GCS_STORAGE_INTEGRATION})
$logStream.WriteLine("GCS STAGE AREA NAME: " + ${env:GCS_STAGE_AREA_NAME})
$logStream.WriteLine("")
}
$logStream.WriteLine("=================== MODES ===================")
$logStream.WriteLine("Specified Debug Mode: " + ${env:DEBUG})
$logStream.WriteLine("Specified Run Mode: " + $runMode)
$logStream.WriteLine("Unicode Extract: " + ${env:UNICODE_SUPPORT})
$logStream.WriteLine("")
if($runMode -eq 'Database') {
$logStream.WriteLine("=================== SOURCE TABLE INFO ===================")
$logStream.WriteLine("Source Schema: " + ${env:WSL_SRC_SCHEMA})
$logStream.WriteLine("Source Tables: " + @"
vwPrsDetail
"@)
$logStream.WriteLine("Source Where: " + @"
"@)
$logStream.WriteLine("")
$logStream.WriteLine("=================== SOURCE DB INFO ===================")
$logStream.WriteLine("ODBC Source DSN: " + ${env:WSL_SRC_DSN})
$logStream.WriteLine("ODBC Source Username: " + ${env:WSL_SRC_USER})
$logStream.WriteLine("ODBC Source Password: " + (New-Object string ('*', ${env:WSL_SRC_PWD}.Length)))
$logStream.WriteLine("")
}
$logStream.WriteLine("=================== EXTENDED PROPERTIES ===================")
$logStream.WriteLine("SF_SNOWSQL_ACCOUNT: " + ${env:SNOWSQL_ACCOUNT})
$logStream.WriteLine("SF_SNOWSQL_WAREHOUSE: " + ${env:SNOWSQL_WAREHOUSE})
$logStream.WriteLine("SF_SNOWSQL_DATABASE: " + ${env:SNOWSQL_DATABASE})
$logStream.WriteLine("SF_SNOWSQL_SCHEMA: " + ${env:SNOWSQL_SCHEMA})
$logStream.WriteLine("SF_FILE_FORMAT: " + ${env:FILE_FORMAT})
$logStream.WriteLine("fileFormatFullName: " + ${env:fileFormatFullName})
$logStream.WriteLine("SF_SEND_FILES_ZIPPED: " + ${env:SEND_FILES_ZIPPED})
$logStream.WriteLine("SF_UNLOAD_DELIMITER: " + ${env:UNLOAD_DELIM})
$logStream.WriteLine("SF_UNLOAD_ENCLOSED_BY: " + ${env:UNLOAD_ENC})
$logStream.WriteLine("SF_UNLOAD_ESCAPE_CHAR: " + ${env:UNLOAD_ESC})
$logStream.WriteLine("SF_SPLIT_THRESHOLD: " + ${env:SPLIT_THRESHOLD})
$logStream.WriteLine("SF_SPLIT_COUNT: " + ${env:FILE_COUNT})
$logStream.WriteLine("SF_TIMEZONE: " + ${env:TIMEZONE})
$logStream.WriteLine("SF_FILE_TYPE: " + ${env:FILE_TYPE})
$logStream.WriteLine("SF_EXTERNAL_STAGE: " + ${env:EXTERNAL_STAGE})
$logStream.WriteLine("SF_AZURE_SAS_TOKEN: " + ${env:AZ_SAS_TOKEN})
$logStream.WriteLine("SF_AZURE_ENCRYPTION_TYPE: " + ${env:AZ_ENCRYPTION_TYPE})
$logStream.WriteLine("SF_AZURE_ENCRYPTION_KEY: " + ${env:AZ_ENCRYPTION_KEY})
$logStream.WriteLine("SF_PURGE: " + ${env:PURGE})
$logStream.WriteLine("")
}
if (![string]::IsNullOrWhiteSpace($env:WSL_META_CONSTRING)) {
$logStream.WriteLine(" META DATA CONNECTED USING ADVANCED CONNECT ")
}
$filePath = ${env:WSL_WORKDIR}
$fileDat = "wsl${env:WSL_LOAD_TABLE}${env:WSL_SEQUENCE}"
$logStream.WriteLine("================= CLEANUP FILES ===================")
foreach($df in $((Get-ChildItem "${filePath}${fileDat}*").FullName)){
$logStream.WriteLine("Cleaning file $df")
Remove-Item -Path $df
}
$logStream.WriteLine("Cleaning ${fileDat} from SnowFlake stage")
$snowsqlDelete = "snowsql -q ""rm @~ pattern='.*$fileDat.*'"" -o friendly=false -o remove_comments=true -o timing=false"
$putRes = & $([ScriptBlock]::Create($snowsqlDelete))
$logStream.WriteLine("================= EXTRACT SQL =====================")
$extractSql = @"
SELECT vwPrsDetail.Id
, vwPrsDetail.CustomerId
, vwPrsDetail.RegistrationId
, vwPrsDetail.ReportingCustomerId
, vwPrsDetail.ContractId
, vwPrsDetail.SupplierId
, convert(varchar(19),ReceiptDate,120)
, convert(varchar(19),TransactionDate,120)
, vwPrsDetail.SalesAmount
, vwPrsDetail.RevenueAmount
, convert(varchar(19),CreateDate,120)
, convert(varchar(19),ModifiedDate,120)
, vwPrsDetail.OrganizationName
, vwPrsDetail.OrganizationId
, vwPrsDetail.UnitsSold
, vwPrsDetail.PricePerUnit
, vwPrsDetail.RebateName
, vwPrsDetail.ContractRebateDetailsId
, vwPrsDetail.TransactionClassId
, vwPrsDetail.Note
, vwPrsDetail.ChannelPartnerId
, vwPrsDetail.ChannelPartnerFeesharePartnerId
, vwPrsDetail.Level1ForceId
, vwPrsDetail.ContractExternalCategoryId
, vwPrsDetail.CustomerIdFromExternalSystem
, vwPrsDetail.CustomerLevel1IdFromExternalSystem
FROM ${env:WSL_SRC_SCHEMA}.vwPrsDetail vwPrsDetail
"@
$logStream.WriteLine($extractSql)
if (![string]::IsNullOrWhiteSpace($env:WSL_SRC_CONSTRING)) {
$logStream.WriteLine(" SOURCE DATABASE CONNECTED USING ADVANCED CONNECT ")
}
$logStream.WriteLine("")
if(${env:DEBUG} -eq "TRUE") {
$logStream.WriteLine("BEGIN create of data file from source system: $(Get-Date)")
}
$unicode = $false
if( ${env:UNICODE_SUPPORT} -eq "TRUE") {
$unicode = $true
}
$OdbcDump = Get-OdbcDumpSource
Add-Type -TypeDefinition $OdbcDump -Language CSharp -ReferencedAssemblies "System.Data"
$wslOdbc = New-Object WhereScape.OdbcDump
#GetDataToFile(string query, string dsn, string username, string password, string dataFile, string delimiter, int fileCount, int splitThreshold, bool addQuotes, bool unicode, string enclosedBy, string escapeChar)
try {
$rowCount = $wslOdbc.GetDataToFile($extractSQL,${env:WSL_SRC_DSN},${env:WSL_SRC_USER},${env:WSL_SRC_PWD},"${filePath}${fileDat}",${env:UNLOAD_DELIM}, ${env:FILE_COUNT}, ${env:SPLIT_THRESHOLD}, $true, $unicode, ${env:UNLOAD_ENC}, ${env:UNLOAD_ESC})
}
catch {
$null = WsWrkAudit -Status "E" -Message "An error has occurred: $_.Exception.Message"
[Console]::WriteLine("-2")
[Console]::WriteLine("Load failed")
$logStream.WriteLine($_.Exception.Message)
$logStream.WriteLine($_.InvocationInfo.PositionMessage)
Print-Log
}
if(${env:DEBUG} -eq "TRUE") {
$logStream.WriteLine("END create of data file from source system: $(Get-Date)")
}
if($rowCount -lt 1) {
$logStream.WriteLine("Source query returned 0 rows")
[Console]::WriteLine("1")
[Console]::WriteLine("Source query returned 0 rows")
Print-Log
exit
}
$dataFiles = New-Object System.Collections.Generic.List[string]
foreach($df in $((Get-ChildItem "${filePath}${fileDat}*").FullName)){
$null = $dataFiles.Add($df)
} if(($runMode -ne "S3") -and ($runmode -ne "AZ") -and ($runmode -ne "GCS") -and [string]::IsNullOrWhiteSpace(${env:EXTERNAL_STAGE})) {
$sourcePath = $filePath
for($x = 0; $x -lt $dataFiles.Count; $x++) {
$originalName = $dataFiles[$x]
if(${env:SEND_FILES_ZIPPED} -eq "TRUE") {
$dataFiles[$x] = Gzip-File -inFile $originalName -RemoveOriginal
$logStream.WriteLine("File '$originalName' GZipped to '$($dataFiles[$x])' prior to upload") }
}
}
$logStream.WriteLine("================= LOAD =================")
$sfOdbc = New-Object System.Data.Odbc.OdbcConnection
if ([string]::IsNullOrEmpty($env:WSL_TGT_CONSTRING)) {
$sfOdbc.ConnectionString = "DSN=$env:WSL_TGT_DSN"
if (! [string]::IsNullOrEmpty($env:WSL_TGT_USER)) { $sfOdbc.ConnectionString += ";UID=$env:WSL_TGT_USER" }
if (! [string]::IsNullOrEmpty($env:WSL_TGT_PWD)) { $sfOdbc.ConnectionString += ";PWD=$env:WSL_TGT_PWD" }
} else {
$sfOdbc.ConnectionString = $env:WSL_TGT_CONSTRING
}
$sfOdbc.Open()
if((($runMode -ne "S3") -and ($runmode -ne "AZ") -and ($runmode -ne "GCS")) -or ( ! [string]::IsNullOrEmpty(${env:EXTERNAL_STAGE}))) {
if(($runMode -ne "S3") -and ($runmode -ne "AZ") -and ($runmode -ne "GCS") -and [string]::IsNullOrEmpty(${env:EXTERNAL_STAGE})) {
$remSQL = "remove @~/WSL-${env:WSL_SEQUENCE}-${env:WSL_TASK_KEY}*;"
$null = (New-Object System.Data.Odbc.OdbcCommand($remSQL,$sfOdbc)).ExecuteNonQuery()
$fullFilePath=Join-Path -Path ${filePath} -ChildPath ${fileDat}
if(${env:SEND_FILES_ZIPPED} -eq "TRUE") {
$putSQL = "PUT 'file://$($fullFilePath.Replace("\","/"))*.gz' @~/WSL-${env:WSL_SEQUENCE}-${env:WSL_TASK_KEY};"
}
else {
$putSQL = "PUT 'file://$($fullFilePath.Replace("\","/"))*' @~/WSL-${env:WSL_SEQUENCE}-${env:WSL_TASK_KEY};"
}
$snowsqlPut = "snowsql -q ""$putsql"" -o friendly=false -o remove_comments=true -o output_format=csv -o timing=false"
if(${env:DEBUG} -eq "TRUE") {
$logStream.WriteLine("BEGIN PUT of files matching ${filePath}${fileDat} : $(Get-Date)")
}
$logStream.WriteLine("PUT:")
$logStream.WriteLine($putSQL)
$logStream.WriteLine("")
$putRes = & $([ScriptBlock]::Create($snowsqlPut))
if(${env:DEBUG} -eq "TRUE") {
$logStream.WriteLine("END PUT of files matching ${env:filePath}${fileDat} : $(Get-Date)")
}
}
if( ! [string]::IsNullOrEmpty(${env:TIMEZONE})) {
$tzStmt = "ALTER SESSION SET TIMEZONE = '${env:TIMEZONE}';`r`n"
$logStream.WriteLine("SET TIMEZONE:")
$logStream.WriteLine($tzStmt)
$tzCmd = New-Object System.Data.Odbc.OdbcCommand($tzStmt,$sfOdbc)
$null = $tzCmd.ExecuteNonQuery()
}
if(${env:FILE_TYPE} -eq "CSV") {
$copyStmt = @"
COPY INTO ${env:WSL_LOAD_FULLNAME}
( Id
, CustomerId
, RegistrationId
, ReportingCustomerId
, ContractId
, SupplierId
, ReceiptDate
, TransactionDate
, SalesAmount
, RevenueAmount
, CreateDate
, ModifiedDate
, OrganizationName
, OrganizationId
, UnitsSold
, PricePerUnit
, RebateName
, ContractRebateDetailsId
, TransactionClassId
, Note
, ChannelPartnerId
, ChannelPartnerFeesharePartnerId
, ChannelPartnerLevel1ForceId
, ContractExternalCategoryId
, CustomerIdFromExternalSystem
, CustomerLevel1IdFromExternalSystem
, dss_record_source
, dss_load_date
)
FROM (
SELECT t.`$1
, t.`$2
, t.`$3
, t.`$4
, t.`$5
, t.`$6
, t.`$7
, t.`$8
, t.`$9
, t.`$10
, t.`$11
, t.`$12
, t.`$13
, t.`$14
, t.`$15
, t.`$16
, t.`$17
, t.`$18
, t.`$19
, t.`$20
, t.`$21
, t.`$22
, t.`$23
, t.`$24
, t.`$25
, t.`$26
, 'FORCE.dbo.vwPrsDetail'
, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)
FROM $(if ( [string]::IsNullOrWhiteSpace(${env:EXTERNAL_STAGE})) { "@~/WSL-${env:WSL_SEQUENCE}-${env:WSL_TASK_KEY}/ t" }
else {if ( [string]::IsNullOrWhiteSpace(${fileDat}.Trim())){"@${env:EXTERNAL_STAGE}/ t"} else{ "@${env:EXTERNAL_STAGE}/${fileDat}/ t"}})
)
FILE_FORMAT = '${env:fileFormatFullName}'
PURGE = $PURGE
"@
}
else {
$copyStmt = @"
COPY INTO ${env:WSL_LOAD_FULLNAME}
FROM @~/WSL-${env:WSL_SEQUENCE}-${env:WSL_TASK_KEY}/
FILE_FORMAT = '${env:fileFormatFullName}'
PURGE = $PURGE
"@
}
$maskedLoad = $copyStmt
$logStream.WriteLine("COPY:")
$logStream.WriteLine($maskedLoad)
if(${env:DEBUG} -eq "TRUE") {
$logStream.WriteLine("BEGIN COPY INTO '${env:WSL_LOAD_FULLNAME}' : $(Get-Date)")
}
try {
$copyCmd = New-Object System.Data.Odbc.OdbcCommand($copyStmt,$sfOdbc)
$copyCmd.CommandTimeout = 0
$loadRes = New-Object Data.DataTable
$null = (New-Object Data.odbc.odbcDataAdapter($copyCmd)).fill($loadRes)
}
catch {
if(${env:DEBUG} -ne "TRUE"){
$logStream.WriteLine("================= CLEANUP FILES ===================")
$logStream.WriteLine("Cleaning ${fileDat} from SnowFlake stage")
$snowsqlDelete = "snowsql -q ""rm @~ pattern='.*$fileDat.*'"" -o friendly=false -o remove_comments=true -o timing=false"
$putRes = & $([ScriptBlock]::Create($snowsqlDelete))
foreach($df in $((Get-ChildItem "${filePath}${fileDat}*").FullName)){
$logStream.WriteLine("Cleaning file $df")
Remove-Item -Path $df
}}
$logStream.WriteLine($_.Exception.InnerException.Message)
[Console]::WriteLine("-2")
[Console]::WriteLine("Load failed")
Print-Log
exit
}
if(${env:DEBUG} -eq "TRUE") {
$logStream.WriteLine("END COPY INTO '${env:WSL_LOAD_FULLNAME}' : $(Get-Date)")
}
if(($loadRes.errors_seen | Measure-Object -Sum).Sum -eq 0) {
$rowsLoaded = ($loadRes.rows_loaded | Measure-Object -Sum).Sum
$logStream.WriteLine(($loadRes | Format-Table -Property file,status,rows_parsed,rows_loaded -AutoSize | Out-String))
if ([string]::IsNullOrWhiteSpace($env:WSL_TGT_CONSTRING)) {
$logStream.Write("$rowsLoaded rows loaded.")
} else {
$logStream.Write("$rowsLoaded rows loaded using Advanced Connect.")
}
try {
$null = WsWrkTask -Inserted $rowsLoaded
} catch { $logStream.WriteLine("Failed to update task row count"); $logStream.WriteLine($_.Exception.InnerException.Message)}
if($rowCount -eq $rowsLoaded) {
[Console]::WriteLine("1")
[Console]::WriteLine("Load successful. Source rowcount $rowCount. $rowsLoaded rows loaded.")
if ([string]::IsNullOrWhiteSpace($env:WSL_TGT_CONSTRING)) {
$logStream.WriteLine("Source rowcount $rowCount. $rowsLoaded rows loaded.")
} else {
$logStream.WriteLine("Source rowcount $rowCount. $rowsLoaded rows loaded using Advanced connect.")
}
}
else {
[Console]::WriteLine("-2")
[Console]::WriteLine("Source and Target rowcount does not match. Source rowcount $rowCount. Target rowcount $rowsLoaded.")
}
if(${env:DEBUG} -ne "TRUE") {
foreach($file in $dataFiles) {
Remove-Item $file
}
$null = (New-Object System.Data.Odbc.OdbcCommand($remSQL,$sfOdbc)).ExecuteNonQuery()
}
else {
$logStream.WriteLine("Temporary data files not removed as debug mode is enabled")
}
}
else {
$logStream.WriteLine(($loadRes | Format-Table -Property file,status,rows_parsed,rows_loaded -AutoSize | Out-String))
[Console]::WriteLine("-2")
[Console]::WriteLine("Load failed")
Print-Log
exit
}
$sfOdbc.Close()
}
elseif($runmode -eq "S3") {
$sourcefilename = ''
if( ! [string]::IsNullOrEmpty(${env:TIMEZONE})) {
$tzStmt = "ALTER SESSION SET TIMEZONE = '${env:TIMEZONE}';`r`n"
$logStream.WriteLine("SET TIMEZONE:")
$logStream.WriteLine($tzStmt)
$tzCmd = New-Object System.Data.Odbc.OdbcCommand($tzStmt,$sfOdbc)
$null = $tzCmd.ExecuteNonQuery()
}
if ("false" -eq "true") {
$skipheadervalue = 1
} else {
$skipheadervalue = 0
}
$sourceFilePath="".Trim()
if ($sourceFilePath[-1] -eq "/") {
$sourceFilePath = $sourceFilePath.TrimEnd('/')
}
$sourceFileFieldDelimiter='|'
$sourceFileRecordDelimiter="".Trim()
$sourceFileFieldEnclosure='"'
if ($sourceFileFieldEnclosure.Trim() -eq ""){
$sourceFileFieldEnclosure='"'
}
if ($sourceFileRecordDelimiter.Trim() -eq "") {
$sourceFileRecordDelimiter = "\n"
}
$awsAccessKey=${env:ACCESS_KEY}
$fileDetails=$sourceFilePath.Trim()+"/"+$sourcefilename
if (($awsAccessKey -replace '\s', '') -eq "") {
$awsAccessKey='$WSL_SRCCFG_s3AccessKey$'
}
$awsSecretKey=${env:SECRET_KEY}
if (($awsSecretKey -replace '\s', '') -eq "") {
$awsSecretKey=${env:WSL_SRCCFG_s3SecretKey}
}
$s3Prefix = Get-ExtendedProperty -PropertyName "SF_S3_BUCKET_PREFIX" -TableName ${env:WSL_LOAD_TABLE}
if(($s3Prefix.StartsWith("s3://"))){
$copyStmt = @"
COPY INTO ${env:WSL_LOAD_FULLNAME}
FROM ${filePath}${fileDat}
CREDENTIALS = (AWS_KEY_ID='${env:ACCESS_KEY}' AWS_SECRET_KEY='${env:SECRET_KEY}')
FILE_FORMAT = '${env:fileFormatFullName}'
PURGE = $PURGE
"@
}else{
$copyStmt = @"
COPY INTO ${env:WSL_LOAD_FULLNAME}
FROM $fileDetails
CREDENTIALS = (AWS_KEY_ID='$awsAccessKey' AWS_SECRET_KEY='$awsSecretKey')
FILE_FORMAT = ( TYPE = csv FIELD_DELIMITER = '$sourceFileFieldDelimiter' SKIP_HEADER = $skipheadervalue RECORD_DELIMITER = '$sourceFileRecordDelimiter' FIELD_OPTIONALLY_ENCLOSED_BY = '$sourceFileFieldEnclosure' ERROR_ON_COLUMN_COUNT_MISMATCH =
"@
if ($sourcefilename.ToUpper().EndsWith('JSON')){
$copyStmt = @"
COPY INTO ${env:WSL_LOAD_FULLNAME}
FROM '$fileDetails'
CREDENTIALS = (AWS_KEY_ID='$awsAccessKey' AWS_SECRET_KEY='$awsSecretKey')
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = true )
"@
}
if ($sourcefilename.ToUpper().EndsWith('XML')){
$copyStmt = @"
COPY INTO ${env:WSL_LOAD_FULLNAME}
FROM $fileDetails
CREDENTIALS = (AWS_KEY_ID='$awsAccessKey' AWS_SECRET_KEY='$awsSecretKey')
FILE_FORMAT = (TYPE = 'XML')
"@
}
}
$maskedLoad = $copyStmt
if( ! [string]::IsNullOrWhiteSpace("$awsAccessKey")) {
$maskedLoad = $maskedLoad.Replace("$awsAccessKey",(New-Object String ('*', "$awsAccessKey".Length)))
}
if( ! [string]::IsNullOrWhiteSpace($awsSecretKey)) {
$maskedLoad = $maskedLoad.Replace($awsSecretKey,(New-Object String ('*', $awsSecretKey.Length)))
}
$logStream.WriteLine("COPY:")
$logStream.WriteLine($maskedLoad)
$logStream.WriteLine("")
if(${env:DEBUG} -eq "TRUE") {
$logStream.WriteLine("BEGIN COPY INTO '${env:WSL_LOAD_FULLNAME}' : $(Get-Date)")
}
$copyCmd = New-Object System.Data.Odbc.OdbcCommand($copyStmt,$sfOdbc)
$copyCmd.CommandTimeout = 0
$loadRes = New-Object Data.DataTable
$null = (New-Object Data.odbc.odbcDataAdapter($copyCmd)).fill($loadRes)
if(${env:DEBUG} -eq "TRUE") {
$logStream.WriteLine("END COPY INTO '${env:WSL_LOAD_FULLNAME}' : $(Get-Date)")
}
if(($loadRes.errors_seen | Measure-Object -Sum).Sum -eq 0) {
$rowsLoaded = ($loadRes.rows_loaded | Measure-Object -Sum).Sum
$logStream.WriteLine(($loadRes | Where { $_.file -ne "" } | Format-Table -Property file,status,rows_parsed,rows_loaded -AutoSize | Out-String))
if ([string]::IsNullOrWhiteSpace($env:WSL_TGT_CONSTRING)) {
$logStream.Write("$rowsLoaded rows loaded.")
} else {
$logStream.Write("$rowsLoaded rows loaded using Advanced Connect.")
}
try {
$null = WsWrkTask -Inserted $rowsLoaded
} catch { $logStream.WriteLine("Failed to update task row count"); $logStream.WriteLine($_.Exception.InnerException.Message)}
[Console]::WriteLine("1")
[Console]::WriteLine("Load successful. $rowsLoaded rows loaded.")
}
else {
$logStream.WriteLine(($loadRes | Format-Table -Property file,status,rows_parsed,rows_loaded -AutoSize | Out-String))
[Console]::WriteLine("-2")
[Console]::WriteLine("Load failed")
Print-Log
exit
}
$sfOdbc.Close()
}
elseif($runmode -eq "AZ") {
if( ! [string]::IsNullOrEmpty(${env:TIMEZONE})) {
$tzStmt = "ALTER SESSION SET TIMEZONE = '${env:TIMEZONE}';`r`n"
$logStream.WriteLine("SET TIMEZONE:")
$logStream.WriteLine($tzStmt)
$tzCmd = New-Object System.Data.Odbc.OdbcCommand($tzStmt,$sfOdbc)
$null = $tzCmd.ExecuteNonQuery()
}
if ("false" -eq "true") {
$skipheadervalue = 1
} else {
$skipheadervalue = 0
}
$azureSasToken=${env:AZ_SAS_TOKEN}
$sourceFileRecordDelimiter="".Trim()
$sourceFileFieldDelimiter='|'
$sourceFileFieldEnclosure='"'
if ($sourceFileFieldEnclosure.Trim() -eq ""){
$sourceFileFieldEnclosure='"'
}
if ($sourceFileRecordDelimiter.Trim() -eq "") {
$sourceFileRecordDelimiter = "\n"
}
if ($azureSasToken -eq $null){
$azureSasToken= ${env:WSL_SRCCFG_azureSASToken}}
if ($sourceFileRecordDelimiter.Trim() -eq "") {
$sourceFileRecordDelimiter = "\n"
}
$sourceFilePath = ${filePath}.Replace("https://","azure://").Replace("dfs.core.windows.net","blob.core.windows.net")
$fileName = $fileDat.ToUpper()
$copyStmt = @"
COPY INTO ${env:WSL_LOAD_FULLNAME}
FROM '$sourceFilePath$fileDat'
CREDENTIALS = (AZURE_SAS_TOKEN='$azureSasToken')
FILE_FORMAT = ( TYPE = csv ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE FIELD_DELIMITER = '$sourceFileFieldDelimiter' SKIP_HEADER = $skipheadervalue RECORD_DELIMITER = '$sourceFileRecordDelimiter' FIELD_OPTIONALLY_ENCLOSED_BY = '$sourceFileFieldEnclo
"@
if ($fileName.EndsWith('JSON')){
$copyStmt = @"
COPY INTO ${env:WSL_LOAD_FULLNAME}
FROM '$sourceFilePath$fileDat'
CREDENTIALS = (AZURE_SAS_TOKEN='$azureSasToken')
FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = true )
"@
}
if ($fileName.EndsWith('XML')){
$copyStmt = @"
COPY INTO ${env:WSL_LOAD_FULLNAME}
FROM '$sourceFilePath$fileDat'
CREDENTIALS = (AZURE_SAS_TOKEN='$azureSasToken')
FILE_FORMAT = (TYPE = 'XML')
"@
}
$maskedLoad = $copyStmt
if( ! [string]::IsNullOrWhiteSpace($azureSasToken)) {
$maskedLoad = $maskedLoad.Replace($azureSasToken,(New-Object String ('*', $azureSasToken.Length)))
}
if( ! [string]::IsNullOrWhiteSpace(${env:AZ_ENCRYPTION_KEY})) {
$maskedLoad = $maskedLoad.Replace(${env:AZ_ENCRYPTION_KEY},(New-Object String ('*', ${env:AZ_ENCRYPTION_KEY}.Length)))
}
$logStream.WriteLine("COPY:")
$logStream.WriteLine($maskedLoad)
$logStream.WriteLine("")
if(${env:DEBUG} -eq "TRUE") {
$logStream.WriteLine("BEGIN COPY INTO '${env:WSL_LOAD_FULLNAME}' : $(Get-Date)")
}
$copyCmd = New-Object System.Data.Odbc.OdbcCommand($copyStmt,$sfOdbc)
$copyCmd.CommandTimeout = 0
$loadRes = New-Object Data.DataTable
$null = (New-Object Data.odbc.odbcDataAdapter($copyCmd)).fill($loadRes)
if(${env:DEBUG} -eq "TRUE") {
$logStream.WriteLine("END COPY INTO '${env:WSL_LOAD_FULLNAME}' : $(Get-Date)")
}
if(($loadRes.errors_seen | Measure-Object -Sum).Sum -eq 0) {
$rowsLoaded = ($loadRes.rows_loaded | Measure-Object -Sum).Sum
$logStream.WriteLine(($loadRes | Where { $_.file -ne "" } | Format-Table -Property file,status,rows_parsed,rows_loaded | Out-String -width 500 ))
if ([string]::IsNullOrWhiteSpace($env:WSL_TGT_CONSTRING)) {
$logStream.Write("$rowsLoaded rows loaded.")
} else {
$logStream.Write("$rowsLoaded rows loaded using Advanced Connect.")
}
try {
$null = WsWrkTask -Inserted $rowsLoaded
} catch { $logStream.WriteLine("Failed to update task row count"); $logStream.WriteLine($_.Exception.InnerException.Message)}
[Console]::WriteLine("1")
[Console]::WriteLine("Load successful.$rowsLoaded rows loaded.")
}
else {
$logStream.WriteLine(($loadRes | Format-Table -Property file,status,rows_parsed,rows_loaded -AutoSize | Out-String))
[Console]::WriteLine("-2")
[Console]::WriteLine("Load failed")
Print-Log
exit
}
$sfOdbc.Close()
}
}
try {
$fileAud = Join-Path -Path ${env:WSL_WORKDIR} -ChildPath "${env:WSL_LOAD_TABLE}_${env:WSL_SEQUENCE}.txt"
$logStream = New-Object IO.StreamWriter($FileAud,$false)
$logStream.AutoFlush = $true
Add-Type -Path $(Join-Path -Path ${env:WSL_BINDIR} -ChildPath 'WslMetadataServiceClient.dll')
$metaDbType = [WslMetadataServiceClient.MetaDatabaseType]::SqlServer
if( ! ${env:WSL_WORKDIR}.EndsWith('\')) {
${env:WSL_WORKDIR} += '\'
}
$runMode = "Database"
${env:warn} = $false
Load-Data
Print-Log
}
catch {
try {
$repo.Dispose()
}
catch {}
[Console]::WriteLine("-2")
[Console]::WriteLine("Load failed")
$logStream.WriteLine($_.Exception.Message)
$logStream.WriteLine($_.InvocationInfo.PositionMessage)
Print-Log
}
Return to Top