欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

etl nifi ExecuteScript 一些 Groovy,Jython,Javascript(Nashorn)和JRuby 语言手法

程序员文章站 2022-07-06 16:27:02
...

目录

介绍

 几个重要的玩意

 获取文件前提条件

获取流file小李子

从回话中获取多个流文件然后弄它

小李子

用create()搞一个新的FlowFile发送到下一个处理器

小李子

从基于传入的FlowFile生成新的FlowFile

栗子

想要添加自定义属性的流文件,为流文件添加一个属性

FlowFile对象介绍

小李子

想要添加自定义属性的流文件,将多个属性添加到流文件

Map介绍

示例

从流文件中获取属性

栗子

从流文件获取所有属性 all

栗子:

将流文件转移到关系  (“成功”或“失败”)

重点

栗子

设置自己的日志和日志级别弄起来

栗子



介绍

           ExecuteScript 的 Groovy,Jython,Javascript(Nashorn)和JRuby 写法, ExecuteScript优势可言灵活的写出内容,在运行中使用脚本。

 

 几个重要的玩意

  • session(会话):这是对分配给处理器的ProcessSession的引用。会话允许您对流文件(如create()putAttribute()transfer()以及read()write()()进行操作。
  • context(上下文):这是对处理器的ProcessContext的引用。它可以用来检索处理器属性,关系,Controller服务和StateManager。
  • log:这是对处理器ComponentLog的引用。用它来记录消息给NiFi,比如log.info('Hello world!')
  • REL_SUCCESS:这是对处理器定义的“成功”关系的引用。它也可以通过引用父类(ExecuteScript)的静态成员来继承,但是一些引擎(如Lua)不允许引用静态成员,所以这是一个方便的变量。这也节省了必须使用关系的完全合格的名称。
  • REL_FAILURE:这是对处理器定义的“失败”关系的引用。和REL_SUCCESS一样,它也可以通过引用父类(ExecuteScript)的静态成员来继承,但是一些引擎(如Lua)不允许引用静态成员,所以这是一个方便的变量。这也节省了必须使用关系的完全合格的名称。
  • Dynamic Properties : 在ExecuteScript中定义的任何动态属性都将作为设置为与动态属性对应的PropertyValue对象的变量传递给脚本引擎。这允许您获取属性的String值,还可以针对NiFi表达式语言评估该属性,将该值作为适当的数据类型(例如布尔值)等进行转换。由于动态属性名称会成为脚本的变量名称,您必须知道所选脚本引擎的变量命名属性。例如,Groovy不允许在变量名称中使用句点(。),因此如果“my.property”是一个动态属性名称,则会发生错误。

 获取文件前提条件

  •  从会话中获取传入的流文件
  • 在ExecuteScript 的前一个节点要有个流文件 FlowFile  ,大部分处理器都ok
  • 用 session.get() 就可以获取


获取流file小李子

  • Groovy

flowFile = session.get()
if(!flowFile) return
  • jython
flowFile = session.get() 
if (flowFile != None):
    # All processing code starts at this indent
# implicit return at the end
  • Javascript
var flowFile = session.get();
if (flowFile != null) {
   // All processing code goes here
}
  • JRuby
flowFile = session.get()
if flowFile != nil
   # All processing code goes here
end

从回话中获取多个流文件然后弄它

  • 使用会话对象中的get(maxResults)此方法返回到来自工作队列的maxResults FlowFiles
  • 如果没有FlowFiles可用,则返回一个空列表(该方法不返回null)
  • 如果存在多个传入队列,则根据一次调用是否轮询所有队列或仅调用一个队列,行为是未指定的。

小李子

  • Groovy
flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
   flowFileList.each { flowFile -> 
       // Process each FlowFile here
   }
}
  • Jython
flowFileList = session.get(100)
if not flowFileList.isEmpty():
    for flowFile in flowFileList: 
         # Process each FlowFile here
  • Javascript
flowFileList = session.get(100)
if(!flowFileList.isEmpty()) {
  for each (var flowFile in flowFileList) { 
       // Process each FlowFile here
  }
}
  • JRuby
flowFileList = session.get(100)
if !(flowFileList.isEmpty())
   flowFileList.each { |flowFile| 
       # Process each FlowFile here
   }
end

用create()搞一个新的FlowFile发送到下一个处理器

 

  • 使用会话对象的create()方法。
  • 此方法返回一个新的FlowFile对象,
  • 在上执行进一步的处理

小李子

  • Groovy
flowFile = session.create()
// Additional processing here
  • Jython
flowFile = session.create() 
# Additional processing here
  • Javascript
var flowFile = session.create();
// Additional processing here
  • JRuby
flowFile = session.create()
# Additional processing here

 

从基于传入的FlowFile生成新的FlowFile

  • 使用会话对象的create(parentFlowFile)方法。
  • 此方法采用父级FlowFile引用
  • 并返回一个新的子FlowFile对象。
  • 新创建的FlowFile将继承除UUID之外的所有父级属性
  • 此方法将自动生成Provenance FORK事件或Provenance JOIN事件
  • 具体取决于在提交ProcessSession之前是否从同一父级生成了其他FlowFiles

 

栗子

  • Groovy
flowFile = session.get()
if(!flowFile) return
newFlowFile = session.create(flowFile)
// Additional processing here
  • Jython
flowFile = session.get() 
if (flowFile != None):
    newFlowFile = session.create(flowFile) 
    # Additional processing here
  • Javascript
var flowFile = session.get();
if (flowFile != null) {
  var newFlowFile = session.create(flowFile);
  // Additional processing here
}
  • JRuby
flowFile = session.get()
if flowFile != nil
  newFlowFile = session.create(flowFile)
  # Additional processing here
end

想要添加自定义属性的流文件,为流文件添加一个属性

  • 使用会话对象中的putAttribute(flowFile,attributeKey,attributeValue)方法
  • 此方法使用给定的键/值对更新给定的FlowFile属性
  • 注意:“uuid”属性对于FlowFile是固定的,不能修改; 如key被命名为“uuid”,它将被忽略。

FlowFile对象介绍

        这也是一个很好的提及FlowFile对象是不可变的;这意味着如果您通过API更新FlowFile的属性(或以其他方式更改),则会获得新版本的FlowFile的新参考。将FlowFiles传输到关系时,这是非常重要的。您必须保留对最新版本FlowFile的引用,并且必须传输或删除从会话中检索或创建的所有FlowFiles的最新版本,否则在执行时会出现错误。大多数情况下,用于存储FlowFile引用的变量将被从改变FlowFile的方法返回的最新版本覆盖(中间FlowFile引用将自动丢弃)。在这些示例中,您将看到添加属性时重新使用flowFile引用的这种技术。请注意,对FlowFile的当前引用被传递给putAttribute()方法。生成的FlowFile具有名为“myAttr”的属性,其值为“myValue”。另请注意,该方法需要一个字符串的值;如果你有一个对象,你将不得不将它序列化为一个字符串。最后,请注意,如果您要添加多个属性,最好创建一个Map并使用putAllAttributes()来代替 看下边栗子

小李子

  • Groovy
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
  • Jython
flowFile = session.get() 
if (flowFile != None):
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
# implicit return at the end
  • Javascript
var flowFile = session.get();
if (flowFile != null) {
   flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
}
  • JRuby
flowFile = session.get()
if flowFile != nil
   flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')
end

想要添加自定义属性的流文件,将多个属性添加到流文件

  • 使用会话对象中的putAllAttributes(flowFile,attributeMap)方法。
  • 此方法使用给定Map中的键/值对更新给定的FlowFile属性。
  • 注意:“uuid”属性对于FlowFile是固定的,不能修改;如果**被命名为“uuid”,它将被忽略。

Map介绍

        这里的技术是创建一个你想更新的属性键/值对的Map(Jython中的字典,JRuby中的Hash),然后调用putAllAttributes()。这比为每个键/值对调用putAttribute()要高效得多,因为后一种情况会导致框架为添加的每个属性创建一个临时版本的FlowFile。这些示例显示了两个条目myAttr1和myAttr2的映射,设置为“1”,将数字2的语言特定的强制转换为字符串(以符合key和value均需要字符串值的方法签名)。请注意,session.transfer()没有在这里指定(所以下面的代码片段不工作)

示例

  • Groovy
attrMap = ['myAttr1': '1', 'myAttr2': Integer.toString(2)]
flowFile = session.get()
if(!flowFile) return
flowFile = session.putAllAttributes(flowFile, attrMap)
  • Jython
attrMap = {'myAttr1':'1', 'myAttr2':str(2)}
flowFile = session.get() 
if (flowFile != None):
    flowFile = session.putAllAttributes(flowFile, attrMap)
# implicit return at the end
  • Javascript
var number2 = 2;
var attrMap = {'myAttr1':'1', 'myAttr2': number2.toString()}
var flowFile = session.get() 
if (flowFile != null) {
    flowFile = session.putAllAttributes(flowFile, attrMap)
}
  • JRuby
attrMap = {'myAttr1' => '1', 'myAttr2' => 2.to_s}
flowFile = session.get() 
if flowFile != nil
    flowFile = session.putAllAttributes(flowFile, attrMap)
end

从流文件中获取属性

  • 使用FlowFile对象的getAttribute(attributeKey)方法。
  • 此方法返回给定attributeKey的String值,如果找不到attributeKey,则返回null。 这些例子显示了检索“filename”属性的值。

栗子

  • Groovy
flowFile = session.get()
if(!flowFile) return
myAttr = flowFile.getAttribute('filename')
  • Jython
flowFile = session.get() 
if (flowFile != None):
    myAttr = flowFile.getAttribute('filename')
# implicit return at the end
  • Javascript
var flowFile = session.get() 
if (flowFile != null) {
    var myAttr = flowFile.getAttribute('filename')
}
  • JRuby
flowFile = session.get() 
if flowFile != nil
    myAttr = flowFile.getAttribute('filename')
end

从流文件获取所有属性 all

  • 使用FlowFile对象的getAttributes()方法。
  • 此方法返回一个带有String键和String值的Map,表示流文件的属性的键/值对。示例显示了对FlowFile的所有属性的Map的迭代。

 

栗子:

  • Groovy
flowFile = session.get()
if(!flowFile) return
flowFile.getAttributes().each { key,value ->
  // Do something with the key/value pair
}
  • Jython
flowFile = session.get() 
if (flowFile != None):
    for key,value in flowFile.getAttributes().iteritems():
       # Do something with key and/or value
# implicit return at the end
  • Javascript
var flowFile = session.get() 
if (flowFile != null) {
    var attrs = flowFile.getAttributes();
    for each (var attrKey in attrs.keySet()) { 
       // Do something with attrKey (the key) and/or attrs[attrKey] (the value)
  }
}
  • JRuby
flowFile = session.get() 
if flowFile != nil
    flowFile.getAttributes().each { |key,value| 
       # Do something with key and/or value
   }
end

将流文件转移到关系  (“成功”或“失败”)

  • 在处理流文件(新建或传入)之后,您要将流文件转换为关系(“成功”或“失败”)。
  • 在这种简单的情况下,让我们假设有一个名为“errorOccurred”的变量,指出FlowFile应该传送到哪个关系。
  • 使用会话对象的transfer(flowFile,relationship)方法。
  • 此方法根据给定的关系将给定的FlowFile传送到适当的目标处理器工作队列。
  • 从文档:如果关系导致多于一个目的地,则FlowFile的状态被复制,使得每个目的地都接收到FlowFile的精确副本,尽管每个目的地将具有其自己的唯一标识。

重点

  • ExecuteScript将在每次执行结束时执行session.commit()以确保操作已被提交。不需要(也不应该)在脚本中执行session.commit()。

栗子

  • Groovy
flowFile = session.get()
if(!flowFile) return
// Processing occurs here
if(errorOccurred) {
  session.transfer(flowFile, REL_FAILURE)
}
else {
  session.transfer(flowFile, REL_SUCCESS)
}
  • Jython
flowFile = session.get() 
if (flowFile != None):
    # All processing code starts at this indent
    if errorOccurred:
        session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end
  • Javascript
var flowFile = session.get();
if (flowFile != null) {
   // All processing code goes here
   if(errorOccurred) {
     session.transfer(flowFile, REL_FAILURE)
   }
   else {
     session.transfer(flowFile, REL_SUCCESS)
   }
}
  • JRuby
flowFile = session.get()
if flowFile != nil
   # All processing code goes here
   if errorOccurred
     session.transfer(flowFile, REL_FAILURE)
   else
     session.transfer(flowFile, REL_SUCCESS)
   end
end

设置自己的日志和日志级别弄起来

  •  使用带有warn(),trace(),debug(),info()或error()方法的log变量。

       这些方法可以采用单个字符串,或者一个字符串,后跟一个对象数组,或者一个字符串,后跟一个Throwable对象数组。第一个用于简单的消息。当你有一些你想记录的动态对象/值的时候使用第二种。要在消息字符串中引用这些消息,请在消息中使用“{}”。这些是按照外观顺序对Object数组进行评估的,所以如果消息的内容是“Found these things:{} {} {}”,而Object数组是[‘Hello’,1,true],那么记录的消息将会是“找到这些东西:你好1true”。这些日志记录方法的第三种形式也需要一个Throwable参数,并且在发生异常并且想要记录它时非常有用。

栗子

  • Groovy
log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])
  • Jython
from java.lang import Object
from jarray import array
objArray = ['Hello',1,True]
javaArray = array(objArray, Object)
log.info('Found these things: {} {} {}', javaArray)
  • Javascript
var ObjectArrayType = Java.type("java.lang.Object[]");
var objArray = new ObjectArrayType(3);
objArray[0] = 'Hello';
objArray[1] = 1;
objArray[2] = true;
log.info('Found these things: {} {} {}', objArray)
  • JRuby
log.info('Found these things: {} {} {}', ['Hello',1,true].to_java)

 

 

 

ok

 

 

 

持续更新