const rfdc = require('rfdc')()

/**
 * Builds the client-side shape of a pipeline returned by the API.
 *
 * Operates on the clone returned by `rfdc(rawPipeline)`. The flat
 * `pipelineSteps` list is regrouped by `sortPosition`:
 *   - a group whose first entry is a `join_pipeline` step contributes that
 *     first entry unchanged;
 *   - a group whose first entry is a `transform` step is collapsed into a
 *     single step (name, type and position come from the first entry) whose
 *     `attributes` array holds one entry per step of the group;
 *   - groups of any other transformation type are left out.
 *
 * @param {Object} rawPipeline - pipeline carrying a `pipelineSteps` array
 * @returns {Object} the clone, with `pipelineSteps` replaced by the regrouped list
 */
function preparePipeline(rawPipeline) {
  const copy = rfdc(rawPipeline);

  // Bucket the steps by sortPosition. A plain object is used so the buckets
  // are visited below in the object's own key order.
  const buckets = copy.pipelineSteps.reduce((acc, step) => {
    const key = step.sortPosition;
    if (acc[key] === undefined) {
      acc[key] = [];
    }
    acc[key].push(step);
    return acc;
  }, {});

  const regrouped = [];
  Object.keys(buckets).forEach((position) => {
    const group = buckets[position];
    const head = group[0];

    switch (head.transformationType) {
      case 'join_pipeline':
        regrouped.push(head);
        break;
      case 'transform':
        regrouped.push({
          name:               head.name,
          transformationType: head.transformationType,
          sortPosition:       head.sortPosition,
          // one attribute entry per original step sharing this position
          attributes:         group.map((entry) => ({
            transformAttributeId: entry.transformAttributeId,
            transformationFilter: entry.transformationFilter,
            filterValue:          entry.filterValue,
            transformationAction: entry.transformationAction,
            actionValue:          entry.actionValue,
            actionValue2:         entry.actionValue2
          }))
        });
        break;
      default:
        // any other transformation type is not carried over
        break;
    }
  });

  copy.pipelineSteps = regrouped;
  return copy;
}

/**
 * Thunk action creator that loads the list of pipelines from `/api/pipelines`.
 *
 * Dispatches `REQUEST_PIPELINES` right away and `RECEIVE_PIPELINES` once the
 * request has settled. A 401 response removes the stored `userToken` and sets
 * `window.location` to `/sign_in`; in that case, and when `fetch` itself
 * rejects (the error is only logged), the receive action carries `undefined`.
 *
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function fetchPipelines() {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_PIPELINES' });

    const options = {
      headers: { 'X-Auth-token': localStorage.getItem('userToken') }
    };

    const readBody = (response) => {
      if (response.status !== 401) {
        return response.json();
      }
      localStorage.removeItem('userToken');
      window.location = '/sign_in';
      return undefined;
    };

    const logFailure = (error) => console.log('An error occurred.', error);

    return fetch('/api/pipelines', options)
      .then(readBody, logFailure)
      .then((pipelines) => {
        dispatch({ type: 'RECEIVE_PIPELINES', pipelines });
      });
  };
}

/**
 * Thunk action creator that loads the pipelines served by
 * `/api/pipelines/active`.
 *
 * Dispatches `REQUEST_ACTIVE_PIPELINES`, then `RECEIVE_ACTIVE_PIPELINES` with
 * the parsed JSON body. On a 401 the stored `userToken` is removed and
 * `window.location` is set to `/sign_in`; the receive action then carries
 * `undefined`, as it does when `fetch` rejects (the error is only logged).
 *
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function fetchActivePipelines() {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_ACTIVE_PIPELINES' });

    const init = {
      headers: { 'X-Auth-token': localStorage.getItem('userToken') }
    };

    const onResponse = (response) => {
      if (response.status === 401) {
        localStorage.removeItem('userToken');
        window.location = '/sign_in';
        return undefined;
      }
      return response.json();
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    const publish = (pipelines) => {
      dispatch({ type: 'RECEIVE_ACTIVE_PIPELINES', pipelines });
    };

    return fetch('/api/pipelines/active', init)
      .then(onResponse, onNetworkError)
      .then(publish);
  };
}

/**
 * Thunk action creator that loads a single pipeline.
 *
 * Dispatches `REQUEST_PIPELINE`, fetches `/api/pipelines/<id>` and, when a
 * body was received, dispatches `RECEIVE_PIPELINE` with the result of
 * `preparePipeline`.
 *
 * A 401 removes the stored `userToken` and sets `window.location` to
 * `/sign_in`; a 404 sets it to `/404`. In both cases, and when `fetch`
 * rejects (the error is only logged), no receive action is dispatched.
 *
 * @param {number|string} id - id of the pipeline, appended to the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function fetchPipeline(id) {
  const requestPipeline = () => ({
    type: 'REQUEST_PIPELINE'
  });

  const receivedPipeline = response => ({
    type:       'RECEIVE_PIPELINE',
    pipeline: response
  });

  return function (dispatch) {
    dispatch(requestPipeline());

    return fetch('/api/pipelines/' + id,
                 { headers: { 'X-Auth-token': localStorage.getItem('userToken') }})
      .then(
        response => {
          if (response.status === 401) {
            localStorage.removeItem('userToken');
            window.location = '/sign_in';
          } else if (response.status === 404) {
            window.location = '/404';
          } else {
            return response.json();
          }
        },
        error => console.log('An error occurred.', error)
      )
      .then((pipeline) => {
        // 401, 404 and network failures leave no body. preparePipeline reads
        // `pipelineSteps` from its argument and would throw on a missing one,
        // turning the redirect/log case into an unhandled rejection.
        if (pipeline === undefined || pipeline === null) {
          return;
        }
        dispatch(receivedPipeline(preparePipeline(pipeline)));
      });
  };
}

/**
 * Thunk action creator that loads the logs of one pipeline executable.
 *
 * Dispatches `REQUEST_PIPELINE_EXECUTABLE_LOGS`, then depending on the
 * response status:
 *   - 401: removes the stored `userToken` and sets `window.location` to
 *     `/sign_in`;
 *   - any other status >= 400: dispatches
 *     `RECEIVE_PIPELINE_EXECUTABLE_LOGS_FAILED` after the body text was read;
 *   - otherwise: dispatches `RECEIVE_PIPELINE_EXECUTABLE_LOGS` with the JSON body.
 * A rejected `fetch` is only logged.
 *
 * @param {number|string} pipelineId - id of the pipeline, used in the URL
 * @param {number|string} executableId - id of the executable, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function fetchPipelineExecutableLogs(pipelineId, executableId) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_PIPELINE_EXECUTABLE_LOGS' });

    const endpoint = '/api/pipelines/' + pipelineId + '/executables/' + executableId + '/logs';
    const init = {
      headers: { 'X-Auth-token': localStorage.getItem('userToken') }
    };

    const onResponse = (response) => {
      if (response.status === 401) {
        localStorage.removeItem('userToken');
        window.location = '/sign_in';
        return undefined;
      }

      if (response.status >= 400) {
        return response
          .text()
          .then(() => dispatch({ type: 'RECEIVE_PIPELINE_EXECUTABLE_LOGS_FAILED' }));
      }

      return response
        .json()
        .then((pipelineLogs) => dispatch({
          type: 'RECEIVE_PIPELINE_EXECUTABLE_LOGS',
          pipelineLogs
        }));
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init).then(onResponse, onNetworkError);
  };
}

/**
 * Thunk action creator that creates a pipeline via `POST /api/pipelines`.
 *
 * Dispatches `REQUEST_ADD_PIPELINE`, then depending on the response status:
 *   - 401: removes the stored `userToken` and sets `window.location` to
 *     `/sign_in`;
 *   - any other status >= 300: dispatches `RECEIVE_ADD_PIPELINE_FAILED` with
 *     the body text as `errorMessage`;
 *   - otherwise: dispatches `RECEIVE_ADD_PIPELINE` with the JSON body.
 * A rejected `fetch` is only logged.
 *
 * @param {Object} pipeline - pipeline sent as the JSON request body
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function addPipeline(pipeline) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_ADD_PIPELINE' });

    const init = {
      method:      'POST',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'Content-type': 'application/json',
        'X-Auth-token': localStorage.getItem('userToken')
      },
      body: JSON.stringify(pipeline)
    };

    const onResponse = (response) => {
      if (response.status === 401) {
        localStorage.removeItem('userToken');
        window.location = '/sign_in';
        return undefined;
      }

      if (response.status >= 300) {
        return response.text().then((errorMessage) => dispatch({
          errorMessage,
          type: 'RECEIVE_ADD_PIPELINE_FAILED'
        }));
      }

      return response.json().then((created) => dispatch({
        pipeline: created,
        type:     'RECEIVE_ADD_PIPELINE'
      }));
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch('/api/pipelines', init).then(onResponse, onNetworkError);
  };
}

/**
 * Thunk action creator that saves a pipeline via `PUT /api/pipelines/<id>`.
 *
 * The request body is a shallow copy of `pipeline` without `createdAt` and,
 * when `updatedAt` is given, with `updatedAt` set to that value. The caller's
 * `pipeline` object is not modified.
 *
 * Dispatches `REQUEST_UPDATE_PIPELINE`, then depending on the response status:
 *   - 401: removes the stored `userToken` and sets `window.location` to
 *     `/sign_in`;
 *   - 409: dispatches `RECEIVE_UPDATE_PIPELINE_CONFLICT`;
 *   - otherwise: dispatches `RECEIVE_UPDATE_PIPELINE` with the JSON body.
 * A rejected `fetch` is only logged.
 *
 * @param {Object} pipeline - pipeline to save; its `id` is used in the URL
 * @param {*} [updatedAt] - value sent as `updatedAt` when not `undefined`
 * @returns {Function} thunk taking `dispatch`; its promise resolves to the
 *   `updatedAt` of the response body, or `undefined` when no body was received
 */
export function updatePipeline(pipeline, updatedAt) {
  const requestUpdatePipeline = () => ({
    type: 'REQUEST_UPDATE_PIPELINE'
  });

  const receivedUpdatePipeline = (pipeline) => ({
    pipeline: pipeline,
    type:     'RECEIVE_UPDATE_PIPELINE'
  });

  const receivedUpdatePipelineConflict = () => ({
    type: 'RECEIVE_UPDATE_PIPELINE_CONFLICT'
  });

  return function (dispatch) {
    dispatch(requestUpdatePipeline());

    // Build the payload on a shallow copy: the argument is owned by the
    // caller and must not lose or change properties as a side effect.
    const payload = Object.assign({}, pipeline);
    delete payload.createdAt;

    if (updatedAt !== undefined) {
      payload.updatedAt = updatedAt;
    }

    return fetch(
        '/api/pipelines/' + pipeline.id,
        {
          method:      'PUT',
          credentials: 'same-origin',
          mode:        'cors',
          cache:       'no-cache',
          headers: {
            'Content-type': 'application/json',
            'X-Auth-token': localStorage.getItem('userToken')
          },
          body: JSON.stringify(payload),
        }
      ).then(
        response => {
          if (response.status === 401) {
            localStorage.removeItem('userToken');
            window.location = '/sign_in';
          } else if (response.status === 409) {
            dispatch(receivedUpdatePipelineConflict());
            return Promise.resolve(undefined);
          } else {
            return response.json();
          }
        },
        error => console.log('An error occurred.', error)
      ).then((saved) => {
        // undefined means 401, 409 or a network failure: nothing to publish
        if (saved !== undefined) {
          dispatch(receivedUpdatePipeline(saved));

          return saved.updatedAt;
        }
      });
  };
}

/**
 * Thunk action creator that assigns a data sink to a pipeline via
 * `PUT /api/pipelines/<id>/data_sink`.
 *
 * Dispatches `REQUEST_UPDATE_DATA_SINK_PIPELINE` and, when a body was
 * received, `RECEIVE_UPDATE_DATA_SINK_PIPELINE` with the result of
 * `preparePipeline`. A 401 removes the stored `userToken` and sets
 * `window.location` to `/sign_in`; in that case, and when `fetch` rejects
 * (the error is only logged), no receive action is dispatched.
 *
 * @param {Object} pipeline - pipeline whose `id` is used in the URL
 * @param {*} dataSinkId - sent as `dataSinkId` in the JSON request body
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function updateDataSinkOfPipeline(pipeline, dataSinkId) {
  const requestUpdatePipeline = () => ({
    type: 'REQUEST_UPDATE_DATA_SINK_PIPELINE'
  });

  const receivedUpdatePipeline = (response) => ({
    pipeline: response,
    type:     'RECEIVE_UPDATE_DATA_SINK_PIPELINE'
  });

  return function (dispatch) {
    dispatch(requestUpdatePipeline());

    return fetch(
        '/api/pipelines/' + pipeline.id + '/data_sink',
        {
          method:      'PUT',
          credentials: 'same-origin',
          mode:        'cors',
          cache:       'no-cache',
          headers: {
            'Content-type': 'application/json',
            'X-Auth-token': localStorage.getItem('userToken')
          },
          body: JSON.stringify({
            dataSinkId: dataSinkId
          }),
        }
      ).then(
        response => {
          if (response.status === 401) {
            localStorage.removeItem('userToken');
            window.location = '/sign_in';
          } else {
            return response.json();
          }
        },
        error => console.log('An error occurred.', error)
      ).then((updated) => {
        // A 401 or a network failure leaves no body. preparePipeline reads
        // `pipelineSteps` from its argument and would throw on a missing one.
        if (updated === undefined || updated === null) {
          return;
        }
        dispatch(receivedUpdatePipeline(preparePipeline(updated)));
      });
  };
}

/**
 * Thunk action creator that assigns the joined data source of a pipeline via
 * `PUT /api/pipelines/<id>/joined_data_source`.
 *
 * Dispatches `REQUEST_UPDATE_JOINED_DATA_SOURCE_PIPELINE`, then depending on
 * the response status:
 *   - 401: removes the stored `userToken` and sets `window.location` to
 *     `/sign_in`;
 *   - any other status >= 400: dispatches
 *     `RECEIVE_UPDATE_JOINED_DATA_SOURCE_PIPELINE_FAILURE` with the body text;
 *   - otherwise: dispatches `RECEIVE_UPDATE_JOINED_DATA_SOURCE_PIPELINE` with
 *     the JSON body passed through `preparePipeline`.
 * A rejected `fetch` is only logged.
 *
 * @param {Object} pipeline - pipeline whose `id` is used in the URL
 * @param {*} dataSourceId - sent as `dataSourceId` in the JSON request body
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function updateJoinedDataSourceOfPipeline(pipeline, dataSourceId) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_UPDATE_JOINED_DATA_SOURCE_PIPELINE' });

    const endpoint = '/api/pipelines/' + pipeline.id + '/joined_data_source';
    const init = {
      method:      'PUT',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'Content-type': 'application/json',
        'X-Auth-token': localStorage.getItem('userToken')
      },
      body: JSON.stringify({ dataSourceId })
    };

    const onResponse = (response) => {
      if (response.status === 401) {
        localStorage.removeItem('userToken');
        window.location = '/sign_in';
        return undefined;
      }

      if (response.status >= 400) {
        return response.text().then((message) => dispatch({
          error: message,
          type:  'RECEIVE_UPDATE_JOINED_DATA_SOURCE_PIPELINE_FAILURE'
        }));
      }

      return response.json().then((updated) => dispatch({
        pipeline: preparePipeline(updated),
        type:     'RECEIVE_UPDATE_JOINED_DATA_SOURCE_PIPELINE'
      }));
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init).then(onResponse, onNetworkError);
  };
}

/**
 * Thunk action creator that deletes a pipeline via
 * `DELETE /api/pipelines/<id>`.
 *
 * Dispatches `REQUEST_DELETE_PIPELINE`, then `RECEIVE_DELETE_PIPELINE` once
 * the request has settled and the body text (if any) was read. On a 401 the
 * stored `userToken` is removed and `window.location` is set to `/sign_in`.
 * A rejected `fetch` is only logged.
 *
 * @param {number|string} id - id of the pipeline, appended to the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function deletePipeline(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_DELETE_PIPELINE' });

    const endpoint = '/api/pipelines/' + id;
    const init = {
      method:      'DELETE',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'X-Auth-token': localStorage.getItem('userToken')
      }
    };

    const drainBody = (response) => {
      if (response.status !== 401) {
        return response.text();
      }
      localStorage.removeItem('userToken');
      window.location = '/sign_in';
      return undefined;
    };

    const logFailure = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init)
      .then(drainBody, logFailure)
      .then(() => {
        dispatch({ type: 'RECEIVE_DELETE_PIPELINE' });
      });
  };
}

/**
 * Thunk action creator that calls `POST /api/pipelines/<id>/reset`.
 *
 * Dispatches `REQUEST_RESET_PIPELINE`, then `RECEIVE_RESET_PIPELINE` once the
 * request has settled and the body text (if any) was read. On a 401 the
 * stored `userToken` is removed and `window.location` is set to `/sign_in`.
 * A rejected `fetch` is only logged.
 *
 * @param {number|string} id - id of the pipeline, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function resetPipeline(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_RESET_PIPELINE' });

    const endpoint = '/api/pipelines/' + id + '/reset';
    const init = {
      method:      'POST',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'X-Auth-token': localStorage.getItem('userToken')
      }
    };

    const onResponse = (response) => {
      if (response.status === 401) {
        localStorage.removeItem('userToken');
        window.location = '/sign_in';
        return undefined;
      }
      return response.text();
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    const announce = () => {
      dispatch({ type: 'RECEIVE_RESET_PIPELINE' });
    };

    return fetch(endpoint, init)
      .then(onResponse, onNetworkError)
      .then(announce);
  };
}

/**
 * Thunk action creator that calls `POST /api/pipelines/<id>/duplicate`.
 *
 * Dispatches `REQUEST_DUPLICATE_PIPELINE`, then `RECEIVE_DUPLICATE_PIPELINE`
 * with the parsed JSON body. On a 401 the stored `userToken` is removed and
 * `window.location` is set to `/sign_in`; the receive action then carries
 * `undefined`, as it does when `fetch` rejects (the error is only logged).
 *
 * @param {number|string} id - id of the pipeline, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function duplicatePipeline(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_DUPLICATE_PIPELINE' });

    const endpoint = '/api/pipelines/' + id + '/duplicate';
    const init = {
      method:      'POST',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'X-Auth-token': localStorage.getItem('userToken')
      }
    };

    const readBody = (response) => {
      if (response.status !== 401) {
        return response.json();
      }
      localStorage.removeItem('userToken');
      window.location = '/sign_in';
      return undefined;
    };

    const logFailure = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init)
      .then(readBody, logFailure)
      .then((duplicate) => {
        dispatch({
          pipeline: duplicate,
          type:     'RECEIVE_DUPLICATE_PIPELINE'
        });
      });
  };
}

/**
 * Thunk action creator that loads `/api/pipelines/<id>/executables`.
 *
 * Dispatches `REQUEST_PIPELINE_EXECUTABLES`, then
 * `RECEIVE_PIPELINE_EXECUTABLES` with the parsed JSON body. On a 401 the
 * stored `userToken` is removed and `window.location` is set to `/sign_in`;
 * the receive action then carries `undefined`, as it does when `fetch`
 * rejects (the error is only logged).
 *
 * @param {number|string} id - id of the pipeline, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function fetchExecutables(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_PIPELINE_EXECUTABLES' });

    const endpoint = '/api/pipelines/' + id + '/executables';
    const init = {
      headers: { 'X-Auth-token': localStorage.getItem('userToken') }
    };

    const onResponse = (response) => {
      if (response.status === 401) {
        localStorage.removeItem('userToken');
        window.location = '/sign_in';
        return undefined;
      }
      return response.json();
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init)
      .then(onResponse, onNetworkError)
      .then((pipelineExecutables) => {
        dispatch({ type: 'RECEIVE_PIPELINE_EXECUTABLES', pipelineExecutables });
      });
  };
}

/**
 * Thunk action creator that renames an executable via
 * `PUT /api/pipelines/<pipelineId>/executables/<id>/name`.
 *
 * No action is dispatched. The returned promise resolves to the parsed JSON
 * body, or to `undefined` on a 401 (the stored `userToken` is removed and
 * `window.location` is set to `/sign_in`) and when `fetch` rejects (the error
 * is only logged).
 *
 * @param {number|string} pipelineId - id of the pipeline, used in the URL
 * @param {number|string} id - id of the executable, used in the URL
 * @param {*} name - sent as `name` in the JSON request body
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function updateExecutableName(pipelineId, id, name) {
  return (dispatch) => {
    const endpoint = '/api/pipelines/' + pipelineId + '/executables/' + id + '/name';
    const init = {
      method:      'PUT',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'Content-type': 'application/json',
        'X-Auth-token': localStorage.getItem('userToken')
      },
      body: JSON.stringify({ name })
    };

    const readBody = (response) => {
      if (response.status !== 401) {
        return response.json();
      }
      localStorage.removeItem('userToken');
      window.location = '/sign_in';
      return undefined;
    };

    const logFailure = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init).then(readBody, logFailure);
  };
}

/**
 * Thunk action creator that calls `POST /api/pipelines/<id>/compile`.
 *
 * Dispatches `REQUEST_COMPILE_PIPELINE_EXECUTABLE`, then
 * `RECEIVE_COMPILE_PIPELINE_EXECUTABLE` with the parsed JSON body. On a 401
 * the stored `userToken` is removed and `window.location` is set to
 * `/sign_in`; the receive action then carries `undefined`, as it does when
 * `fetch` rejects (the error is only logged).
 *
 * @param {number|string} id - id of the pipeline, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function compileExecutable(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_COMPILE_PIPELINE_EXECUTABLE' });

    const endpoint = '/api/pipelines/' + id + '/compile';
    const init = {
      method:      'POST',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'X-Auth-token': localStorage.getItem('userToken')
      }
    };

    const onResponse = (response) => {
      if (response.status === 401) {
        localStorage.removeItem('userToken');
        window.location = '/sign_in';
        return undefined;
      }
      return response.json();
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init)
      .then(onResponse, onNetworkError)
      .then((pipelineExecutable) => {
        dispatch({ type: 'RECEIVE_COMPILE_PIPELINE_EXECUTABLE', pipelineExecutable });
      });
  };
}

/**
 * Thunk action creator that calls
 * `POST /api/pipelines/<pipelineId>/executables/<id>/start`.
 *
 * Dispatches `REQUEST_START_PIPELINE_EXECUTABLE`, then depending on the
 * response status:
 *   - 401: removes the stored `userToken` and sets `window.location` to
 *     `/sign_in`;
 *   - any other status >= 400: dispatches `FAILED_START_PIPELINE_EXECUTABLE`;
 *   - otherwise: dispatches `RECEIVE_START_PIPELINE_EXECUTABLE` with the JSON body.
 * A rejected `fetch` is only logged.
 *
 * @param {number|string} pipelineId - id of the pipeline, used in the URL
 * @param {number|string} id - id of the executable, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function startExecutable(pipelineId, id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_START_PIPELINE_EXECUTABLE' });

    const endpoint = '/api/pipelines/' + pipelineId + '/executables/' + id + '/start';
    const init = {
      method:      'POST',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'X-Auth-token': localStorage.getItem('userToken')
      }
    };

    const onResponse = (response) => {
      if (response.status === 401) {
        localStorage.removeItem('userToken');
        window.location = '/sign_in';
        return undefined;
      }

      if (response.status >= 400) {
        // the failure action's result is intentionally not passed on
        dispatch({ type: 'FAILED_START_PIPELINE_EXECUTABLE' });
        return undefined;
      }

      return response.json().then((pipelineExecutable) => dispatch({
        type: 'RECEIVE_START_PIPELINE_EXECUTABLE',
        pipelineExecutable
      }));
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init).then(onResponse, onNetworkError);
  };
}

/**
 * Thunk action creator that calls
 * `DELETE /api/pipelines/<pipelineId>/executables/<id>`.
 *
 * Dispatches `REQUEST_DELETE_PIPELINE_EXECUTABLE`, then
 * `RECEIVE_DELETE_PIPELINE_EXECUTABLE` carrying `id` as
 * `pipelineExecutableId` once the request has settled and the body text (if
 * any) was read. On a 401 the stored `userToken` is removed and
 * `window.location` is set to `/sign_in`. A rejected `fetch` is only logged.
 *
 * @param {number|string} pipelineId - id of the pipeline, used in the URL
 * @param {number|string} id - id of the executable, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function deleteExecutable(pipelineId, id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_DELETE_PIPELINE_EXECUTABLE' });

    const endpoint = '/api/pipelines/' + pipelineId + '/executables/' + id;
    const init = {
      method:      'DELETE',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'X-Auth-token': localStorage.getItem('userToken')
      }
    };

    const drainBody = (response) => {
      if (response.status !== 401) {
        return response.text();
      }
      localStorage.removeItem('userToken');
      window.location = '/sign_in';
      return undefined;
    };

    const logFailure = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init)
      .then(drainBody, logFailure)
      .then(() => {
        dispatch({
          type:                 'RECEIVE_DELETE_PIPELINE_EXECUTABLE',
          pipelineExecutableId: id
        });
      });
  };
}

/**
 * Thunk action creator that calls
 * `POST /api/pipelines/<pipelineId>/executables/<id>/halt`.
 *
 * Dispatches `REQUEST_HALT_PIPELINE_EXECUTABLE`, then depending on the
 * response status:
 *   - 401: removes the stored `userToken` and sets `window.location` to
 *     `/sign_in`;
 *   - any other status >= 400: dispatches `FAILED_HALT_PIPELINE_EXECUTABLE`;
 *   - otherwise: dispatches `RECEIVE_HALT_PIPELINE_EXECUTABLE` with the JSON body.
 * A rejected `fetch` is only logged.
 *
 * @param {number|string} pipelineId - id of the pipeline, used in the URL
 * @param {number|string} id - id of the executable, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function haltExecutable(pipelineId, id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_HALT_PIPELINE_EXECUTABLE' });

    const endpoint = '/api/pipelines/' + pipelineId + '/executables/' + id + '/halt';
    const init = {
      method:      'POST',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'X-Auth-token': localStorage.getItem('userToken')
      }
    };

    const onResponse = (response) => {
      if (response.status === 401) {
        localStorage.removeItem('userToken');
        window.location = '/sign_in';
        return undefined;
      }

      if (response.status >= 400) {
        // the failure action's result is intentionally not passed on
        dispatch({ type: 'FAILED_HALT_PIPELINE_EXECUTABLE' });
        return undefined;
      }

      return response.json().then((pipelineExecutable) => dispatch({
        type: 'RECEIVE_HALT_PIPELINE_EXECUTABLE',
        pipelineExecutable
      }));
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init).then(onResponse, onNetworkError);
  };
}

/**
 * Thunk action creator that downloads the export of a pipeline.
 *
 * Dispatches `REQUEST_EXPORT_PIPELINE`, fetches
 * `/api/pipelines/<id>/export` as a blob, triggers a download through a
 * temporary `<a>` element and dispatches `RECEIVE_EXPORT_PIPELINE`.
 * The file is named `pipeline-<id>-<YYYYMMDDhhmmss>.<sinkType>`, using the
 * local time at which the response was received.
 *
 * A 401 removes the stored `userToken` and sets `window.location` to
 * `/sign_in`. In that case, and when `fetch` rejects (the error is only
 * logged), nothing is downloaded and no receive action is dispatched.
 *
 * @param {Object} pipeline - pipeline whose `id` is used in the URL and file name
 * @param {Object} dataSink - its `sinkType` is used as the file extension
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function exportPipeline(pipeline, dataSink) {
  const requestExportPipeline = () => ({
    type: 'REQUEST_EXPORT_PIPELINE'
  });

  const receiveExportPipeline = () => ({
    type: 'RECEIVE_EXPORT_PIPELINE'
  });

  // Left-pads a one-digit date component with a zero.
  const addLeadingZeroUnlessPresent = function(number) {
    return ('0' + number).slice(-2);
  };

  return function (dispatch) {
    dispatch(requestExportPipeline());

    return fetch('/api/pipelines/' + pipeline.id + '/export',
                 { headers: { 'X-Auth-token': localStorage.getItem('userToken') }})
      .then(
        response => {
          if (response.status === 401) {
            localStorage.removeItem('userToken');
            window.location = '/sign_in';
          } else {
            return response.blob();
          }
        },
        error => console.log('An error occurred.', error)
      )
      .then((blob) => {
        // No blob on 401 or network failure: there is nothing to download,
        // and createObjectURL would throw on a missing argument.
        if (blob === undefined) {
          return;
        }

        const currentDate = new Date(Date.now());
        const dateString = [
          currentDate.getFullYear(),
          // getMonth() is zero-based (January is 0)
          addLeadingZeroUnlessPresent(currentDate.getMonth() + 1),
          addLeadingZeroUnlessPresent(currentDate.getDate()),
          addLeadingZeroUnlessPresent(currentDate.getHours()),
          addLeadingZeroUnlessPresent(currentDate.getMinutes()),
          addLeadingZeroUnlessPresent(currentDate.getSeconds())].join('');
        const url = window.URL.createObjectURL(blob);
        const a = document.createElement('a');
        a.href = url;
        a.download = [
          'pipeline-',
          pipeline.id,
          '-',
          dateString,
          '.',
          dataSink.sinkType].join('');
        a.click();
        dispatch(receiveExportPipeline());
      });
  };
}

/**
 * Thunk action creator that loads `/api/pipelines/<id>/source_connector`.
 *
 * Dispatches `REQUEST_PIPELINE_SOURCE_CONNECTOR`, then
 * `RECEIVE_PIPELINE_SOURCE_CONNECTOR` with the parsed JSON body. The receive
 * action carries `undefined` on a 404, on a 401 (the stored `userToken` is
 * removed and `window.location` is set to `/sign_in`) and when `fetch`
 * rejects (the error is only logged).
 *
 * @param {number|string} id - id of the pipeline, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function fetchSourceConnectorOfPipeline(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_PIPELINE_SOURCE_CONNECTOR' });

    const endpoint = '/api/pipelines/' + id + '/source_connector';
    const init = {
      headers: { 'X-Auth-token': localStorage.getItem('userToken') }
    };

    const onResponse = (response) => {
      switch (response.status) {
        case 401:
          localStorage.removeItem('userToken');
          window.location = '/sign_in';
          return undefined;
        case 404:
          return Promise.resolve(undefined);
        default:
          return response.json();
      }
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init)
      .then(onResponse, onNetworkError)
      .then((sourceConnector) => {
        dispatch({ sourceConnector, type: 'RECEIVE_PIPELINE_SOURCE_CONNECTOR' });
      });
  };
}

/**
 * Thunk action creator that loads
 * `/api/pipelines/<id>/joined_source_connector`.
 *
 * Dispatches `REQUEST_PIPELINE_JOINED_SOURCE_CONNECTOR`, then
 * `RECEIVE_PIPELINE_JOINED_SOURCE_CONNECTOR` with the parsed JSON body. The
 * receive action carries `undefined` on a 404, on a 401 (the stored
 * `userToken` is removed and `window.location` is set to `/sign_in`) and when
 * `fetch` rejects (the error is only logged).
 *
 * @param {number|string} id - id of the pipeline, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function fetchJoinedSourceConnectorOfPipeline(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_PIPELINE_JOINED_SOURCE_CONNECTOR' });

    const endpoint = '/api/pipelines/' + id + '/joined_source_connector';
    const init = {
      headers: { 'X-Auth-token': localStorage.getItem('userToken') }
    };

    const onResponse = (response) => {
      switch (response.status) {
        case 401:
          localStorage.removeItem('userToken');
          window.location = '/sign_in';
          return undefined;
        case 404:
          return Promise.resolve(undefined);
        default:
          return response.json();
      }
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init)
      .then(onResponse, onNetworkError)
      .then((joinedSourceConnector) => {
        dispatch({ joinedSourceConnector, type: 'RECEIVE_PIPELINE_JOINED_SOURCE_CONNECTOR' });
      });
  };
}

/**
 * Thunk action creator that loads `/api/pipelines/<id>/sink_connector`.
 *
 * Dispatches `REQUEST_PIPELINE_SINK_CONNECTOR`, then
 * `RECEIVE_PIPELINE_SINK_CONNECTOR` with the parsed JSON body. The receive
 * action carries `undefined` on a 404, on a 401 (the stored `userToken` is
 * removed and `window.location` is set to `/sign_in`) and when `fetch`
 * rejects (the error is only logged).
 *
 * @param {number|string} id - id of the pipeline, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function fetchSinkConnectorOfPipeline(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_PIPELINE_SINK_CONNECTOR' });

    const endpoint = '/api/pipelines/' + id + '/sink_connector';
    const init = {
      headers: { 'X-Auth-token': localStorage.getItem('userToken') }
    };

    const onResponse = (response) => {
      switch (response.status) {
        case 401:
          localStorage.removeItem('userToken');
          window.location = '/sign_in';
          return undefined;
        case 404:
          return Promise.resolve(undefined);
        default:
          return response.json();
      }
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init)
      .then(onResponse, onNetworkError)
      .then((sinkConnector) => {
        dispatch({ sinkConnector, type: 'RECEIVE_PIPELINE_SINK_CONNECTOR' });
      });
  };
}

/**
 * Thunk action creator that calls
 * `POST /api/pipelines/<id>/source_connector/recreate`.
 *
 * Dispatches `REQUEST_RECREATE_PIPELINE_SOURCE_CONNECTOR`, then
 * `RECEIVE_RECREATE_PIPELINE_SOURCE_CONNECTOR` with the parsed JSON body. The
 * receive action carries `undefined` on a 404, on a 401 (the stored
 * `userToken` is removed and `window.location` is set to `/sign_in`) and when
 * `fetch` rejects (the error is only logged).
 *
 * @param {number|string} id - id of the pipeline, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function recreateSourceConnectorOfPipeline(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_RECREATE_PIPELINE_SOURCE_CONNECTOR' });

    const endpoint = '/api/pipelines/' + id + '/source_connector/recreate';
    const init = {
      method:      'POST',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'X-Auth-token': localStorage.getItem('userToken')
      }
    };

    const onResponse = (response) => {
      switch (response.status) {
        case 401:
          localStorage.removeItem('userToken');
          window.location = '/sign_in';
          return undefined;
        case 404:
          return Promise.resolve(undefined);
        default:
          return response.json();
      }
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init)
      .then(onResponse, onNetworkError)
      .then((sourceConnector) => {
        dispatch({ sourceConnector, type: 'RECEIVE_RECREATE_PIPELINE_SOURCE_CONNECTOR' });
      });
  };
}

/**
 * Thunk action creator that calls
 * `POST /api/pipelines/<id>/source_connector/restart`.
 *
 * Dispatches `REQUEST_RESTART_PIPELINE_SOURCE_CONNECTOR`, then
 * `RECEIVE_RESTART_PIPELINE_SOURCE_CONNECTOR` with the parsed JSON body. The
 * receive action carries `undefined` on a 404, on a 401 (the stored
 * `userToken` is removed and `window.location` is set to `/sign_in`) and when
 * `fetch` rejects (the error is only logged).
 *
 * @param {number|string} id - id of the pipeline, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function restartSourceConnectorOfPipeline(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_RESTART_PIPELINE_SOURCE_CONNECTOR' });

    const endpoint = '/api/pipelines/' + id + '/source_connector/restart';
    const init = {
      method:      'POST',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'X-Auth-token': localStorage.getItem('userToken')
      }
    };

    const onResponse = (response) => {
      switch (response.status) {
        case 401:
          localStorage.removeItem('userToken');
          window.location = '/sign_in';
          return undefined;
        case 404:
          return Promise.resolve(undefined);
        default:
          return response.json();
      }
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init)
      .then(onResponse, onNetworkError)
      .then((sourceConnector) => {
        dispatch({ sourceConnector, type: 'RECEIVE_RESTART_PIPELINE_SOURCE_CONNECTOR' });
      });
  };
}

/**
 * Thunk action creator that calls
 * `POST /api/pipelines/<id>/joined_source_connector/restart`.
 *
 * Dispatches `REQUEST_RESTART_PIPELINE_JOINED_SOURCE_CONNECTOR`, then
 * `RECEIVE_RESTART_PIPELINE_JOINED_SOURCE_CONNECTOR` with the parsed JSON
 * body. The receive action carries `undefined` on a 404, on a 401 (the stored
 * `userToken` is removed and `window.location` is set to `/sign_in`) and when
 * `fetch` rejects (the error is only logged).
 *
 * @param {number|string} id - id of the pipeline, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function restartJoinedSourceConnectorOfPipeline(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_RESTART_PIPELINE_JOINED_SOURCE_CONNECTOR' });

    const endpoint = '/api/pipelines/' + id + '/joined_source_connector/restart';
    const init = {
      method:      'POST',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'X-Auth-token': localStorage.getItem('userToken')
      }
    };

    const onResponse = (response) => {
      switch (response.status) {
        case 401:
          localStorage.removeItem('userToken');
          window.location = '/sign_in';
          return undefined;
        case 404:
          return Promise.resolve(undefined);
        default:
          return response.json();
      }
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init)
      .then(onResponse, onNetworkError)
      .then((joinedSourceConnector) => {
        dispatch({
          joinedSourceConnector,
          type: 'RECEIVE_RESTART_PIPELINE_JOINED_SOURCE_CONNECTOR'
        });
      });
  };
}

/**
 * Thunk action creator that calls
 * `POST /api/pipelines/<id>/sink_connector/restart`.
 *
 * Dispatches `REQUEST_RESTART_PIPELINE_SINK_CONNECTOR`, then
 * `RECEIVE_RESTART_PIPELINE_SINK_CONNECTOR` with the parsed JSON body. The
 * receive action carries `undefined` on a 404, on a 401 (the stored
 * `userToken` is removed and `window.location` is set to `/sign_in`) and when
 * `fetch` rejects (the error is only logged).
 *
 * @param {number|string} id - id of the pipeline, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function restartSinkConnectorOfPipeline(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_RESTART_PIPELINE_SINK_CONNECTOR' });

    const endpoint = '/api/pipelines/' + id + '/sink_connector/restart';
    const init = {
      method:      'POST',
      credentials: 'same-origin',
      mode:        'cors',
      cache:       'no-cache',
      headers: {
        'X-Auth-token': localStorage.getItem('userToken')
      }
    };

    const onResponse = (response) => {
      switch (response.status) {
        case 401:
          localStorage.removeItem('userToken');
          window.location = '/sign_in';
          return undefined;
        case 404:
          return Promise.resolve(undefined);
        default:
          return response.json();
      }
    };

    const onNetworkError = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init)
      .then(onResponse, onNetworkError)
      .then((sinkConnector) => {
        dispatch({ sinkConnector, type: 'RECEIVE_RESTART_PIPELINE_SINK_CONNECTOR' });
      });
  };
}

/**
 * Thunk action creator that loads `/api/pipelines/<id>/flat_files`.
 *
 * Dispatches `REQUEST_PIPELINE_FLAT_FILES`, then
 * `RECEIVE_PIPELINE_FLAT_FILES` with the parsed JSON body. On a 401 the
 * stored `userToken` is removed and `window.location` is set to `/sign_in`;
 * the receive action then carries `undefined`, as it does when `fetch`
 * rejects (the error is only logged).
 *
 * @param {number|string} id - id of the pipeline, used in the URL
 * @returns {Function} thunk taking `dispatch` and returning a promise
 */
export function fetchFlatFilesOfPipeline(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_PIPELINE_FLAT_FILES' });

    const endpoint = '/api/pipelines/' + id + '/flat_files';
    const init = {
      headers: { 'X-Auth-token': localStorage.getItem('userToken') }
    };

    const readBody = (response) => {
      if (response.status !== 401) {
        return response.json();
      }
      localStorage.removeItem('userToken');
      window.location = '/sign_in';
      return undefined;
    };

    const logFailure = (error) => console.log('An error occurred.', error);

    return fetch(endpoint, init)
      .then(readBody, logFailure)
      .then((flatFiles) => {
        dispatch({ flatFiles, type: 'RECEIVE_PIPELINE_FLAT_FILES' });
      });
  };
}

/**
 * Builds a thunk that loads the metrics of a pipeline.
 *
 * Dispatch sequence: `REQUEST_PIPELINE_METRICS` first, then - after the
 * request to `/api/pipelines/<id>/metrics` has settled -
 * `RECEIVE_PIPELINE_METRICS` with the parsed JSON body as `metrics`.
 * `metrics` is `undefined` after a 401 response or a failed request.
 *
 * @param {*} id - Pipeline identifier, inserted into the request path as is.
 * @returns {Function} Thunk taking `dispatch`; it returns a promise that
 *   settles once the receive action has been dispatched.
 */
export function fetchMetricsOfPipeline(id) {
  function toReceiveAction(metrics) {
    return { metrics, type: 'RECEIVE_PIPELINE_METRICS' };
  }

  return (dispatch) => {
    dispatch({ type: 'REQUEST_PIPELINE_METRICS' });

    const pendingResponse = fetch(`/api/pipelines/${id}/metrics`, {
      headers: { 'X-Auth-token': localStorage.getItem('userToken') }
    });

    const pendingBody = pendingResponse.then(
      (response) => {
        const unauthorized = response.status === 401;
        if (unauthorized) {
          // Drop the rejected token and redirect to the sign-in page.
          localStorage.removeItem('userToken');
          window.location = '/sign_in';
          return undefined;
        }
        return response.json();
      },
      (error) => {
        // Reached only when fetch() itself rejects.
        console.log('An error occurred.', error);
      }
    );

    return pendingBody.then((metrics) => {
      dispatch(toReceiveAction(metrics));
    });
  };
}

/**
 * Builds a thunk that loads the YAML representation of a pipeline.
 *
 * The thunk dispatches `REQUEST_PIPELINE_YAML` and requests
 * `/api/pipelines/<id>` with `Accept: application/x-yaml`. Afterwards:
 *   - 401: the stored user token is removed and the browser is sent to
 *     `/sign_in`; nothing further is dispatched.
 *   - any other status >= 300: `RECEIVE_PIPELINE_YAML_ERROR` with an empty
 *     `yaml` string is dispatched.
 *   - otherwise: `RECEIVE_PIPELINE_YAML` with the response text as `yaml`.
 * A rejected fetch() is only logged.
 *
 * @param {*} id - Pipeline identifier, appended to the request path as is.
 * @returns {Function} Thunk taking `dispatch`; it returns a promise resolving
 *   to the value returned by `dispatch`, or `undefined` after a 401 or a
 *   failed request.
 */
export function fetchYamlOfPipeline(id) {
  const requestYamlOfPipeline = () => ({
    type: 'REQUEST_PIPELINE_YAML'
  });

  const receivedYamlOfPipeline = yaml => ({
    yaml: yaml,
    type: 'RECEIVE_PIPELINE_YAML'
  });

  // The failure action always carries an empty YAML string; the error body
  // sent by the server is not forwarded.
  const receivedYamlOfPipelineFailed = () => ({
    yaml: '',
    type: 'RECEIVE_PIPELINE_YAML_ERROR'
  });

  return function (dispatch) {
    dispatch(requestYamlOfPipeline());

    return fetch(
        '/api/pipelines/' + id,
        {
          headers: {
            'X-Auth-token': localStorage.getItem('userToken'),
            'Accept':       'application/x-yaml'
          }
        }
      )
      .then(
        response => {
          if (response.status === 401) {
            localStorage.removeItem('userToken');
            window.location = '/sign_in';
          } else if (response.status >= 300) {
            // Every non-success status is a failure (same threshold as
            // transferPipelineToProject). Previously only 5xx was, so the body
            // of e.g. a 404 was dispatched as if it were the pipeline's YAML.
            return response.text().then(() => dispatch(receivedYamlOfPipelineFailed()));
          } else {
            return response.text().then(body => dispatch(receivedYamlOfPipeline(body)));
          }
        },
        error => console.log('An error occurred.', error)
      );
  };
}


/**
 * Builds a thunk that transfers a pipeline to another project.
 *
 * The thunk dispatches `REQUEST_TRANSFER_PIPELINE_TO_PROJECT` and POSTs to
 * `/api/pipelines/<pipelineId>/transfer_to_project?projectId=<projectId>`.
 * Both identifiers are percent-encoded so that reserved characters (`&`,
 * `#`, `/`, `?`, spaces, ...) cannot alter the path or the query string.
 * Afterwards:
 *   - 401: the stored user token is removed and the browser is sent to
 *     `/sign_in`; nothing further is dispatched.
 *   - any other status >= 300: the response text is dispatched as
 *     `errorMessage` of `RECEIVE_TRANSFER_PIPELINE_TO_PROJECT_FAILED`.
 *   - otherwise: the JSON body is passed through `preparePipeline` and
 *     dispatched as `pipeline` of `RECEIVE_TRANSFER_PIPELINE_TO_PROJECT`.
 * A rejected fetch() is only logged.
 *
 * @param {*} pipelineId - Identifier of the pipeline to transfer.
 * @param {*} projectId - Identifier of the target project.
 * @returns {Function} Thunk taking `dispatch`; it returns a promise resolving
 *   to the value returned by `dispatch`, or `undefined` after a 401 or a
 *   failed request.
 */
export function transferPipelineToProject(pipelineId, projectId) {
  const requestTransferPipeline = () => ({
    type: 'REQUEST_TRANSFER_PIPELINE_TO_PROJECT'
  });

  const receivedTransferPipeline = (pipeline) => ({
    pipeline: pipeline,
    type:     'RECEIVE_TRANSFER_PIPELINE_TO_PROJECT'
  });

  const receivedTransferPipelineFailed = (errorMessage) => ({
    errorMessage: errorMessage,
    type:         'RECEIVE_TRANSFER_PIPELINE_TO_PROJECT_FAILED'
  });

  return function (dispatch) {
    dispatch(requestTransferPipeline());

    // Identifiers made only of unreserved characters (digits, letters, '-',
    // '_', '.', '~') are left untouched by the encoding.
    const encodedPipelineId = encodeURIComponent(pipelineId);
    const encodedProjectId = encodeURIComponent(projectId);

    return fetch(
        `/api/pipelines/${encodedPipelineId}/transfer_to_project?projectId=${encodedProjectId}`,
        {
          method:      'POST',
          credentials: 'same-origin',
          mode:        'cors',
          cache:       'no-cache',
          headers: {
            'X-Auth-token': localStorage.getItem('userToken')
          }
        }
      ).then(
        response => {
          if (response.status === 401) {
            localStorage.removeItem('userToken');
            window.location = '/sign_in';
          } else if (response.status >= 300) {
            return response.text().then(errorMessage => dispatch(receivedTransferPipelineFailed(errorMessage)));
          } else {
            return response.json().then(pipeline => dispatch(receivedTransferPipeline(preparePipeline(pipeline))));
          }
        },
        error => console.log('An error occurred.', error)
      );
  };
}

/**
 * Builds a thunk that resets the offset of a pipeline.
 *
 * The thunk dispatches `REQUEST_RESET_OFFSET_PIPELINE` and POSTs to
 * `/api/pipelines/<id>/reset_offset`. Afterwards:
 *   - 401: the stored user token is removed and the browser is sent to
 *     `/sign_in`.
 *   - 404: nothing is dispatched.
 *   - any other status: the JSON body is dispatched as `metrics` of
 *     `RECEIVE_RESET_OFFSET_PIPELINE`.
 * A rejected fetch() is only logged.
 *
 * @param {*} id - Pipeline identifier, inserted into the request path as is.
 * @returns {Function} Thunk taking `dispatch`; it returns a promise resolving
 *   to the value returned by `dispatch`, or `undefined` when nothing was
 *   dispatched.
 */
export function resetOffsetOfPipeline(id) {
  return (dispatch) => {
    dispatch({ type: 'REQUEST_RESET_OFFSET_PIPELINE' });

    const endpoint = `/api/pipelines/${id}/reset_offset`;
    const requestOptions = {
      method: 'POST',
      credentials: 'same-origin',
      mode: 'cors',
      cache: 'no-cache',
      headers: { 'X-Auth-token': localStorage.getItem('userToken') }
    };

    const handleResponse = (response) => {
      switch (response.status) {
        case 401:
          // Rejected token: forget it and go back to the sign-in page.
          localStorage.removeItem('userToken');
          window.location = '/sign_in';
          return undefined;
        case 404:
          return Promise.resolve(undefined);
        default:
          return response.json().then((metrics) =>
            dispatch({ metrics, type: 'RECEIVE_RESET_OFFSET_PIPELINE' })
          );
      }
    };

    const logFailure = (error) => {
      console.log('An error occurred.', error);
    };

    return fetch(endpoint, requestOptions).then(handleResponse, logFailure);
  };
}
/**
 * Builds a thunk that resets the offset of a pipeline's sink connector.
 *
 * The thunk dispatches `REQUEST_RESET_OFFSET_PIPELINE_SINK_CONNECTOR` and
 * POSTs to `/api/pipelines/<id>/sink_connector/reset_offset`. A 401 clears
 * the stored user token and redirects to `/sign_in`; a 404 dispatches
 * nothing; every other status has its JSON body dispatched as `metrics` of
 * `RECEIVE_RESET_OFFSET_PIPELINE_SINK_CONNECTOR`. A rejected fetch() is only
 * logged.
 *
 * @param {*} id - Pipeline identifier, inserted into the request path as is.
 * @returns {Function} Thunk taking `dispatch`; it returns a promise resolving
 *   to the value returned by `dispatch`, or `undefined` when nothing was
 *   dispatched.
 */
export function resetOffsetOfSinkConnectorOfPipeline(id) {
  const buildReceiveAction = (metrics) => ({
    metrics,
    type: 'RECEIVE_RESET_OFFSET_PIPELINE_SINK_CONNECTOR'
  });

  return (dispatch) => {
    dispatch({ type: 'REQUEST_RESET_OFFSET_PIPELINE_SINK_CONNECTOR' });

    const pendingResponse = fetch(`/api/pipelines/${id}/sink_connector/reset_offset`, {
      method: 'POST',
      credentials: 'same-origin',
      mode: 'cors',
      cache: 'no-cache',
      headers: { 'X-Auth-token': localStorage.getItem('userToken') }
    });

    return pendingResponse.then(
      (response) => {
        const { status } = response;

        if (status === 401) {
          // Rejected token: forget it and go back to the sign-in page.
          localStorage.removeItem('userToken');
          window.location = '/sign_in';
          return undefined;
        }

        if (status === 404) {
          return Promise.resolve(undefined);
        }

        return response.json().then((metrics) => dispatch(buildReceiveAction(metrics)));
      },
      (error) => {
        // Reached only when fetch() itself rejects.
        console.log('An error occurred.', error);
      }
    );
  };
}

/**
 * Builds a thunk that dispatches a single `RESET_PIPELINE_ERROR_MESSAGE`
 * action.
 *
 * @returns {Function} Thunk taking `dispatch`; it returns nothing.
 */
export function resetErrorMessage() {
  // A fresh action object is created for every invocation of the thunk.
  const buildResetAction = () => ({ type: 'RESET_PIPELINE_ERROR_MESSAGE' });

  return (dispatch) => {
    dispatch(buildResetAction());
  };
}
