Empezando Apex Asíncrono por las colas

1. Introducción

En esta entrada me centraré, en cómo diseñar la ejecución de Procesos Asíncronos Apex (en adelante Apex Async), aprovechando las capacidades de la plataforma.

Pero lo haré de manera invertida a como se hace habitualmente, donde se explica como programar los mecanismos de programación. Pero en IMMO, si los desarrolladores, arquitectos y administradores, entendemos bien como gestiona Salesforce los procesos asíncronos, sus colas de Espera y Ejecución, sus límites asociados, etc., la elección del mecanismo de programación, es muy sencilla u obvia.

En el contexto asíncrono, un diseño centrado en la programación y no en las capacidades de la plataforma, puede suponer limitar el rendimiento y las prestaciones de nuestro proyecto.

Y  es que si lo hubieras sabido, te hubieras cortado el pelo antes, y  no hubieras ido con tu compañero.

  1. Introducción
  2. Back to Basics: ¿Qué és un proceso Apex asíncrono?
  3. Estados de un Job Apex asíncrono
  4. Apex Flex Queue(s) y la Apex Jobs Queue
    4.1. Apex Flex Queue
    4.1.1. Orden de inserción en la Flex Queue y reordenación de prioridades
    4.2. Test-Context Queue (la otra Flex Queue)
    4.3. Apex Jobs Queue
  5. Límites de las colas
  6. Monitorización de las colas
    6.1. Mediante Interfaz de usuario
    6.2. Mediante el objeto AsyncApexJob
    6.3. Mediante CronTrigger y CronJobDetail para los Jobs Scheduled
  7. Retos reales no cubiertos por la plataforma

2. Back to Basics: ¿Qué és un proceso Apex asíncrono?

Un Job de Apex con naturaleza asíncrona (en adelante Apex Async ó Job Async), se ejecuta cuando la plataforma tiene recursos disponibles (lo que sucede muy a menudo) sin ninguna horquilla de finalización ni ANS o ETC (Service Level Agreement – Estimated Time to Complete), y siempre sin perjudicar al ámbito síncrono, donde trabaja el usuario final, que se ejecuta con prioridad.

En contrapartida, los límites para un Job Async son límites superiores que para un proceso síncrono. Así un Job asíncrono posee:

Los Limites para Jobs Asíncronos son superiores en prestaciones clave

Recalco, no existe ETC por parte de Salesforce, y por tanto, nuestro diseño no puede suponer ninguno de los aspectos siguientes:

  1. Cuando se iniciará el proceso
  2. Cuanto durará el proceso
  3. Cuando estarán disponibles los resultados de la ejecución

Ejecución de Apex Asíncrono basado en colas

Existen diferentes tipos de Jobs Apex Async, seguro que has visto algún método con la anotación @Future, pero siendo muy útil y práctico, ese es el hermano menor de todos.

Existen 3 mecanismos adicionales, implementados mediante Interfaces (no asustarse es muy simple):

  1. @Queueable
  2. @Schedulable
  3. @Batchable

Pero como viste en la imagen anterior, se gestionan mediante colas. Por tanto, aquí empieza el Rock&Roll, veamos los estados de un proceso y como la plataforma gestiona los Jobs asíncronos.

Salesforce Queues para Apex Async

3. Estados de un Job Apex asíncrono

Los estados de un proceso (Job) Apex Async, transcurre por los siguientes estados:

Estado que reporta la plataforma Descripción y Cola en la que reside el Proceso Es un estado finalista?
Holding El proceso Batch se ha enviado correctamente, pero queda pendiente de ser ejecutado, cuando hay recursos disponibles, que permitan su ejecución. El proceso reside en la cola Flex No
Queued La ejecución es inminente. El proceso se transfiere de la cola Flex a la cola Jobs, y se mantendrá en esta cola hasta la finalización No
Preparing El método start es ejecutado, ha empezado el Rock&Roll No
Processing Se ejecuta la lógica completa del proceso No
Aborted Es un estado finalista, al que se llega si se decide abortar la ejecución de manera voluntaria Si
Completed El proceso ha finalizado, pueden existir errores o finalización totalmente correcta Si
Failed Estado que indica que algo salió mal a nivel de plataforma, y el proceso no fue finalizado correctamente  Si

Veamos como se relaciona la ejecución de un Job con las colas de la plataforma.

4. Apex Flex Queue(s) y la Apex Jobs Queue

La plataforma Salesforce, ofrece 3 colas, para la gestión de los procesos asíncronos:

  1. Apex Flex Queue (Flex por Flexible)
  2. Su homóloga para testing la Test-Context Queue
  3. Apex Jobs Queue

Nota: cuando Salesforce liberó la funcionalidad de la cola Apex Jobs en 2015, se hablaba de la Batch Queue, actualmente este término está deprecado.

4.1. Apex Flex Queue

Al crear un Job de Apex Async Batch, su ejecución no se inicia hasta confirmar que hay recursos disponibles para su ejecución. Por tanto, el proceso es:

  1. Crear el Job, serializarlo para ubicarlo en las colas de sistema
  2. Si se pudo crear el Job (límites, etc.) el sistema retorna un ID identificando el Job (este ID permite inspeccionar las características del Job)
  3. Si el Job es Batch, se coloca en la cola Flex Queue, en estado Holding – esto implica que no se está ejecutando – ninguna de sus métodos ha sido invocado excepto el constructor de la clase (esto es importante)
  4. Si el Job no es Batch, queda encolado en la cola de proceso Job Apex para su procesamiento cuando haya recursos

  5. En cuanto la plataforma tenga recursos disponibles, ejecutará el Job y lo moverá de la cola Flex a la cola Apex Jobs.

En esta cola, el proceso tiene todos los atributos (15) de ejecución: Job Type, Summitted By, Apex Class, Apex Method, etc.

Campos Descriptivos Job Flex Queue

4.1.1. Orden de inserción en la Flex Queue y reordenación de prioridades

El orden de inserción en la cola es fundamental, ya que indicará como el sistema iniciará la ejecución de los Jobs. Por tanto, el orden de inserción marcará el lanzamiento de la ejecución del Job.

Un error común, es considerar que los procesos Batch se ejecutarán uno detrás de otro, y puede no ser así, dado que la plataforma puede ejecutar hasta 5 procesos concurrentes simultáneamente. Por tanto el diseñador debe tener en consideración que:

  1. Los Jobs se iniciarán en orden de inserción en la cola Flex (salvo errores en su creación)
  2. Los Jobs pueden ejecutarse simultáneamente y se desconoce el orden de finalización, dado que son procesos asíncronos usando recursos disponibles. Suponer que la dependencia funcional, que 2 Jobs deben ser ejecutados en modo serie, será satisfecha por haber insertado uno detrás de otro, es un error de diseño.

Mientras el Job Batch está en la cola Flex, tanto el administrador como el programador tienen la posibilidad de reubicar los procesos.

El Administrador dispone de una interfaz de Monitorización de la cola:

Detalle de la Flex Queue y Opciones disponibles en la interfaz de usuario.

Por su parte, el programador tiene acceso a los 4 métodos de la clase FlexQueue:

  1. moveAfterJob(jobToMoveId, jobInQueueId)
  2. moveBeforeJob(jobToMoveId, jobInQueueId)
  3. moveJobToEnd(jobId)
  4. moveJobToFront(jobId)

4.2. Test-Context Queue (la otra Flex Queue)

La plataforma proporciona una cola adicional, para que los programadores puedan realiza Test del encolamiento en la Flex Queue. Estos métodos pertenecientes a la clase Test son:

  1. enqueueBatchJobs(numberOfJobs)que permite enviar a la cola el número indicado de jobs
  2. getFlexQueueOrder()que devuelve la lista de los identificadores de los Jobs presentes en la cola

Un ejemplo:

Ejemplo de uso de la Test Flex Queue

4.3. Apex Jobs Queue

Alberga todos los Jobs del sistema, tanto en estados intermedios como no finalistas (ver apartado anterior), y tanto Jobs internos de Salesforce como los creados por el programador. Estos son:

Jobs creados por un programador para procesamiento Apex asíncrono:

  1. Future
  2. Queueable
  3. ScheduledApex
  4. BatchApex

Jobs creados por Salesforce para la gestión interna del sistema:

  1. SharingRecalculation
  2. BatchApexWorker
  3. TestRequest
  4. TestWorker
  5. ApexToken

Aunque no documentado por Salesforce (como comenta Pedro Espada en la Success Comunity) los procesos Future se ejecutan prioritariamente, por delante de Queueable y Batchable. Quizás la razón, es porque este tipo de procesos están pensados inicialmente para fragmentos asíncronos sin una gran carga, llamadas a Web Services, etc.

5. Límites de las colas

Es imprescindible conocer los límites sobre las colas y Jobs, de los que considero más importantes son:

Límites Apex Async
  1. Pueden existir hasta 100 Jobs Batch en estado Holding. Por tanto la profundidad máxima de la cola Flex, es 100 Jobs.
  2. Solo pueden existir 5 Jobs Activos: *Queued, Preparing, Processing
Error al superar el límite de 100 Procesos
  1. Hasta 50 Jobs pueden enviarse a la cola Apex Job con la sentencia System.enqueueJob
Si tratamos de insertar +50 Jobs mediante System.Enqueue en la cola de ejecución Apex Job, obtendremos una excepción
  1. Análogamente para procesos Future también tenemos la limitación de 50 Jobs como máximo

Cualquier inserción en las colas que pretenda superar estos límites, recibe una excepción del sistema, y se rechaza.

6. Monitorización de las colas

6.1. Mediante Interfaz de usuario

La interfaz de Salesforce proporciona acceso a los procesos cronificados,  la Flex Queue y a la Apex Jobs.

Acceso a los interfaces de Monitorización de las colas

En el caso de la cola Flex además permite la reordenación de los Jobs y acceder a sus características. La reordenación se realiza indicando en qué posición se desea colocar el Job.

6.2. Mediante el objeto AsyncApexJob

Además de la interfaz, la plataforma proporciona un objeto, AsyncApexJob, donde cada registro describe un Job, con todos sus atributos. Este registro se actualiza a medida que el Job va transcurriendo por su ciclo de vida.

Este objeto es accesible vía SOQL, lo que nos permite obtener información sobre cuantos Jobs tenemos, cuantos son de cada tipo, cuantos están en un determinado estado, duración de los finalizados, ver las características de un Job concreto, etc.

Los procesos que residen en la cola Flex, también están presentes en AsyncApexJob, ya que son Jobs asíncronos como el resto, aunque en un estado determinado.

Consulta del objeto ApexJobAsync para mostrar los Jobs en Proceso y Completados

6.3. Mediante CronTrigger y CronJobDetail para los Jobs Scheduled

Para los Jobs de tipo Scheduled, la plataforma además nos proporciona los objetos CronTrigger y CronJobDetail. Existe mucha documentación en Trailhead y la API donde encontrar ejemplos de uso

  1. CronTrigger contiene varios campos como PreviousFireTime, TimesTriggered, State, NextFireTime, etc., que contienen la información del Job respecto a sus ejecuciones
  2. CronJobDetail contiene 2 miembros: JobType y Name

7. Retos no cubiertos por la plataforma

Casos de uso que requieren adaptaciones de la plataforma

Reto 1: Tengo procesos dependientes funcional que deben ejecutarse en serie

Conociendo el ciclo de vida de un Job y las colas empleadas para su gestión, podemos encontrar una limitación muy rápidamente: ¿Cómo asegurar que un Job se ejecute y complete previamente a otro, dado que existe una dependencia funcional de negocio entre ellos?

Si consideramos al Job inicial como “Job padre” y el dependiente como “Job hijo”, podemos traducir el caso de uso a la inserción del Job Hijo en la Flex Queue no debe realizarse hasta haber concluido la ejecución del proceso Padre.

Para ello tenemos varias alternativas:

  1. Si tenemos Jobs de tipo Queueable podemos utilizar Job Chaining
  1. Si tenemos Jobs de tipo Batchable, en el finish() del padre, podemos lanzar el Job hijo
  1. Sincronizamos la ejecución de los Jobs vía Platform Events: cuando el Job Padre finaliza, lanza un evento para que el Job hijo sea insertado en la Job Queue.

Los 3 esquemas anteriores, sufren de un defecto, ¿qué sucede si el hijo tiene varias dependencias, por ejemplo padre y madre, ó un padre y pero no iniciarme antes de las 22.00?

  1. Construir un planificador dinámico, que veremos en la siguiente entrada.

Reto 2: Tengo Jobs Scheduled que deben ejecutarse a una hora concreta o periódicamente de forma garantizada

Este caso es más complejo, dado que su traducción es **Cuando llegue el momento de ejecución de ese proceso, no debe haber más de 4 procesos ejecutándose, para tener las máximas garantías (no la seguridad total) de que ese proceso podrá ejecutarse.

Para ello, debemos tener un control de los procesos que están activos en la cola Apex Jobs y de los procesos en la cola Flex.

Este caso de uso, requiere nuevamente de un planificador dinámico que controle de forma adecuada ambas colas.

Enlaces interesantes

  1. Asynchronous Processing in Force com – Lectura y comprensión obligatoria
  2. Asynchronous Processing Basics
  3. Monitor Asynchronous Apex
  4. Monitoring the Apex Flex Queue
  5. Monitoring the Apex Job Queue
  6. SOAP API Developer Guide for AsyncApexJob
  7. Differences between job types for ‘AsyncApexJob’ object
  8. Trailhead – Async Apex
  9. Trailhead – Control Processes with Queueable Apex

BULK API v2: 2 veces más rápida con la mitad de código

Una de las novedades que quizás pasaron inadvertidas en la última release de Winter ’18, fue la nueva versión de la BULK API, denominada BULK API v2, disponible en v41.0.

Esta nueva versión, sigue siendo una API REST, que utiliza los verbos HTTP para crear Jobs, cerrar/abortar, eliminar y obtener información al respecto, aportando novedades para el programador y una mejora del rendimiento para el usuario final.

En esta entrada, se muestran las diferencias entre ambas versiones y se compara el rendimiento.

Comparación entre versiones

Las novedades de esta nueva versión están orientadas al uso de la API:

  1. Ya no es necesaria la gestión de Batches dentro de1 Job.
    1. Dicho así parece simple, pero aquellos que hemos programado en este esquema, veremos simplificado el código necesario en un % elevado
  2. Los límites para el uso de esta API, se simplifican:
    1. 100 millones de registros/día
    2. Contenido de mensaje que no superen los 150MB codificados en B64 al llegar a Salesforce (lo que significa a ojo de buen cubero, como máximo mensajes con payload de 100 MB en origen)
  3. La velocidad de ejecución:
    1. Como se observa en las mediciones realizadas llega a ser x2

De menor importancia, pero muy útil:

  • En la v2 se ofrece un nuevo servicio, accesible con el verbo GET, para consultar el estado de los Jobs, pudiendo solicitar todos aquellos Jobs que cumplen ciertas condiciones
  • Además la v2 soporta 6 tipos de separadores de campos distintos (backquote, caret, comma, pipe, semicolon, y tab) lo que evita tener que modificar nuestros ficheros origen (mayor flexibilidad lo que comporta menos código)

Comparación entre ambas APIs

Simplificación del proceso

En la V1 la creación de trabajos requería:

  1. Autenticación
  2. Creación del Job
  3. Gestión de los Batches
    1. Creación individual de cada Batches
    2. Empaquetar cada Batch en el límite
    3. Envío de los datos del Batch
    4. Gestión individual del estado de las cargas
    5. Confirmación/Retry en caso de incidencias
  4. Cierre del Batch (para inicio de ejecución)
  5. Pooling del estado del Job
  6. Gestión de los resultados (obteniendo la información de los registros ejecutados/correctos/error)

En la V2, este proceso se simplifica:

  1. Autenticación
  2. Creación del Job (simple o multipart)
  3. Gestión de los Batches
    1. Creación individual de cada Batche
    2. Empaquetar cada Batch en el límite
    3. Envío de los datos del Batch Job
    4. Gestión individual del estado de las cargas
    5. Confirmación/Retry en caso de incidencias
  4. Cierre del Batch Job (para iniciar ejecución)
  5. Pooling del estado del Job, mediante invocación del servicio de intención de información del Job
  6. Gestión de los resultados (obteniendo la información referente a los registros ejecutados/correctos/error)

Es decir:

  • Salesforce ahora libera al desarrollador de la segmentación y tratamiento de los Batches, y únicamente requiere la creación del Job, Upload de los datos (si no utilizamos Multipart), y chequeo de estado con obtención de resultados
  • La segmentación de los datos se realiza ahora de forma que cada segmento, contiene 10.000 registros, como podemos ver en la página estado del Job, lo que disminuye el número de Batches que gestiona Salesforce internamente.
  • Se sigue la regla del 10×10: si el proceso tardara más de 10′, Salesforce lo marcaría como a Reintentar, hasta un máximo de 10 veces (en esta circunstancia el Job se da por Failed).

Detalle de un Job con la BULK API v2

Por tanto, la parte más compleja queda eliminada del proceso, simplificándolo en gran medida.

Operaciones disponibles en v2 y como usarlas

Las operaciones y los verbos utilizados en la API, son intuitivos, pero se recomienda estar muy atento al uso de las cabeceras que se indican en la documentación oficial de Salesforce, para evitar errores inesperados.

El proyecto Java, construido para esta entrada, disponible como Repositorio público, puede ser una opción,  para observar como son los valores, tanto para las cabeceras, como los Payload y respuestas, y así evitar problemas inesperados.

Autenticación

Sin cambios, la autenticación se realiza obteniendo el token oAuth como se realiza en v1.

Creación de un Job Simple

Existen 2 posibilidades para crear un Job. La opción, que denominamos Simple, consta de varios pasos: Creación, Upload de datos,  y Cierre del Job (inicio del procesamiento en Salesforce) y opcionalmente obtener información y estado final de los registros gestionados.

Para la creación tan solo se requiere,  el envío de una petición POST al endpoint  /services/data/vXX.X/jobs/ingest/. En el cuerpo del mensaje se indica:

  • El objeto destino, por ejemplo Persona__c, Account, etc.
  • La operación bulk a realizar: insert, update, delete, etc.
  • Los parámetros adicionales que caracterizan la lectura de los datos, o la naturaleza del Job (paralelismo, concurrencia, etc.).

Es imprescindible leer la documentación (al final del artículo todos los enlaces disponibles) para entender las posibilidades y restricciones que impone la API.

Ejemplo llamada Java Creación Job para la BULK API v2

Upload de Datos en un Job Simple creado

El Upload de datos, requiere de haber realizado la llamada anterior con éxito, dado que en la respuesta, se obtiene el endpoint de datos, donde deben enviarse los datos (100 MB como máximo aproximadamente – ver la sección de límites).

Obtenido el endpoint, solo se requiere un PUT a esa URL, pero debemos ser cuidadosos con la construcción de las cabeceras, y de los encodings (siguiendo el ejemplo del código disponible en el REPO, se obtiene como hacerlo correctamente).

Ejemplo llamada Java para el envío de datos al Job creado para la BULK API v2

Cerrar el Job

Con todos los datos enviados, se requiere de una llamada con el verbo PATCH, sobre el endpoint /services/data/vXX.X/jobs/ingest/jobID, indicando a Salesforce que todos los datos han sido enviados, y debe iniciar el procesamiento (construcción de los batches internos y toma de resultados).

Ejemplo llamada Java para el cierre del Job creado para la BULK API v2

Creación de un Job Multipart

Alternativamente a los pasos anteriores, es posible usar una única invocación para la creación, upload y cierre.

Para ello se requiere la construcción de un mensaje Multipart, siguiendo el formato indicado en la documentación, con el verbo POST hacía el endpoint /services/data/vXX.X/jobs/ingest/.

Ejemplo llamada Java Creación Job Multipart para la BULK API v2

Obtener estado del Job

Los estados del Job son: Open, UploadComplete. InProgress, JobComplete, Failed, Aborted. Existen ciertas restricciones,  como por ejemplo: para eliminar un Job, no puede estar en estado inProgress, o aparece el siguiente mensaje:

[{"errorCode":"API_ERROR","message":"Error encountered when deleting the job because the job is not terminated"}]

Consultar su estado requiere una invocación con el verbo GET al endpoint /services/data/vXX.X/jobs/ingest/jobID.

Ejemplo llamada Java para la consulta del estado de un Job para la BULK API v2

Información que se obtiene de un Job

La información que se obtiene de un Job, es completa, fácil de obtener y consumir. El objeto JSON retornado es:

{
"id": "7501r0000097DEBAA2",
"operation": "insert",
"object": "Contact",
"createdById": "005w000000484LsAAI",
"createdDate": "2017-12-17T09:29:10.000+0000",
"systemModstamp": "2017-12-17T09:29:10.000+0000",
"state": "Open",
"concurrencyMode": "Parallel",
"contentType": "CSV",
"apiVersion": 41.0,
"jobType": "V2Ingest",
"contentUrl": "services/data/v41.0/jobs/ingest/7501r0000097DEBAA2/batches",
"lineEnding": "LF",
"columnDelimiter": "COMMA",
"retries": 0,
"totalProcessingTime": 0,
"apiActiveProcessingTime": 0,
"apexProcessingTime": 0
}

Rendimiento comparado entre versiones

Hasta aquí todo son bondades para los programadores que trabajen con la API, pero poca repercusión tiene para los usuarios finales que la utilizan mediante herramientas de terceros, como ETLs, Data Loader, etc.

Para comparar tiempos, se ha construido un cliente Java sencillo con Web Service Connector, (enlace al repositorio) y se ha ejecutado una batida de pruebas tomando tiempos de ejecución de ambas versiones de la API (durante fin de semana previo a Navidad, cuando las instancias supuestamente estarán con baja ocupación).

Los resultados han sido:

OPERACIÓN y volumen Tiempo MEDIO empleado por la BULK API v1 Tiempo MEDIO empleado por la BULK API v2 Diferencial porcentual
INSERT 250K 131” 41” -220%
INSERT 500K 251” 138” -82%
INSERT 1M 442” 263” -68%
SOFT DELETE 500K 160” 148” -8%
UPDATE 250K 31” 22” -41%
UPDATE 500K 142” 123” -15%
UPSERT 500K 178” 110” -62%
  • Los tiempos se expresan en segundos
  • El volumen de registros se expresa en miles (k), es decir 250k indican 250.000 registros
  • Un diferencial negativo, implica una mejora del rendimiento
  • La operación Hard Delete no está actualmente disponible en la API V2
Resumen de Resultados porcentuales obtenidos

Conclusiones

Esta nueva versión v2, mejora la anterior tanto en uso, simplificándolo, como en rendimiento, mejorándolo, lo que supone que todos los usuarios, tanto técnicos como finales, se verán beneficiados.

Más sencillo y más rápido comporta más barato

Enlaces interesantes

Big Objects: Contenedores de Datos – un quiero y quizás puedo de almacenamiento masivo en Salesforce

En la entrada anterior, describía el caso de uso de cómo acceder a una gran volumen de datos desde nuestra ORG via External Objects, y dejaba la puerta abierta a la alternativa a la funcionalidad nativa de Big Objects en Salesforce.

Esta entrada describe qué son, en qué casos de uso pueden aplicarse, cómo crearlos, insertar datos (con Data Loader que parece ser un caso poco documentado) y describir sus características y restricciones, que no son pocas.

Continue reading “Big Objects: Contenedores de Datos – un quiero y quizás puedo de almacenamiento masivo en Salesforce”

Skinny Tables para la mejora de rendimiento Salesforce

Supongamos que después de haber seguido las buenas prácticas de diseño, programación, aplicado índices donde era necesario, seguimos teniendo problemas de rendimiento de un Report/Dashboard o SOQL concreta.

Como almacena Salesforce la información internamente

Continue reading “Skinny Tables para la mejora de rendimiento Salesforce”