org.apache.flink.api.common.typeinfo.Types类的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(10.9k)|赞(0)|评价(0)|浏览(272)

本文整理了Java中org.apache.flink.api.common.typeinfo.Types类的一些代码示例,展示了Types类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Types类的具体详情如下:
包路径:org.apache.flink.api.common.typeinfo.Types
类名称:Types

Types介绍

[英]This class gives access to the type information of the most common types for which Flink has built-in serializers and comparators.

In many cases, Flink tries to analyze generic signatures of functions to determine return types automatically. This class is intended for cases where type information has to be supplied manually or cases where automatic type inference results in an inefficient type.

Please note that the Scala API and Table API have dedicated Types classes. (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types)

A more convenient alternative might be a TypeHint.
[中]这个类允许访问Flink内置序列化器和比较器的最常见类型的类型信息。
在许多情况下,Flink会尝试分析函数的泛型签名,以自动确定返回类型。此类适用于必须手动提供类型信息的情况,或自动类型推断导致类型效率低下的情况。
请注意,Scala API和Table API有专用的类型类。(参见org.apache.flink.api.scala.Typesorg.apache.flink.table.api.Types
一个更方便的选择可能是TypeHint。

代码示例

代码示例来源:origin: apache/flink

@Override
public TypeInformation<Tuple> getProducedType() {
  return Types.TUPLE(Types.LONG, Types.STRING);
}

代码示例来源:origin: apache/flink

names[i] = field.name();
  return Types.ROW_NAMED(names, types);
case ENUM:
  return Types.STRING;
case ARRAY:
  return Types.OBJECT_ARRAY(convertToTypeInfo(schema.getElementType()));
case MAP:
  return Types.MAP(Types.STRING, convertToTypeInfo(schema.getValueType()));
case UNION:
  final Schema actualSchema;
  } else {
    return Types.GENERIC(Object.class);
  return Types.PRIMITIVE_ARRAY(Types.BYTE);
case STRING:
    return Types.BIG_DEC;
  return Types.PRIMITIVE_ARRAY(Types.BYTE);
case INT:

代码示例来源:origin: apache/flink

private static TypeInformation<Row> convertObject(String location, JsonNode node, JsonNode root) {
  // validate properties
  if (!node.has(PROPERTIES)) {
    return Types.ROW();
  }
  if (!node.isObject()) {
    throw new IllegalArgumentException(
      "Invalid '" + PROPERTIES + "' property for object type in node: " + location);
  }
  final JsonNode props = node.get(PROPERTIES);
  final String[] names = new String[props.size()];
  final TypeInformation<?>[] types = new TypeInformation[props.size()];
  final Iterator<Map.Entry<String, JsonNode>> fieldIter = props.fields();
  int i = 0;
  while (fieldIter.hasNext()) {
    final Map.Entry<String, JsonNode> subNode = fieldIter.next();
    // set field name
    names[i] = subNode.getKey();
    // set type
    types[i] = convertType(location + '/' + subNode.getKey(), subNode.getValue(), root);
    i++;
  }
  // validate that object does not contain additional properties
  if (node.has(ADDITIONAL_PROPERTIES) && node.get(ADDITIONAL_PROPERTIES).isBoolean() &&
      node.get(ADDITIONAL_PROPERTIES).asBoolean()) {
    throw new IllegalArgumentException(
      "An object must not allow additional properties in node: " + location);
  }
  return Types.ROW_NAMED(names, types);
}

代码示例来源:origin: apache/flink

private static TypeInformation<?> convertArray(String location, JsonNode node, JsonNode root) {
  // validate items
  if (!node.has(ITEMS)) {
    throw new IllegalArgumentException(
      "Arrays must specify an '" + ITEMS + "' property in node: " + location);
  }
  final JsonNode items = node.get(ITEMS);
  // list (translated to object array)
  if (items.isObject()) {
    final TypeInformation<?> elementType = convertType(
      location + '/' + ITEMS,
      items,
      root);
    // result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings
    return Types.OBJECT_ARRAY(elementType);
  }
  // tuple (translated to row)
  else if (items.isArray()) {
    final TypeInformation<?>[] types = convertTypes(location + '/' + ITEMS, items, root);
    // validate that array does not contain additional items
    if (node.has(ADDITIONAL_ITEMS) && node.get(ADDITIONAL_ITEMS).isBoolean() &&
        node.get(ADDITIONAL_ITEMS).asBoolean()) {
      throw new IllegalArgumentException(
        "An array tuple must not allow additional items in node: " + location);
    }
    return Types.ROW(types);
  }
  throw new IllegalArgumentException(
    "Invalid type for '" + ITEMS + "' property in node: " + location);
}

代码示例来源:origin: apache/flink

testReadAndWrite(
  "ROW<f0 DECIMAL, f1 TINYINT>",
  Types.ROW(Types.BIG_DEC, Types.BYTE));
  Types.ROW_NAMED(
    new String[]{"hello", "world"},
    Types.BIG_DEC, Types.BYTE));
  Types.MAP(Types.STRING, Types.ROW(Types.BIG_DEC, Types.BYTE)));
  new MultisetTypeInfo<>(Types.ROW(Types.BIG_DEC, Types.BYTE)));
  Types.PRIMITIVE_ARRAY(Types.BYTE));
  Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(TestPojo.class)));
  Types.ROW_NAMED(
    new String[] {"he         \nllo", "world"},
    Types.BIG_DEC, Types.BYTE),
  Types.ROW_NAMED(
    new String[] {"he`llo", "world"},
    Types.BIG_DEC, Types.BYTE),
  Types.ROW_NAMED(
    new String[] {"he         \nllo", "world"},
    Types.BIG_DEC, Types.BYTE),
  Types.ROW_NAMED(

代码示例来源:origin: apache/bahir-flink

@Test
public void testTypeInfoParser() {
  TypeInformation<Tuple3<String, Long, Object>> type1 =
      Types.TUPLE(Types.STRING, Types.LONG, Types.GENERIC(Object.class));
  Assert.assertNotNull(type1);
  TypeInformation<Tuple4<String, Long, Object, InnerPojo>> type2 =
      Types.TUPLE(Types.STRING, Types.LONG, Types.GENERIC(Object.class), Types.GENERIC(InnerPojo.class));
  Assert.assertNotNull(type2);
}

代码示例来源:origin: apache/flink

/**
 * Converts a table schema into a (nested) type information describing a {@link Row}.
 */
public TypeInformation<Row> toRowType() {
  return Types.ROW_NAMED(fieldNames, fieldTypes);
}

代码示例来源:origin: apache/flink

private static TypeInformation<?> convertStringEncoding(String location, JsonNode node) {
  if (!node.isTextual()) {
    throw new IllegalArgumentException("Invalid '" + CONTENT_ENCODING + "' property in node: " + location);
  }
  // "If the instance value is a string, this property defines that the string SHOULD
  // be interpreted as binary data and decoded using the encoding named by this property."
  switch (node.asText()) {
    case CONTENT_ENCODING_BASE64:
      return Types.PRIMITIVE_ARRAY(Types.BYTE);
    default:
      // we fail hard here:
      // this gives us the chance to support more encodings in the future without problems
      // of backwards compatibility
      throw new IllegalArgumentException("Invalid encoding '" + node.asText() + "' in node: " + location);
  }
}

代码示例来源:origin: apache/flink

private TypeInformation<?> convertObjectArray() {
  nextToken(TokenType.BEGIN);
  nextToken(TokenType.LITERAL);
  final TypeInformation<?> elementTypeInfo = convertType();
  nextToken(TokenType.END);
  return Types.OBJECT_ARRAY(elementTypeInfo);
}

代码示例来源:origin: apache/flink

@Override
public TypeInformation<Row> getReturnType() {
  return Types.ROW(Types.INT, Types.LONG, Types.STRING);
}

代码示例来源:origin: apache/flink

private TypeInformation<?> convertAny() {
  nextToken(TokenType.BEGIN);
  // check if ANY(class) or ANY(class, serialized)
  if (isNextToken(2, TokenType.SEPARATOR)) {
    // any type information
    nextToken(TokenType.LITERAL);
    final String className = token().literal;
    nextToken(TokenType.SEPARATOR);
    nextToken(TokenType.LITERAL);
    final String serialized = token().literal;
    nextToken(TokenType.END);
    final Class<?> clazz = EncodingUtils.loadClass(className);
    final TypeInformation<?> typeInfo = EncodingUtils.decodeStringToObject(serialized, TypeInformation.class);
    if (!clazz.equals(typeInfo.getTypeClass())) {
      throw new ValidationException("Class '" + clazz + "' does no correspond to serialized data.");
    }
    return typeInfo;
  } else {
    // generic type information
    nextToken(TokenType.LITERAL);
    final String className = token().literal;
    nextToken(TokenType.END);
    final Class<?> clazz = EncodingUtils.loadClass(className);
    return Types.GENERIC(clazz);
  }
}

代码示例来源:origin: apache/flink

private TypeInformation<?> convertMap() {
  nextToken(TokenType.BEGIN);
  nextToken(TokenType.LITERAL);
  final TypeInformation<?> keyTypeInfo = convertType();
  nextToken(TokenType.SEPARATOR);
  nextToken(TokenType.LITERAL);
  final TypeInformation<?> valueTypeInfo = convertType();
  nextToken(TokenType.END);
  return Types.MAP(keyTypeInfo, valueTypeInfo);
}

代码示例来源:origin: apache/flink

private TypeInformation<?> convertPojo() {
  nextToken(TokenType.BEGIN);
  nextToken(TokenType.LITERAL);
  final String className = token().literal;
  nextToken(TokenType.END);
  final Class<?> clazz = EncodingUtils.loadClass(className);
  return Types.POJO(clazz);
}

代码示例来源:origin: apache/bahir-flink

@Test
public void testStreamTupleSerializerWithTuple() {
  TypeInformation<Tuple4> typeInfo = Types.GENERIC(Tuple4.class);
  StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
  assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
  TypeInformation<Tuple2<String, Tuple4>> tuple2TypeInformation = Types.TUPLE(Types.STRING, schema.getTypeInfo());
  assertEquals("Java Tuple2<String, GenericType<" + Tuple4.class.getName() + ">>", tuple2TypeInformation.toString());
}

代码示例来源:origin: com.alibaba.blink/flink-json

private static TypeInformation<?> convertArray(String location, JsonNode node, JsonNode root) {
  // validate items
  if (!node.has(ITEMS)) {
    throw new IllegalArgumentException(
      "Arrays must specify an '" + ITEMS + "' property in node: " + location);
  }
  final JsonNode items = node.get(ITEMS);
  // list (translated to object array)
  if (items.isObject()) {
    final TypeInformation<?> elementType = convertType(
      location + '/' + ITEMS,
      items,
      root);
    // result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings
    return Types.OBJECT_ARRAY(elementType);
  }
  // tuple (translated to row)
  else if (items.isArray()) {
    final TypeInformation<?>[] types = convertTypes(location + '/' + ITEMS, items, root);
    // validate that array does not contain additional items
    if (node.has(ADDITIONAL_ITEMS) && node.get(ADDITIONAL_ITEMS).isBoolean() &&
        node.get(ADDITIONAL_ITEMS).asBoolean()) {
      throw new IllegalArgumentException(
        "An array tuple must not allow additional items in node: " + location);
    }
    return Types.ROW(types);
  }
  throw new IllegalArgumentException(
    "Invalid type for '" + ITEMS + "' property in node: " + location);
}

代码示例来源:origin: apache/flink

@Override
public TypeInformation<Row> getOutputType() {
  return Types.ROW_NAMED(fieldNames, fieldTypes);
}

代码示例来源:origin: apache/flink

private TypeInformation<?> convertPrimitiveArray() {
  nextToken(TokenType.BEGIN);
  nextToken(TokenType.LITERAL);
  final TypeInformation<?> elementTypeInfo = convertType();
  nextToken(TokenType.END);
  return Types.PRIMITIVE_ARRAY(elementTypeInfo);
}

代码示例来源:origin: com.alibaba.blink/flink-table-common

private TypeInformation<?> convertObjectArray() {
  nextToken(TokenType.BEGIN);
  nextToken(TokenType.LITERAL);
  final TypeInformation<?> elementTypeInfo = convertType();
  nextToken(TokenType.END);
  return Types.OBJECT_ARRAY(elementTypeInfo);
}

代码示例来源:origin: apache/flink

@Override
public TypeInformation<Row> getProducedType() {
  return Types.ROW(Types.INT, Types.LONG, Types.STRING);
}

代码示例来源:origin: apache/flink

@Test
public void testAvroStringAccess() throws Exception {
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
  Table t = tEnv.fromDataSet(testData(env));
  Table result = t.select("name");
  List<Utf8> results = tEnv.toDataSet(result, Types.GENERIC(Utf8.class)).collect();
  String expected = "Charlie\n" +
      "Terminator\n" +
      "Whatever";
  TestBaseUtils.compareResultAsText(results, expected);
}

相关文章