
Complete Guide: How to Write a Custom Step Plug-In for Pentaho


Why do we need a Custom Step Plug-In?

Are you stuck trying to implement your own functionality in PDI (Kettle)? Do you need a reusable, out-of-the-box solution inside PDI? Implementing custom functionality in Pentaho can be tricky. Pentaho Data Integration custom plug-ins and extension points are a good way to do it: PDI has a flexible, pluggable architecture that lets you write your own plug-in without much complexity.

Another option is the ‘User Defined Java Class’ step, where you can implement the solution inline. However, it does not work well as a generic solution, because you have to adjust the inputs to this step in every transformation that uses it.

For instance, suppose you want to build an out-of-the-box solution like the following:

  • Read (consume) messages from Kafka and write (produce) messages to Kafka by topic and partition number, in a multithreaded fashion, with the Kafka input parameters entered through the dialog box of the custom plug-in step.

Custom Step Plug-In/Extension Point Development

At a minimum, you need four classes implementing the following interfaces (package org.pentaho.di.trans.step):

  • StepMetaInterface:  defines the metadata and takes care of the XML representation, saving to and loading from the repository, checks, etc.
  • StepInterface:  makes the step execute; inherit from BaseStep to make your life easier.
  • StepDataInterface:  holds open cursors, result sets, files, etc.
  • StepDialogInterface:  GUI/dialog code to edit the metadata.

Detailed development steps – How to implement a Pentaho Custom Plug-In

This article assumes that the reader is familiar with Java, PDI, SWT and Eclipse.

It does not implement any specific business functionality; it shows how to create a custom plug-in using a small sample.

Prerequisites:  Java, PDI 6.1 and Eclipse.

Let’s build a sample plug-in, step by step, that adds a new String field to the input stream and passes each row on to the next step. Maven can be used to build this code, but to keep the mechanics visible we skip Maven for now.

Sample plug-in

Step 1 :

Create a new Java project named ‘Pentaho-Sample-Plug-In’ in Eclipse.

Click ‘Finish’; the new project appears in the Package Explorer.

Step 2 :

Create the package ‘com.sigma.sample’.

Click ‘Finish’; the new package appears in the Package Explorer.

Step 3 :

Configure Build Path

  • Click ‘Add External JARs…’
  • Select all jars from …\data-integration\lib and …\data-integration\libswt\win64
  • Click ‘OK’

 

Step 4 :

Let’s start implementing the plug-in.

Create the class ‘SamplePlugInData’, which extends BaseStepData and implements StepDataInterface, as below:

package com.sigma.sample;

import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.trans.step.BaseStepData;
import org.pentaho.di.trans.step.StepDataInterface;

public class SamplePlugInData extends BaseStepData implements StepDataInterface {

public RowMetaInterface outputRowMeta;

public SamplePlugInData() {
super();
}

}

 

Step 5 :

 

Create the class ‘SamplePlugInMeta’, which extends BaseStepMeta and implements StepMetaInterface, as below:

/**
*
*/
package com.sigma.sample;

import java.util.List;
import java.util.Map;

import org.pentaho.di.core.CheckResult;
import org.pentaho.di.core.CheckResultInterface;
import org.pentaho.di.core.Counter;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMeta;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStepMeta;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.w3c.dom.Node;

/**
* @author basavaraja.pa
*
*/

public class SamplePlugInMeta extends BaseStepMeta implements StepMetaInterface {

private String newField;

public SamplePlugInMeta() {
super(); // allocate BaseStepInfo
}

public String getNewField() {
return newField;
}

public void setNewField(String newField) {
this.newField = newField;
}

@Override
public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int cnr, TransMeta transMeta,
Trans trans) {
return new SamplePlugInStep(stepMeta, stepDataInterface, cnr, transMeta, trans);
}

@Override
public StepDataInterface getStepData() {
return new SamplePlugInData();
}

@Override
public void setDefault() {
// TODO Auto-generated method stub
}

public void loadXML(Node stepnode, List<DatabaseMeta> databases, Map<String, Counter> counters)
throws KettleXMLException {
try {
// Read the value stored under the NEWFIELD tag of the step's XML node
newField = XMLHandler.getTagValue(stepnode, "NEWFIELD");
} catch (Exception e) {
throw new KettleXMLException("Load XML: Exception", e);
}
}

public String getXML() throws KettleException {
// Serialize the setting as a NEWFIELD tag; nothing is written when it is unset
StringBuilder retVal = new StringBuilder();
if (newField != null) {
retVal.append("    ").append(XMLHandler.addTagValue("NEWFIELD", newField));
}
return retVal.toString();
}

public void readRep(Repository rep, ObjectId stepId, List<DatabaseMeta> databases, Map<String, Counter> counters)
throws KettleException {
try {
newField = rep.getStepAttributeString(stepId, "NEWFIELD");
} catch (Exception e) {
throw new KettleException("Unexpected error reading step Sample Plug-In from the repository", e);
}
}

public void saveRep(Repository rep, ObjectId transformationId, ObjectId stepId) throws KettleException {
try {
if (newField != null) {
rep.saveStepAttribute(transformationId, stepId, "NEWFIELD", newField);
}
} catch (Exception e) {
throw new KettleException("Unexpected error saving step Sample Plug-In to the repository", e);
}
}

public void getFields(RowMetaInterface rowMeta, String origin, RowMetaInterface[] info, StepMeta nextStep,
VariableSpace space) throws KettleStepException {
// The output field is always named "NewField"; the text entered in the
// dialog is used as the field's value, not as its name.
ValueMetaInterface newFieldMeta = new ValueMeta("NewField", ValueMetaInterface.TYPE_STRING);
newFieldMeta.setOrigin(origin);
rowMeta.addValueMeta(newFieldMeta);
}

public void check(List<CheckResultInterface> remarks, TransMeta transMeta, StepMeta stepMeta, RowMetaInterface prev,
String[] input, String[] output, RowMetaInterface info) {
CheckResult cr;
// Warn when no upstream step delivers fields to this step
if (prev == null || prev.size() == 0) {
cr = new CheckResult(CheckResult.TYPE_RESULT_WARNING, "Not receiving any fields from previous steps!",
stepMeta);
remarks.add(cr);
}
}
}
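
To make the XML handling in getXML() and loadXML() more concrete, here is a minimal sketch that uses only the JDK and none of the PDI classes. It writes a NEWFIELD tag and reads it back from a surrounding step element. The class StepXmlSketch and its methods are hypothetical names used for illustration; they only approximate what XMLHandler.addTagValue and XMLHandler.getTagValue do in the real step.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/** JDK-only approximation of the NEWFIELD round trip; not the PDI API. */
final class StepXmlSketch {

    /** Escapes the three characters that would break element content. */
    static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    /** Builds the tag for the setting; returns an empty string when it is unset. */
    static String toXml(String newField) {
        return newField == null ? "" : "    <NEWFIELD>" + escape(newField) + "</NEWFIELD>\n";
    }

    /** Reads NEWFIELD from the given step XML, or returns null when the tag is absent. */
    static String fromXml(String stepXml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(stepXml)));
            NodeList tags = doc.getDocumentElement().getElementsByTagName("NEWFIELD");
            return tags.getLength() == 0 ? null : tags.item(0).getTextContent();
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse step XML", e);
        }
    }

    public static void main(String[] args) {
        String stepXml = "<step>\n" + toXml("hello & goodbye") + "</step>";
        System.out.println(stepXml);
        System.out.println(fromXml(stepXml));
    }
}
```

The point of the round trip is that whatever getXML() writes, loadXML() must be able to read back under the same tag name; a mismatch between the two tag names silently loses the setting when a transformation is reopened.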

Step 6 :

 

Create the class ‘SamplePlugInDialog’, which extends BaseStepDialog and implements StepDialogInterface, as below:

/**
*
*/
package com.sigma.sample;

import org.eclipse.swt.SWT;
import org.eclipse.swt.events.ModifyEvent;
import org.eclipse.swt.events.ModifyListener;
import org.eclipse.swt.events.SelectionAdapter;
import org.eclipse.swt.events.SelectionEvent;
import org.eclipse.swt.layout.FormAttachment;
import org.eclipse.swt.layout.FormData;
import org.eclipse.swt.layout.FormLayout;
import org.eclipse.swt.widgets.Button;
import org.eclipse.swt.widgets.Control;
import org.eclipse.swt.widgets.Display;
import org.eclipse.swt.widgets.Event;
import org.eclipse.swt.widgets.Label;
import org.eclipse.swt.widgets.Listener;
import org.eclipse.swt.widgets.Shell;
import org.eclipse.swt.widgets.Text;
import org.pentaho.di.core.Const;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStepMeta;
import org.pentaho.di.trans.step.StepDialogInterface;
import org.pentaho.di.ui.core.widget.TextVar;
import org.pentaho.di.ui.trans.step.BaseStepDialog;

/**
* @author basavaraja.pa
*
*/
public class SamplePlugInDialog extends BaseStepDialog implements StepDialogInterface {
private SamplePlugInMeta samplePlugInMeta;
private TextVar wnewField;

public SamplePlugInDialog(Shell parent, Object in, TransMeta tr, String sname) {
super(parent, (BaseStepMeta) in, tr, sname);
samplePlugInMeta = (SamplePlugInMeta) in;
}

@Override
public String open() {
Shell parent = getParent();
Display display = parent.getDisplay();

shell = new Shell(parent, SWT.DIALOG_TRIM | SWT.RESIZE | SWT.MIN | SWT.MAX);
props.setLook(shell);
setShellImage(shell, samplePlugInMeta);

ModifyListener lsMod = new ModifyListener() {
public void modifyText(ModifyEvent e) {
samplePlugInMeta.setChanged();
}
};
changed = samplePlugInMeta.hasChanged();

FormLayout formLayout = new FormLayout();
formLayout.marginWidth = Const.FORM_MARGIN;
formLayout.marginHeight = Const.FORM_MARGIN;

shell.setLayout(formLayout);
shell.setText("Sample Plug-In");

int middle = props.getMiddlePct();
int margin = Const.MARGIN;

// Step name
wlStepname = new Label(shell, SWT.RIGHT);
wlStepname.setText("Step Name");
props.setLook(wlStepname);
fdlStepname = new FormData();
fdlStepname.left = new FormAttachment(0, 0);
fdlStepname.right = new FormAttachment(middle, -margin);
fdlStepname.top = new FormAttachment(0, margin);
wlStepname.setLayoutData(fdlStepname);
wStepname = new Text(shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
props.setLook(wStepname);
wStepname.addModifyListener(lsMod);
fdStepname = new FormData();
fdStepname.left = new FormAttachment(middle, 0);
fdStepname.top = new FormAttachment(0, margin);
fdStepname.right = new FormAttachment(100, 0);
wStepname.setLayoutData(fdStepname);
Control lastWidget = wStepname;

Label wlnewField = new Label(shell, SWT.RIGHT);
wlnewField.setText("New Field[Type:String]");
props.setLook(wlnewField);
FormData fdlnewField = new FormData();
fdlnewField.top = new FormAttachment(lastWidget, margin);
fdlnewField.left = new FormAttachment(0, 0);
fdlnewField.right = new FormAttachment(middle, -margin);
wlnewField.setLayoutData(fdlnewField);
wnewField = new TextVar(transMeta, shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
props.setLook(wnewField);
wnewField.addModifyListener(lsMod);
FormData fdnewField = new FormData();
fdnewField.top = new FormAttachment(lastWidget, margin);
fdnewField.left = new FormAttachment(middle, 0);
fdnewField.right = new FormAttachment(100, 0);
wnewField.setLayoutData(fdnewField);
lastWidget = wnewField;

// Buttons
wOK = new Button(shell, SWT.PUSH);
wOK.setText(BaseMessages.getString("System.Button.OK")); //$NON-NLS-1$
wCancel = new Button(shell, SWT.PUSH);
wCancel.setText(BaseMessages.getString("System.Button.Cancel")); //$NON-NLS-1$

setButtonPositions(new Button[] { wOK, wCancel }, margin, null);

lsCancel = new Listener() {
public void handleEvent(Event e) {
cancel();
}
};
lsOK = new Listener() {
public void handleEvent(Event e) {
ok();
}
};
wCancel.addListener(SWT.Selection, lsCancel);
wOK.addListener(SWT.Selection, lsOK);

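// Pressing Enter in a text field behaves like clicking OK
lsDef = new SelectionAdapter() {
public void widgetDefaultSelected(SelectionEvent e) {
ok();
}
};
wStepname.addSelectionListener(lsDef);
wnewField.addSelectionListener(lsDef);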

// Set the shell size, based upon previous time…
setSize(shell, 200, 150, true);
getData(samplePlugInMeta, true);
// consumerMeta.setChanged(changed);

// setTableFieldCombo();

shell.open();
while (!shell.isDisposed()) {
if (!display.readAndDispatch()) {
display.sleep();
}
}

return stepname;
}

private void ok() {
if (Const.isEmpty(wStepname.getText())) {
return;
}
setData(samplePlugInMeta);
dispose();
}

private void cancel() {
stepname = null;
samplePlugInMeta.setChanged(changed);
dispose();
}

/**
* Copy information from the meta-data input to the dialog fields.
*
* @param samplePlugInMeta the step meta-data to read from
* @param copyStepname whether to also copy the step name into the dialog
*/
private void getData(SamplePlugInMeta samplePlugInMeta, boolean copyStepname) {
if (copyStepname) {
wStepname.setText(stepname);
}
if (samplePlugInMeta.getNewField() != null) {
wnewField.setText(samplePlugInMeta.getNewField());
}
}

/**
* Copy information from the dialog fields to the meta-data input
*/
private void setData(SamplePlugInMeta samplePlugInMeta) {
stepname = wStepname.getText();
samplePlugInMeta.setNewField(wnewField.getText());
samplePlugInMeta.setChanged();
}
}

 

Step 7 :

 

Create the class ‘SamplePlugInStep’, which extends BaseStep and implements StepInterface, as below:

/**
*
*/
package com.sigma.sample;

import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.row.RowDataUtil;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMeta;
import org.pentaho.di.core.row.ValueMetaAndData;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStep;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;

/**
* @author basavaraja.pa
*
*/
public class SamplePlugInStep extends BaseStep implements StepInterface {

private SamplePlugInData data;
private SamplePlugInMeta meta;

public SamplePlugInStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
Trans trans) {
super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
}

public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
meta = (SamplePlugInMeta) smi;
data = (SamplePlugInData) sdi;

return super.init(smi, sdi);
}

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
meta = (SamplePlugInMeta) smi;
data = (SamplePlugInData) sdi;

Object[] inputRow = getRow(); // get row, blocks when needed!
if (inputRow == null) // no more input to be expected…
{
setOutputDone();
return false;
}

if (first) {
first = false;
data.outputRowMeta = (RowMetaInterface) getInputRowMeta().clone();
meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
}

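// Append the configured value after the last input field, then size the row to the output layout
Object[] extendedRow = RowDataUtil.addRowData(inputRow.clone(), getInputRowMeta().size(),
new Object[] { meta.getNewField() });
Object[] newRow = RowDataUtil.createResizedCopy(extendedRow, data.outputRowMeta.size());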

putRow(data.outputRowMeta, newRow);

return true;
}

public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
meta = (SamplePlugInMeta) smi;
data = (SamplePlugInData) sdi;

super.dispose(smi, sdi);
}

}
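
The row handling in processRow() comes down to copying the incoming Object[] and placing the new value at the index just past the last input field. The sketch below shows that idea in plain Java, without PDI on the classpath. RowAppendSketch and appendValue are hypothetical names used for illustration; in the step itself RowDataUtil does this work.

```java
import java.util.Arrays;

/** Plain-Java illustration of appending one value to a row array; not the PDI API. */
final class RowAppendSketch {

    /**
     * Returns a copy of inputRow with extraValue stored at index inputSize.
     * inputSize is the number of fields described by the input row metadata;
     * the array itself may be longer, because rows can be over-allocated.
     */
    static Object[] appendValue(Object[] inputRow, int inputSize, Object extraValue) {
        // Copy first so the caller's array is never modified
        Object[] outputRow = Arrays.copyOf(inputRow, Math.max(inputRow.length, inputSize + 1));
        outputRow[inputSize] = extraValue;
        return outputRow;
    }

    public static void main(String[] args) {
        Object[] inputRow = { "a", 1L };
        // prints [a, 1, hello]
        System.out.println(Arrays.toString(appendValue(inputRow, 2, "hello")));
    }
}
```

The new value's index has to match the position of the field that getFields() adds to the output row metadata, which is why the step derives both from the size of the input row metadata.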

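The final Eclipse project contains the package ‘com.sigma.sample’ with four classes: SamplePlugInData, SamplePlugInMeta, SamplePlugInDialog and SamplePlugInStep.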

Step 8 :

  • Create a folder ‘Sample-Plug-In’ under …\data-integration\plugins\steps
  • Create a folder ‘lib’ under …\data-integration\plugins\steps\Sample-Plug-In to hold any JARs the plug-in references
  • Provide a logo image file for the plug-in (referenced as logo.png in plugin.xml in the next step)
  • In Eclipse, export the project as a JAR file: select the export destination …\data-integration\plugins\steps\Sample-Plug-In and name the JAR file ‘sample-plug-in’

The JAR file now appears in the ‘Sample-Plug-In’ folder.
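
If the step does not show up later, one thing worth checking is whether the exported JAR really contains the Meta class. The following is a small JDK-only sketch of such a check. PluginJarCheck, containsClass and writeDemoJar are hypothetical helper names; writeDemoJar only creates a throwaway JAR so the example runs on its own, and in practice you would pass the path of your exported sample-plug-in.jar instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;

/** Checks whether a JAR contains the .class entry for a fully qualified class name. */
final class PluginJarCheck {

    static boolean containsClass(Path jar, String className) {
        // com.sigma.sample.SamplePlugInMeta -> com/sigma/sample/SamplePlugInMeta.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.getEntry(entryName) != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Demo helper: writes a temporary JAR with empty entries for the given class names. */
    static Path writeDemoJar(String... classNames) {
        try {
            Path jar = Files.createTempFile("demo-plugin", ".jar");
            try (JarOutputStream out = new JarOutputStream(Files.newOutputStream(jar))) {
                for (String name : classNames) {
                    out.putNextEntry(new JarEntry(name.replace('.', '/') + ".class"));
                    out.closeEntry();
                }
            }
            return jar;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path jar = args.length > 0
                ? Path.of(args[0])
                : writeDemoJar("com.sigma.sample.SamplePlugInMeta");
        System.out.println(containsClass(jar, "com.sigma.sample.SamplePlugInMeta"));
    }
}
```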

Step 9 :

Create plugin.xml. It tells PDI which class is the implementation (Meta) class, the name of the plug-in, and which additional JARs to load.

<?xml version="1.0" encoding="UTF-8"?>
<plugin
id="sample-plug-in"
iconfile="logo.png"
description="sample-plug-in"
tooltip="This plug-in allows adding New String Field into the stream"
category="Input"
classname="com.sigma.sample.SamplePlugInMeta">

<libraries>
<library name="sample-plug-in.jar"/>
</libraries>

<localized_category>
<category locale="en_US">Input</category>
</localized_category>
<localized_description>
<description locale="en_US">Sample Plug-In</description>
</localized_description>
<localized_tooltip>
<tooltip locale="en_US">This plug-in allows adding New String Field into the stream</tooltip>
</localized_tooltip>
</plugin>

id:  Unique ID of the plug-in

iconfile:  Image shown for your plug-in in a transformation

description:  Description of the plug-in

tooltip:  Tooltip of the plug-in

category:  Category under which your plug-in appears

classname:  Fully qualified name of the Meta class

libraries:  Referenced JARs. You must list the JAR that contains your implementation, plus any referenced JARs placed under the lib folder

localized_*:  Localization of the category, description and tooltip
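
Before restarting PDI, it can help to sanity-check plugin.xml, for example to catch typographic quotes pasted in from a web page, which make the file malformed. The sketch below uses only the JDK XML parser. PluginXmlCheck and findProblems are hypothetical names, and the list of expected attributes is simply the set used in this article, not an authoritative statement of what PDI requires.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

/** JDK-only sanity check for the plugin.xml layout used in this article. */
final class PluginXmlCheck {

    // Assumption: these are the attributes this article sets on <plugin>
    private static final String[] EXPECTED_ATTRIBUTES =
            { "id", "iconfile", "description", "tooltip", "category", "classname" };

    /** Returns a list of problems found; an empty list means the file looks plausible. */
    static List<String> findProblems(String pluginXml) {
        List<String> problems = new ArrayList<>();
        Element root;
        try {
            root = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(pluginXml)))
                    .getDocumentElement();
        } catch (Exception e) {
            problems.add("not well-formed XML: " + e.getMessage());
            return problems;
        }
        for (String name : EXPECTED_ATTRIBUTES) {
            // getAttribute returns an empty string when the attribute is absent
            if (root.getAttribute(name).trim().isEmpty()) {
                problems.add("missing attribute: " + name);
            }
        }
        if (root.getElementsByTagName("library").getLength() == 0) {
            problems.add("no <library> entry");
        }
        return problems;
    }

    public static void main(String[] args) {
        String xml = "<plugin id=\"sample-plug-in\" iconfile=\"logo.png\""
                + " description=\"sample-plug-in\" tooltip=\"demo\" category=\"Input\">"
                + "<libraries><library name=\"sample-plug-in.jar\"/></libraries></plugin>";
        // classname is left out on purpose, so one problem is reported
        System.out.println(findProblems(xml));
    }
}
```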

Step 10 :

Restart PDI; the new plug-in is then available under the ‘Input’ category.

Step 11 :

Create a transformation containing a ‘Generate Rows’ step followed by the new plug-in step, and run it.

Each output row contains the additional String field added to the stream.
