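1. Design requirements:
The project uses Kettle to collect data from the front-end machine at each node into the data center, and the table structures are identical across nodes. Kettle is deployed on every front-end machine and runs a JOB that collects data into the data center on a schedule. The scripts are the same everywhere except for the front-end database connection. Developing a separate JOB and its linked Trans for every node would mean a heavy development and maintenance workload. The design is therefore a component that downloads the collection JOB together with its linked scripts and keeps the links between the scripts intact.
2. Implementation:
The Kettle Spoon designer already implements this feature:
[Figure: the Export all Linked resources to XML menu item in Spoon]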

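The Export all Linked resources to XML menu item exports the script in the currently selected Spoon tab as a zip file into the folder you specify.
The script download is therefore designed by following how the Export all Linked resources to XML menu item is implemented.
(1) First, find the class and method in the Kettle source code that implement this feature.
The exportAllXMLFile() method in the org.pentaho.di.ui.spoon.Spoon class (Spoon.java) implements this menu item.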
    public void exportAllXMLFile() {
        // Export whichever transformation or job is open in the active tab.
        ResourceExportInterface resourceExportInterface = getActiveTransformation();
        if (resourceExportInterface==null) resourceExportInterface=getActiveJob();
        if (resourceExportInterface==null) return; // nothing to export

        try {
            // Keep asking until a target zip file has been chosen.
            String zipFilename = null;
            while (Const.isEmpty(zipFilename)) {
                FileDialog dialog = new FileDialog(shell, SWT.SAVE);
                dialog.setText(Messages.getString("Spoon.ExportResourceSelectZipFile"));
                dialog.setFilterExtensions(new String[] {"*.zip;*.ZIP", "*"});
                dialog.setFilterNames(new String[] { Messages.getString("System.FileType.ZIPFiles"), Messages.getString("System.FileType.AllFiles"), });
                setFilterPath(dialog);
                if (dialog.open()!=null)
                {
                    lastDirOpened = dialog.getFilterPath();
                    zipFilename = dialog.getFilterPath()+Const.FILE_SEPARATOR+dialog.getFileName();
                    FileObject zipFileObject = KettleVFS.getFileObject(zipFilename);
                    if (zipFileObject.exists()) {
                        // YES overwrites, NO asks for another file, CANCEL aborts.
                        MessageBox box = new MessageBox(shell, SWT.YES | SWT.NO | SWT.CANCEL);
                        box.setMessage(Messages.getString("Spoon.ExportResourceZipFileExists.Message", zipFilename));
                        box.setText(Messages.getString("Spoon.ExportResourceZipFileExists.Title"));
                        int answer = box.open();
                        if (answer==SWT.CANCEL) return;
                        if (answer==SWT.NO) zipFilename = null;
                    }
                } else {
                    return;
                }
            }

            // Serialize the resource and everything it links to into the zip file.
            TopLevelResource topLevelResource = ResourceUtil.serializeResourceExportInterface(zipFilename, resourceExportInterface, (VariableSpace)resourceExportInterface, rep);
            String message = ResourceUtil.getExplanation(zipFilename, topLevelResource.getResourceName(), resourceExportInterface);

            EnterTextDialog enterTextDialog = new EnterTextDialog(shell, "Resource serialized", "This resource was serialized succesfully!", message);
            enterTextDialog.setReadOnly();
            enterTextDialog.open();
        } catch(Exception e) {
            new ErrorDialog(shell, "Error", "Error exporting current file", e);
        }
    }
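This method relies on two important types: ResourceExportInterface and ResourceUtil.
ResourceExportInterface: the resource export interface. Both JobMeta and TransMeta implement it, so look at the methods that JobMeta and TransMeta implement.
ResourceUtil: the class that exports the scripts into a zip file. Its method is implemented as follows: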
    public static final TopLevelResource serializeResourceExportInterface(String zipFilename, ResourceExportInterface resourceExportInterface, VariableSpace space, Repository repository, String injectXML, String injectFilename) throws KettleException {
        ZipOutputStream out = null;

        try {
            // Maps each exported filename to its resource definition (XML content).
            Map<String, ResourceDefinition> definitions = new HashMap<String, ResourceDefinition>();

            // Optionally add one extra XML resource supplied by the caller.
            if (injectXML!=null) {
                ResourceDefinition resourceDefinition = new ResourceDefinition(injectFilename, injectXML);
                definitions.put(injectFilename, resourceDefinition);
            }

            ResourceNamingInterface namingInterface = new SequenceResourceNaming();

            // The job or transformation adds itself and every linked resource to definitions.
            String topLevelResource = resourceExportInterface.exportResources(space, definitions, namingInterface, repository);

            if (topLevelResource!=null && !definitions.isEmpty()) {

                FileObject fileObject = KettleVFS.getFileObject(zipFilename);

                // Write every definition as one UTF-8 encoded zip entry.
                out = new ZipOutputStream(KettleVFS.getOutputStream(fileObject, false));
                out.setEncoding("utf-8");
                for(String filename : definitions.keySet()) {

                    ResourceDefinition resourceDefinition = definitions.get(filename);

                    ZipEntry zipEntry = new ZipEntry(resourceDefinition.getFilename());
                    String comment = Messages.getString("ResourceUtil.SerializeResourceExportInterface.ZipEntryComment.OriginatingFile", filename, Const.NVL(resourceDefinition.getOrigin(), "-"));
                    zipEntry.setComment(comment);
                    out.putNextEntry(zipEntry);
                    out.write(resourceDefinition.getContent().getBytes("utf-8"));
                    out.closeEntry();
                }
                String zipURL = fileObject.getName().toString();
                return new TopLevelResource(topLevelResource, zipURL, "zip:"+zipURL+"!"+topLevelResource);
            } else {
                throw new KettleException(Messages.getString("ResourceUtil.Exception.NoResourcesFoundToExport"));
            }
        }
        catch(Exception e) {
            throw new KettleException(Messages.getString("ResourceUtil.Exception.ErrorSerializingExportInterface",resourceExportInterface.toString()), e);
        }
        finally {
            if (out!=null) {
                try {
                    out.close();
                } catch (IOException e) {
                    throw new KettleException(Messages.getString("ResourceUtil.Exception.ErrorClosingZipStream", zipFilename));
                }
            }
        }
    }
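The core of the method above is the loop that writes each exported definition as one zip entry. The following is a minimal sketch of that step using only the JDK, assuming the definitions are already available as a map from entry name to XML text. The class name ZipExportSketch, its methods, and the sample entry names are hypothetical and not part of Kettle; the Kettle method additionally resolves the target through KettleVFS and sets an entry comment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical helper, not a Kettle class: packs XML definitions into a zip archive.
final class ZipExportSketch {

    /** Writes every (entry name, XML text) pair as one UTF-8 zip entry. */
    static byte[] writeZip(Map<String, String> definitions) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // try-with-resources closes the zip stream even if writing an entry fails.
        try (ZipOutputStream out = new ZipOutputStream(buffer, StandardCharsets.UTF_8)) {
            for (Map.Entry<String, String> definition : definitions.entrySet()) {
                out.putNextEntry(new ZipEntry(definition.getKey()));
                out.write(definition.getValue().getBytes(StandardCharsets.UTF_8));
                out.closeEntry();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Read the buffer only after the stream is closed, so the archive is complete.
        return buffer.toByteArray();
    }

    /** Reads the archive back into a map so the round trip can be checked. */
    static Map<String, String> readZip(byte[] zipBytes) {
        Map<String, String> result = new LinkedHashMap<>();
        try (ZipInputStream in = new ZipInputStream(
                new ByteArrayInputStream(zipBytes), StandardCharsets.UTF_8)) {
            ZipEntry entry;
            byte[] chunk = new byte[4096];
            while ((entry = in.getNextEntry()) != null) {
                ByteArrayOutputStream content = new ByteArrayOutputStream();
                int read;
                while ((read = in.read(chunk)) != -1) {
                    content.write(chunk, 0, read);
                }
                result.put(entry.getName(), new String(content.toByteArray(), StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> definitions = new LinkedHashMap<>();
        definitions.put("collect_job.kjb", "<job><name>collect_job</name></job>");
        definitions.put("load_table.ktr", "<transformation><name>load_table</name></transformation>");
        System.out.println(readZip(writeZip(definitions)).keySet());
    }
}
```

Working against an in-memory buffer keeps the sketch easy to test; a real component would write to the target file instead.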
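(2) Reading the source code shows how the export works, so you can use it as a reference when implementing your own component. Job plugin development itself is not covered here; only the methods are listed: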
    public Result execute(Result previousResult, int nr, Repository rep, Job parentJob)
    {
        // Assume failure until the export has completed.
        Result result = previousResult;
        result.setNrErrors(1);
        result.setResult( false );
        // Resolve variables in every configured field.
        String realrepName=environmentSubstitute(repositoryname);
        String realusername=environmentSubstitute(username);
        String realpassword=environmentSubstitute(password);
        String realJobRepositoryPath=environmentSubstitute(directoryPath);
        if(realJobRepositoryPath == null) return result;
        // Split the repository path into directory and job name (the path must contain "/").
        String realJobDir = realJobRepositoryPath.substring(0, realJobRepositoryPath.lastIndexOf("/"));
        String realJobName =realJobRepositoryPath.substring(realJobRepositoryPath.lastIndexOf("/")+1,realJobRepositoryPath.length());
        String realoutfilename=environmentSubstitute(targetDirectoryPath);
        String realfindDatabaseName =environmentSubstitute(findDatabaseName);
        String realoutconnectname=environmentSubstitute(connectname);
        String realoutdbtype=environmentSubstitute(dbtype);
        String realoutdbaccess=environmentSubstitute(dbaccess);
        String realoutdbHost=environmentSubstitute(dbHost);
        String realoutdbname=environmentSubstitute(dbname);
        String realoutdbport=environmentSubstitute(dbport);
        String realoutdbuser=environmentSubstitute(dbuser);
        String realoutdbpassword=environmentSubstitute(dbpassword);
        try
        {
            // Connect to the repository that holds the job.
            connectRep(log,realrepName, realJobDir, realusername, realpassword);
            // Connection of the front-end database that replaces the one named findDatabaseName.
            DatabaseMeta clientDb = new DatabaseMeta(realoutconnectname,realoutdbtype,realoutdbaccess,realoutdbHost,realoutdbname,realoutdbport,realoutdbuser,realoutdbpassword);
            // Download the job and its linked scripts.
            exportAllXMLFile(realJobName,realoutfilename,realfindDatabaseName,clientDb);
            // Clear the error count set above so that a successful run is not reported as an error.
            result.setNrErrors(0);
            result.setResult(true);
        }catch(Exception e)
        {
            log.logError(toString(), Messages.getString("JobExportRepository.UnExpectedError",e.toString()));
            log.logError(toString(), "Stack trace: "+Const.CR+Const.getStackTracker(e));
        }finally
        {
            // Always disconnect from the repository and release the references.
            if(this.rep!=null)
            {
                this.rep.disconnect();
                this.rep=null;
            }
            if(this.repinfo!=null) this.repinfo=null;
            if(this.userinfo!=null) this.userinfo=null;
            if(this.repsinfo!=null)
            {
                this.repsinfo.clear();
                this.repsinfo=null;
            }
        }

        return result;
    }

    public void exportAllXMLFile(String realJobName,String DirectoryPath,String findDatabaseName,DatabaseMeta databaseMeta) throws Exception {
        ResourceExportInterface resourceExportInterface = null;
        try {
            // Load the job from the repository directory opened by connectRep.
            resourceExportInterface = new JobMeta(log,rep,realJobName,dir);
        } catch (Exception e) {
            log.logError(toString(), Messages.getString("JobExportRepository.UnExpectedError", e.toString()));
            log.logError(toString(), "Stack trace: " + Const.CR+ Const.getStackTracker(e));
        }
        if (resourceExportInterface == null)
            return;
        if(!Const.isEmpty(DirectoryPath)) {
            // Modified copy of ResourceUtil's method that also takes the connection to replace.
            ResourceUtil.serializeResourceExportInterface(DirectoryPath,findDatabaseName,databaseMeta,resourceExportInterface,(VariableSpace) resourceExportInterface, rep);
        }
    }
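The execute() method above splits the configured repository path into a directory and a job name with two substring calls, which fail when the path contains no "/". The following is a small sketch of a more defensive split. RepositoryPathSketch and its split method are hypothetical names, and treating a missing or leading-only slash as the root directory "/" is an assumption, not behavior taken from the plugin.

```java
// Hypothetical helper, not part of the plugin: splits "/dir/sub/job" into directory and job name.
final class RepositoryPathSketch {

    /** Returns a two-element array: [directory, job name]. */
    static String[] split(String repositoryPath) {
        if (repositoryPath == null || repositoryPath.isEmpty()) {
            throw new IllegalArgumentException("Repository path must not be empty");
        }
        int lastSlash = repositoryPath.lastIndexOf('/');
        String name = repositoryPath.substring(lastSlash + 1);
        if (name.isEmpty()) {
            throw new IllegalArgumentException("Repository path has no job name: " + repositoryPath);
        }
        // Assumption: a path without a directory part refers to the root directory "/".
        String directory = lastSlash <= 0 ? "/" : repositoryPath.substring(0, lastSlash);
        return new String[] { directory, name };
    }

    public static void main(String[] args) {
        String[] parts = split("/collect/node1/load_job");
        System.out.println(parts[0] + " | " + parts[1]);
    }
}
```

Rejecting a path that ends in "/" early gives a clearer error than a later failure when the job is loaded with an empty name.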
3. Preview:

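4. Plugin notes
(1) Select the Job, fill in the front-end database information, and run the Job. It downloads the specified job and its linked scripts from the repository, for example the trans referenced by a transformation entry in the job, and automatically repoints that transformation entry at the ktr file in the download directory.
(2) Do not save a downloaded script while connected to the repository; doing so overwrites the script in the repository. The next download then fails with an error: the transformation filename is no longer empty, Kettle gives that setting priority, and the download cannot find the ktr file.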
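
The post does not show how the plugin swaps in the front-end database connection during download. One possible approach is to rewrite the matching connection element in each exported XML file, sketched below with the JDK's DOM API. The class ConnectionRewriteSketch, its methods, the sample connection names, and the assumed layout (a connection element with name, server and port children) are illustrative assumptions, not the plugin's actual code.

```java
import java.io.StringReader;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper: updates the settings of one named connection in exported XML.
final class ConnectionRewriteSketch {

    /** Replaces child values (for example server, port) of the connection with the given name. */
    static String rewriteConnection(String xml, String connectionName, Map<String, String> newValues) {
        try {
            Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            NodeList connections = document.getElementsByTagName("connection");
            for (int i = 0; i < connections.getLength(); i++) {
                Element connection = (Element) connections.item(i);
                Element name = firstChild(connection, "name");
                // Elements without a matching <name> child are left untouched.
                if (name == null || !connectionName.equals(name.getTextContent().trim())) {
                    continue;
                }
                for (Map.Entry<String, String> value : newValues.entrySet()) {
                    Element child = firstChild(connection, value.getKey());
                    if (child != null) {
                        child.setTextContent(value.getValue());
                    }
                }
            }
            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter writer = new StringWriter();
            transformer.transform(new DOMSource(document), new StreamResult(writer));
            return writer.toString();
        } catch (Exception e) {
            throw new IllegalStateException("Could not rewrite connection " + connectionName, e);
        }
    }

    /** Returns the first direct child element with the given tag name, or null. */
    private static Element firstChild(Element parent, String tag) {
        for (Node node = parent.getFirstChild(); node != null; node = node.getNextSibling()) {
            if (node instanceof Element && tag.equals(node.getNodeName())) {
                return (Element) node;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String xml = "<transformation><connection><name>front_db</name>"
                + "<server>old-host</server><port>1521</port></connection></transformation>";
        Map<String, String> values = new HashMap<>();
        values.put("server", "10.0.0.5");
        System.out.println(rewriteConnection(xml, "front_db", values));
    }
}
```

Matching on the name child means that other connections in the same file, such as the data center connection, keep their settings.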

5. Plugin download
http://download.csdn.net/detail/jdk2006/4514371