import Filter      from './Filter';
import Transformer from './Transformer';

/** Raw values that the 'boolean' data type treats as true; everything else is false. */
const TRUTHY_BOOLEAN_VALUES = [true, '1', 'true', 'True', 'TRUE', 'yes', 'Yes', 'YES'];

/**
 * Tells whether an optional configuration value (filter key, action key, ...)
 * has been left empty. null, undefined and '' all count as "not configured".
 */
function isUnset(value) {
  return value == null || value === '';
}

/**
 * Tells whether a filter or transformer may be used with the given (flattened)
 * data type. Components without a `restrictToDataType` list accept every type.
 */
function supportsDataType(component, flattenedDataType) {
  return (
    component.restrictToDataType === undefined ||
    component.restrictToDataType.includes(flattenedDataType)
  );
}

/**
 * Builds the predicate deciding whether a transformation applies to a value.
 * Without a filter every value qualifies. The filter value is re-typed on
 * every call so the filter function always receives a fresh instance.
 */
function createFilterPredicate(pipeline, filter, filterValue, flattenedDataType) {
  if (filter === undefined) {
    return () => true;
  }

  if (filter.hasFilterValue) {
    return (value) => filter.filterFunc(
      value,
      flattenedDataType,
      pipeline.enforceDataType(filterValue, flattenedDataType)
    );
  }

  return (value) => filter.filterFunc(value, flattenedDataType);
}

/**
 * Builds a function `(record, value) => newValue` that runs the transformer on
 * a single value if the filter predicate accepts it and returns the value
 * untouched otherwise. The transformer receives exactly as many parameters as
 * it declares via `hasActionValue` / `hasActionValue2`.
 */
function createValueTransformer(
  pipeline,
  transformer,
  attribute,
  dataType,
  flattenedDataType,
  filterIsSatisfied
) {
  // these actions take their parameter verbatim (it is not a value of the
  // attribute's data type), so it must not be coerced
  const usesRawActionValue =
    ['cast-data-type', 'replace-attribute'].includes(attribute.transformationAction);

  return (record, value) => {
    if (!filterIsSatisfied(value)) {
      return value;
    }

    if (!transformer.hasActionValue) {
      return transformer.transformationFunc(record, flattenedDataType, value);
    }

    const parameter = usesRawActionValue
      ? attribute.actionValue
      : pipeline.enforceDataType(attribute.actionValue, dataType);

    if (!transformer.hasActionValue2) {
      return transformer.transformationFunc(record, flattenedDataType, value, parameter);
    }

    const secondParameter = pipeline.enforceDataType(attribute.actionValue2, dataType);

    return transformer.transformationFunc(
      record,
      flattenedDataType,
      value,
      parameter,
      secondParameter
    );
  };
}

/**
 * Holds a set of sample records and applies joins, type enforcement,
 * assertions and pipeline-step transformations to them.
 *
 * Records have the shape `{ lastChange: {...}, values: {...} }`, where both
 * objects are keyed by attribute id.
 */
class Pipeline {
  /**
   * @param {Array} sampleRecords - records to work on
   * @param {Array} [dataSourceAttributes] - when given, the data types of
   *   these attributes are enforced on the sample records
   * @param {Array} [joinedSampleRecords] - records to join in; only used
   *   together with `joinConfig`
   * @param {Object} [joinConfig] - see `applyJoin`
   */
  constructor(
    sampleRecords,
    dataSourceAttributes,
    joinedSampleRecords,
    joinConfig
  ) {
    this.sampleRecords        = sampleRecords;
    this.dataSourceAttributes = dataSourceAttributes;

    // We don't need to enforce data types on joined sample records,
    // because joins are only allowed on long/int/string attributes, which
    // should be delivered correctly-typed by the backend
    if (joinedSampleRecords !== undefined && joinConfig !== undefined) {
      this.applyJoin(joinedSampleRecords, joinConfig);
    }

    if (dataSourceAttributes !== undefined) {
      this.enforceDataTypes(dataSourceAttributes);
    }

    // store filters and transformers by key for efficient access
    this.filterMap = {};
    this.transformerMap = {};

    Filter.getFilters().forEach((filter) => {
      this.filterMap[filter.key] = filter;
    });

    Transformer.getTransformers().forEach((transformer) => {
      this.transformerMap[transformer.key] = transformer;
    });
  }

  /**
   * Converts a raw value into the JavaScript representation of a data type.
   *
   * - int / long: base-10 integer, or null if unparseable
   * - float / double: float, or null if unparseable
   * - boolean: true for a fixed list of truthy spellings, false otherwise
   *   (including null / undefined)
   * - null type, or a null / undefined value of any remaining type: null
   * - date: Date with hours, minutes and seconds set to 0 (local time)
   * - time-micros / time-millis: today's Date with the time taken from an
   *   'HH[:MM[:SS]]' string, or null if a component cannot be parsed
   * - timestamp-micros / timestamp-millis: `new Date(value)`
   * - anything else (including 'array_*' type names): the value as a string
   *
   * @param {*} value
   * @param {string} dataType
   * @returns {*} the typed value
   */
  enforceDataType(value, dataType) {
    switch (dataType) {
      case 'int':
      case 'long': {
        // explicit radix: inputs such as '0x10' must not be read as hex
        const parsed = Number.parseInt(value, 10);
        return Number.isNaN(parsed) ? null : parsed;
      }
      case 'float':
      case 'double': {
        const parsed = Number.parseFloat(value);
        return Number.isNaN(parsed) ? null : parsed;
      }
      case 'boolean':
        return TRUTHY_BOOLEAN_VALUES.includes(value);
      default:
        break;
    }

    // none of the remaining types can represent a missing value
    if (dataType === 'null' || value == null) {
      return null;
    }

    if (dataType === 'date') {
      const date = new Date(value);
      date.setHours(0, 0, 0);
      return date;
    }

    if (dataType === 'time-micros' || dataType === 'time-millis') {
      // String() guards against non-string input (e.g. numbers); missing
      // minutes / seconds default to 0
      const [hours, minutes = 0, seconds = 0] = String(value)
        .split(':')
        .map((unit) => Number.parseInt(unit, 10));

      if ([hours, minutes, seconds].some((unit) => Number.isNaN(unit))) {
        return null;
      }

      const time = new Date();
      time.setHours(hours, minutes, seconds);
      return time;
    }

    if (dataType === 'timestamp-micros' || dataType === 'timestamp-millis') {
      return new Date(value);
    }

    // by default, parse value as string
    return String(value);
  }

  /**
   * Enforces the data types given by `attributes` on all sample records and
   * resets every record's `lastChange`. Missing values (null / undefined)
   * become null. The records' `values` objects are updated in place.
   *
   * @param {Array<{id: *, dataType: string}>} attributes
   */
  enforceDataTypes(attributes) {
    this.sampleRecords = this.sampleRecords.map((sampleRecord) => {
      const { values } = sampleRecord;

      attributes.forEach(({ id, dataType }) => {
        const rawValue = values[id];

        if (rawValue == null) {
          values[id] = null;
        } else if (dataType.includes('array_')) {
          // NOTE(review): elements are typed with the full 'array_*' name, which
          // enforceDataType treats as a string type - confirm this is intended
          values[id] = rawValue.map((element) => this.enforceDataType(element, dataType));
        } else {
          values[id] = this.enforceDataType(rawValue, dataType);
        }
      });

      return {
        lastChange: {},
        values,
      };
    });
  }

  /**
   * Joins `joinedSampleRecords` onto the sample records. Join keys are
   * compared as strings, null / undefined keys never match, and each sample
   * record is merged with the first joined record sharing its key. With the
   * 'inner-join' operator, sample records without a match are removed; with
   * any other operator they are kept unchanged.
   *
   * Does nothing if the join config is incomplete. The `values` objects of
   * the sample records are extended in place.
   *
   * @param {Array} joinedSampleRecords
   * @param {{dataSourceJoinAttributeId: *, joinedDataSourceJoinAttributeId: *, joinOperator: string}} joinConfig
   */
  applyJoin(joinedSampleRecords, joinConfig) {
    const {
      dataSourceJoinAttributeId,
      joinedDataSourceJoinAttributeId,
      joinOperator,
    } = joinConfig;

    const joinConfigOptions = [
      dataSourceJoinAttributeId,
      joinedDataSourceJoinAttributeId,
      joinOperator,
    ];

    if (joinConfigOptions.some(isUnset)) {
      return;
    }

    const toJoinKey = (value) => ((value == null) ? null : String(value));

    // Index the joined records once instead of scanning them for every sample
    // record; only the first record per key is kept, as a linear search would.
    const firstMatchByKey = new Map();
    joinedSampleRecords.forEach((joiningRecord) => {
      if (joiningRecord == null) {
        return;
      }

      const rightKey = toJoinKey(joiningRecord.values[joinedDataSourceJoinAttributeId]);
      if (rightKey !== null && !firstMatchByKey.has(rightKey)) {
        firstMatchByKey.set(rightKey, joiningRecord);
      }
    });

    const isInnerJoin = (joinOperator === 'inner-join');
    const joinResult = [];

    this.sampleRecords.forEach((sampleRecord) => {
      const leftKey = toJoinKey(sampleRecord.values[dataSourceJoinAttributeId]);
      const matchingRecord = (leftKey === null)
        ? undefined
        : firstMatchByKey.get(leftKey);

      // if using INNER JOIN, ignore sample records without a match
      // in the joining sample records
      if (isInnerJoin && matchingRecord === undefined) {
        return;
      }

      // only the values of the joining record are merged in
      const joinedValues = (matchingRecord === undefined)
        ? undefined
        : matchingRecord.values;

      joinResult.push({
        lastChange: {},
        values:     Object.assign(sampleRecord.values, joinedValues),
      });
    });

    this.sampleRecords = joinResult;
  }

  /** @returns {Array} the current sample records */
  getSampleRecords() {
    return this.sampleRecords;
  }

  /**
   * Removes all sample records that violate at least one assertion.
   * Attributes without an assertion filter are ignored.
   *
   * @param {Array} attributes - each may carry `assertionFilter` (key into
   *   the filter map), `assertionValue`, `dataType` and `attributeId`
   * @returns {Array} the remaining sample records
   */
  applyAssertions(attributes) {
    const attributesWithAssertions = attributes
      .filter((attribute) => !isUnset(attribute.assertionFilter));

    // return if no assertions are defined
    if (attributesWithAssertions.length === 0) {
      return this.sampleRecords;
    }

    this.sampleRecords = this.sampleRecords.filter((record) =>
      attributesWithAssertions.every((attribute) => {
        const filter = this.filterMap[attribute.assertionFilter];

        // NOTE(review): values are looked up via `attributeId` here, while the
        // transformation methods use `id` - confirm the attribute shape
        return filter.filterFunc(
          record.values[attribute.attributeId],
          attribute.dataType,
          this.enforceDataType(attribute.assertionValue, attribute.dataType)
        );
      })
    );

    return this.sampleRecords;
  }

  /**
   * Re-applies the transformation of a single attribute, starting from the
   * state cached before the pipeline step. The current sample records are
   * updated in place; `cachedSampleRecords[idx]` must correspond to
   * `this.sampleRecords[idx]`.
   *
   * If the transformer is unknown, or the transformer or filter does not
   * support the attribute's data type, the attribute's value and lastChange
   * entry are restored from the cache.
   *
   * @param {Function} executeTransformationFunc - async
   *   `(code, attributeName, dataType, records) => ({ values })`, used to
   *   evaluate user-defined transformations
   * @param {Array} cachedSampleRecords - records as they were before the step
   * @param {{sortPosition: *}} pipelineStep
   * @param {Object} attribute - attribute incl. its transformation config
   * @param {Array<{id: *, name: string}>} dataSourceProfileAttributes - used
   *   to build the name-keyed records handed to user-defined transformations
   * @returns {Promise<Array>} the updated sample records
   */
  async applySingleAttributeTransformation(
    executeTransformationFunc,
    cachedSampleRecords,
    pipelineStep,
    attribute,
    dataSourceProfileAttributes
  ) {
    const attributeId = attribute.id;
    const transformationKey = attribute.transformationAction;

    // for new columns, the data type is configured via the filter value
    const dataType = (transformationKey === 'add-column')
      ? attribute.filterValue
      : attribute.dataType;

    const isListType = dataType.includes('array_');
    const flattenedDataType = dataType.replace('array_', '');
    const filterKey = attribute.transformationFilter;

    // restore value and lastChange of the attribute from the cached records
    const resetAttributeValues = () => {
      this.sampleRecords = this.sampleRecords.map((record, idx) => {
        const cachedRecord = cachedSampleRecords[idx];

        record.lastChange[attributeId] = cachedRecord.lastChange[attributeId];
        record.values[attributeId]     = cachedRecord.values[attributeId];

        return record;
      });

      return this.sampleRecords;
    };

    // check whether transformer exists and can be applied to input data
    const transformer = this.transformerMap[transformationKey];
    if (transformer === undefined || !supportsDataType(transformer, flattenedDataType)) {
      return resetAttributeValues();
    }

    let filter;
    if (!isUnset(filterKey)) {
      filter = this.filterMap[filterKey];

      // check whether filter can be applied to input data
      if (!supportsDataType(filter, flattenedDataType)) {
        return resetAttributeValues();
      }
    }

    const filterIsSatisfied = createFilterPredicate(
      this,
      filter,
      attribute.filterValue,
      flattenedDataType
    );
    const transformValue = createValueTransformer(
      this,
      transformer,
      attribute,
      dataType,
      flattenedDataType,
      filterIsSatisfied
    );

    // stays undefined if the UDF is not evaluated or the evaluation failed
    let udfValues;
    if (transformationKey === 'user-defined-transformation' && !isListType) {
      // UDFs operate on records keyed by attribute name instead of id
      const cachedAttributeValues = cachedSampleRecords.map((record) => {
        if (record == null) {
          return null;
        }

        const recordValues = {};
        dataSourceProfileAttributes.forEach((dspa) => {
          recordValues[dspa.name] = record.values[dspa.id];
        });

        return recordValues;
      });

      // call backend to simulate impact of UDF on cached sample records
      const response = await executeTransformationFunc(
        attribute.actionValue,
        attribute.name,
        flattenedDataType,
        cachedAttributeValues
      );

      if (response.values !== undefined) {
        udfValues = response.values;
      }
    }

    this.sampleRecords = this.sampleRecords.map((record, idx) => {
      const recordAtPreviousStep = cachedSampleRecords[idx];
      const valueAtPreviousStep = recordAtPreviousStep.values[attributeId];

      // reset lastChange
      record.lastChange[attributeId] = recordAtPreviousStep.lastChange[attributeId];

      const markAttributeAsChanged = () => {
        record.lastChange[attributeId] = pipelineStep.sortPosition;
      };

      if (transformationKey === 'add-column') {
        record.values[attribute.transformAttributeId] = null;
        record.lastChange[attribute.transformAttributeId] = pipelineStep.sortPosition;

        return record;
      }

      if (transformationKey === 'drop-column') {
        record.values[attributeId] = null;
        markAttributeAsChanged();

        return record;
      }

      if (transformationKey === 'rename-column') {
        // do nothing besides copying the value
        record.values[attributeId] = valueAtPreviousStep;

        return record;
      }

      if (transformationKey === 'user-defined-transformation') {
        const applyUdf =
          (udfValues !== undefined) && // evaluation in the backend succeeded
          !isListType && // do not apply UDFs to lists
          filterIsSatisfied(valueAtPreviousStep);

        if (applyUdf) {
          record.values[attributeId] = udfValues[idx];

          if (valueAtPreviousStep !== record.values[attributeId]) {
            markAttributeAsChanged();
          }
        } else {
          // reset values
          record.values[attributeId] = valueAtPreviousStep;
        }

        return record;
      }

      if (isListType) {
        if (Array.isArray(valueAtPreviousStep)) {
          record.values[attributeId] = valueAtPreviousStep.map((value) => {
            const newValue = transformValue(record, value);
            if (value !== newValue) {
              markAttributeAsChanged();
            }

            return newValue;
          });
        } else if (filterKey === 'empty') {
          // a missing list counts as empty: it is replaced by a one-element list
          record.values[attributeId] = [transformValue(record, null)];
          markAttributeAsChanged();
        }

        return record;
      }

      record.values[attributeId] = transformValue(record, valueAtPreviousStep);

      if (valueAtPreviousStep !== record.values[attributeId]) {
        markAttributeAsChanged();
      }

      return record;
    });

    return this.sampleRecords;
  }

  /**
   * Applies all attribute transformations of a pipeline step to the sample
   * records. Records dropped by a previous step are removed first. Every
   * remaining record is replaced by a new record that shares the old
   * `lastChange` object and gets a fresh `values` object.
   *
   * Attributes without a transformation keep their value. Attributes whose
   * transformer is unknown, or whose transformer or filter does not support
   * their data type, keep their value as well.
   *
   * @param {Function} executeTransformationFunc - async
   *   `(code, attributeName, dataType, records) => ({ values })`, used to
   *   evaluate user-defined transformations
   * @param {{sortPosition: *}} pipelineStep
   * @param {Array} attributes - all attributes incl. their transformation config
   * @returns {Promise<Array>} the transformed sample records
   */
  async applyPipelineStep(
    executeTransformationFunc,
    pipelineStep,
    attributes
  ) {
    const transformedAttributes = attributes
      .filter((attribute) => !isUnset(attribute.transformationAction));
    const untouchedAttributes = attributes
      .filter((attribute) => isUnset(attribute.transformationAction));

    this.sampleRecords = this.sampleRecords
      // ignore records dropped by the previous step
      .filter((record) => record != null && record.isDropped !== true);

    // determine which attributes use a UDF
    const udfAttributes = attributes
      .filter((attribute) => attribute.transformationAction === 'user-defined-transformation');

    // results of the UDFs by attribute id; an empty list means "no results"
    const udfAttributeValues = {};
    udfAttributes.forEach((attribute) => {
      udfAttributeValues[attribute.id] = [];

      if (!isUnset(attribute.actionValue2)) {
        udfAttributeValues[attribute.actionValue2] = [];
      }
    });

    // calculate results of UDFs
    if (udfAttributes.length > 0) {
      // UDFs operate on records keyed by attribute name instead of id
      const udfInputRecords = this.sampleRecords.map((record) => {
        const inputRecord = {};
        attributes.forEach((attribute) => {
          inputRecord[attribute.name] = record.values[attribute.id];
        });

        return inputRecord;
      });

      // evaluated one after another, in attribute order
      for (const attribute of udfAttributes) {
        const response = await executeTransformationFunc(
          attribute.actionValue,
          attribute.name,
          attribute.dataType.replace('array_', ''),
          udfInputRecords
        );

        if (response.values !== undefined) {
          udfAttributeValues[attribute.id] = response.values;
        }
      }
    }

    this.sampleRecords = this.sampleRecords.map((record, idx) => {
      const newRecord = {
        lastChange: record.lastChange,
        values:     {},
      };

      untouchedAttributes.forEach((attribute) => {
        newRecord.values[attribute.id] = record.values[attribute.id];
      });

      transformedAttributes.forEach((attribute) => {
        const attributeId = attribute.id;
        const transformationKey = attribute.transformationAction;
        const previousValue = record.values[attributeId];

        const isListType = attribute.dataType.includes('array_');
        const flattenedDataType = attribute.dataType.replace('array_', '');

        const keepPreviousValue = () => {
          newRecord.values[attributeId] = previousValue;
        };
        const markAttributeAsChanged = () => {
          newRecord.lastChange[attributeId] = pipelineStep.sortPosition;
        };

        let filter;
        if (!isUnset(attribute.transformationFilter)) {
          filter = this.filterMap[attribute.transformationFilter];

          // check whether filter can be applied to input data
          if (!supportsDataType(filter, flattenedDataType)) {
            keepPreviousValue();
            return;
          }
        }

        // check whether transformer exists and can be applied to input data
        const transformer = this.transformerMap[transformationKey];
        if (transformer === undefined || !supportsDataType(transformer, flattenedDataType)) {
          keepPreviousValue();
          return;
        }

        const filterIsSatisfied = createFilterPredicate(
          this,
          filter,
          attribute.filterValue,
          flattenedDataType
        );
        const transformValue = createValueTransformer(
          this,
          transformer,
          attribute,
          attribute.dataType,
          flattenedDataType,
          filterIsSatisfied
        );

        if (transformationKey === 'drop-record') {
          let dropRecord;
          if (isListType && previousValue != null) {
            // lists: drop if there is no filter or any entry satisfies it
            dropRecord =
              (filter === undefined) ||
              previousValue.some((entry) => filterIsSatisfied(entry));
          } else {
            dropRecord = filterIsSatisfied(previousValue);
          }

          if (dropRecord) {
            newRecord.isDropped = true;
            newRecord.values = {};
          } else {
            keepPreviousValue();
          }
        } else if (transformationKey === 'add-column') {
          newRecord.values[attributeId] = null;
          markAttributeAsChanged();
        } else if (transformationKey === 'drop-column') {
          // the value is deliberately not carried over to the new record
          markAttributeAsChanged();
        } else if (transformationKey === 'rename-column') {
          // do nothing besides copying the value
          keepPreviousValue();
        } else if (transformationKey === 'user-defined-transformation') {
          const udfValue = udfAttributeValues[attributeId][idx];
          const applyUdf =
            (udfValue !== undefined) && // evaluation in the backend succeeded
            !isListType && // do not apply UDFs to lists
            filterIsSatisfied(previousValue);

          if (applyUdf) {
            newRecord.values[attributeId] = udfValue;

            if (previousValue !== udfValue) {
              markAttributeAsChanged();
            }
          } else {
            keepPreviousValue();
          }
        } else if (isListType) {
          if (Array.isArray(previousValue)) {
            newRecord.values[attributeId] = previousValue.map((value) => {
              const newValue = transformValue(record, value);
              if (value !== newValue) {
                markAttributeAsChanged();
              }

              return newValue;
            });
          } else if (attribute.transformationFilter === 'empty') {
            // a missing list counts as empty: it is replaced by a one-element list
            newRecord.values[attributeId] = [transformValue(record, null)];
            markAttributeAsChanged();
          } else {
            keepPreviousValue();
          }
        } else {
          newRecord.values[attributeId] = transformValue(record, previousValue);

          if (previousValue !== newRecord.values[attributeId]) {
            markAttributeAsChanged();
          }
        }
      });

      return newRecord;
    });

    return this.sampleRecords;
  }
}

export default Pipeline;
