EDITAR 2018-01-27:
Resulta que este problema está relacionado con DirectRunner. Si ejecuta la misma canalización con DataflowRunner, debería obtener lotes que en realidad tienen hasta 1000 registros. DirectRunner siempre crea paquetes de tamaño 1 después de una operación de agrupación.
Respuesta original:
Me encontré con el mismo problema al escribir en bases de datos en la nube usando JdbcIO de Apache Beam. El problema es que si bien JdbcIO admite la escritura de hasta 1000 registros en un lote, en realidad nunca lo he visto escribir más de 1 fila a la vez (debo admitirlo:esto siempre fue usando DirectRunner en un entorno de desarrollo).
Por lo tanto, he agregado una función a JdbcIO en la que puede controlar el tamaño de los lotes usted mismo agrupando sus datos y escribiendo cada grupo como un solo lote. A continuación, se muestra un ejemplo de cómo usar esta característica basado en el ejemplo original de WordCount de Apache Beam.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
La diferencia con el método de escritura normal de JdbcIO es el nuevo método writeIterable()
que toma un PCollection<Iterable<RowT>>
como entrada en lugar de PCollection<RowT>
. Cada iterable se escribe como un lote en la base de datos.
La versión de JdbcIO con esta adición se puede encontrar aquí:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java
El proyecto de ejemplo completo que contiene el ejemplo anterior se puede encontrar aquí:https://github.com/ olavloite/spanner-beam-example
(También hay una solicitud de extracción pendiente en Apache Beam para incluir esto en el proyecto)