Skip to content
Open
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
84 commits
Select commit Hold shift + click to select a range
35db47e
feat/116 :: Order 도메인 Aggregate, VO, 이벤트, 예외, 글로벌 예외 계층
lian2945 May 18, 2026
f79c8da
feat/116 :: Order UseCase, Port, Service, DTO 구현
lian2945 May 18, 2026
fb32475
feat/116 :: Order 어댑터 (Web/Kafka/gRPC/Persistence/Security/Scheduler)
lian2945 May 18, 2026
c43e703
feat/116 :: Order application-*.yml, Dockerfile, build.gradle 설정
lian2945 May 18, 2026
616e03c
feat/117 :: Release 도메인 Aggregate, VO, 예외, 글로벌 예외 계층
lian2945 May 18, 2026
87cf732
feat/117 :: Release UseCase, Port, Service, DTO 구현
lian2945 May 18, 2026
56edccf
feat/117 :: Release 어댑터 (Web/gRPC/Persistence/Security/Scheduler)
lian2945 May 18, 2026
0c59c17
feat/117 :: Release application-*.yml, Dockerfile, build.gradle 설정
lian2945 May 18, 2026
503d3c3
feat/116 :: SonarCloud 경고 해결 (빈 테스트 메서드 주석, Dockerfile non-root 유저)
lian2945 May 18, 2026
f334e59
feat/117 :: SonarCloud 경고 해결 (빈 테스트 메서드 주석, Dockerfile non-root 유저)
lian2945 May 18, 2026
6e3c690
feat/117 :: release application-dev.yml trusted.ips 수정
lian2945 May 19, 2026
59650b6
feat/116 :: order application-dev.yml trusted.ips 수정
lian2945 May 19, 2026
6d817f6
feat/117 :: release Dockerfile 경로 및 줄바꿈 오류 수정
lian2945 May 19, 2026
7452d15
feat/116 :: order Dockerfile 경로 및 줄바꿈 오류 수정
lian2945 May 19, 2026
831584a
feat/117 :: release application-prod.yml 추가
lian2945 May 19, 2026
e92585c
feat/116 :: order application-prod.yml 수정 (server.port/TRUSTED_IPS 제거…
lian2945 May 19, 2026
6f90fdb
feat/116 :: subscribe/creator application-dev/prod.yml server.port 제거
lian2945 May 20, 2026
23ac3b3
feat/117 :: subscribe/creator application-dev/prod.yml server.port 제거
lian2945 May 20, 2026
613a3e6
fix(grpc): GrpcAdapter withDeadlineAfter(2s) 설정 추가
lian2945 May 20, 2026
b68418b
fix(grpc): GrpcAdapter withDeadlineAfter(2s) 설정 추가
lian2945 May 20, 2026
613fefc
fix(inbox): @DltHandler 추가로 DLT 전환 시 Inbox DEAD_LETTERED 상태 추적
lian2945 May 20, 2026
45d146b
fix(inbox): @DltHandler 추가로 DLT 전환 시 Inbox DEAD_LETTERED 상태 추적
lian2945 May 20, 2026
fff6513
fix(user/kafka): 전체 리스너에 @RetryableTopic + @DltHandler 추가
lian2945 May 20, 2026
1ec5ebc
fix(user/kafka): 전체 리스너에 @RetryableTopic + @DltHandler 추가
lian2945 May 20, 2026
12d1716
refactor(inbox): InboxEvent에 occurredAt() 추가, IdempotentAspect 리플렉션 제거
lian2945 May 20, 2026
af1f284
refactor(inbox): InboxEvent에 occurredAt() 추가, IdempotentAspect 리플렉션 제거
lian2945 May 20, 2026
a35f211
refactor(inbox): InboxEvent occurredAt() 계약 추가, IdempotentAspect 리플렉션 제거
lian2945 May 20, 2026
b5ba18e
fix(inbox): InboxProperties 누락 및 inbox 설정 추가
lian2945 May 20, 2026
481ceb4
fix(inbox): InboxEntity에 occurredAt 필드 추가
lian2945 May 20, 2026
7bfbddd
fix(inbox): InboxEntity에 occurredAt 필드 추가
lian2945 May 20, 2026
947d50a
ci: trigger build
lian2945 May 20, 2026
1531aa1
ci: trigger build
lian2945 May 20, 2026
92bede8
fix(inbox): InboxProperties 누락 및 inbox 설정 추가
lian2945 May 20, 2026
08ae806
fix(inbox): SubscribeInboxStatus/Repository, GeneralGoodsInboxStatus/…
lian2945 May 20, 2026
2b276d1
fix(inbox): SubscribeInboxStatus/Repository, GeneralGoodsInboxStatus/…
lian2945 May 20, 2026
4109f1f
fix(creator): CreatorDomainEvent → CreatorOutbox 리네이밍 및 테이블명 creator_…
lian2945 May 20, 2026
320c666
fix(creator): CreatorDomainEvent → CreatorOutbox 리네이밍 및 테이블명 creator_…
lian2945 May 20, 2026
df2ab7a
fix(creator): 서비스 레이어 CreatorDomainEventRepositoryPort → CreatorOutbo…
lian2945 May 20, 2026
04a0457
fix(creator): 서비스 레이어 CreatorDomainEventRepositoryPort → CreatorOutbo…
lian2945 May 20, 2026
dada21f
fix(creator): 도메인 이벤트 occurredAt 필드 추가 및 Creator.createBuilder() 복구
lian2945 May 20, 2026
7ca20c6
fix(creator): 도메인 이벤트 occurredAt 필드 추가 및 Creator.createBuilder() 복구
lian2945 May 20, 2026
3e82824
fix(kafka): creator/subscribe retry 설정 spring.kafka.retry.topic 형식으로 수정
lian2945 May 20, 2026
9d1b838
fix(kafka): creator/subscribe retry 설정 spring.kafka.retry.topic 형식으로 수정
lian2945 May 20, 2026
dbafa83
fix(creator): refactor/115 기준으로 creator 서비스 전체 동기화
lian2945 May 20, 2026
fbbf849
fix(creator): refactor/115 기준으로 creator 서비스 전체 동기화
lian2945 May 20, 2026
be3e263
fix(creator): creator.proto에 GetCreatorIdByUserId rpc 추가
lian2945 May 20, 2026
ae99e01
fix(creator): creator.proto에 GetCreatorIdByUserId rpc 추가
lian2945 May 20, 2026
1fea8c9
fix(docker): Dockerfile appuser 보안 패턴 및 JAR_FILE 경로 통일
lian2945 May 20, 2026
19f1287
fix(docker): Dockerfile appuser 보안 패턴 및 JAR_FILE 경로 통일
lian2945 May 20, 2026
525a95a
fix(inbox): 만료 메시지 조용한 폐기 → DEAD_LETTERED 영속화로 변경
lian2945 May 20, 2026
24d2402
fix(inbox): 만료 메시지 조용한 폐기 → DEAD_LETTERED 영속화로 변경
lian2945 May 20, 2026
95d0e35
fix(inbox): 만료 메시지 조용한 폐기 → DEAD_LETTERED 영속화로 변경
lian2945 May 20, 2026
c15bac0
fix(inbox): 만료 메시지 조용한 폐기 → DEAD_LETTERED 영속화로 변경
lian2945 May 20, 2026
ef6480a
fix(inbox): 만료 메시지 조용한 폐기 → DEAD_LETTERED 영속화로 변경
lian2945 May 20, 2026
36b339a
feat(order): Resilience4j CircuitBreaker/TimeLimiter 설정 추가 (releaseSe…
lian2945 May 20, 2026
de9c5f8
feat(order): Resilience4j CircuitBreaker/TimeLimiter 설정 추가 (releaseSe…
lian2945 May 21, 2026
26bc10a
refactor(order): @Transactional 범위 내 gRPC 호출 제거
lian2945 May 21, 2026
aeef1e8
feat(release): Kafka consumer로 soldQuantity 비동기 처리 (Inbox 패턴)
lian2945 May 21, 2026
62aaa64
refactor(order): AutoConfirm 스케줄러 청크 처리 + 분산 락 + 설정 외부화
lian2945 May 21, 2026
413f8aa
refactor(order): AutoConfirm 스케줄러 청크 처리 + 분산 락 + 설정 외부화 (누락 파일 추가)
lian2945 May 21, 2026
7c6f82a
fix(kafka): IdempotentAspect catch(Throwable) 수정 및 @RetryableTopic ex…
lian2945 May 22, 2026
2bf145b
Merge branch 'feat/116' into feat/117
lian2945 May 22, 2026
5bb879c
fix(order): ConfirmOrderLineService 이벤트 중복 발행 방지 가드 추가
lian2945 May 22, 2026
362bf6e
Merge branch 'feat/116' into feat/117
lian2945 May 22, 2026
0d66382
fix(order): confirmOrderLine 도메인에서 PREPARING 상태만 허용하도록 수정
lian2945 May 22, 2026
0394205
Merge branch 'feat/116' into feat/117
lian2945 May 22, 2026
a72677b
fix(kafka): @RetryableTopic에 dltStrategy, dltTopicSuffix 명시
lian2945 May 22, 2026
172f5e2
Merge branch 'feat/116' into feat/117
lian2945 May 22, 2026
e7bf5c8
fix(kafka): IdempotentAspect catch(Exception) → catch(Throwable) 컴파일 …
lian2945 May 22, 2026
1b1d96c
Merge branch 'feat/116' into feat/117
lian2945 May 22, 2026
b0327b5
Merge remote-tracking branch 'origin/main' into feat/117
lian2945 May 22, 2026
14a67c3
fix(grpc): proto Release 메시지 누락 필드 추가 및 created_at 매핑 오류 수정
lian2945 May 22, 2026
418df4d
fix(scheduler): Redisson 분산 락 + 청크 처리로 AutoStartSale 다중 인스턴스 충돌 해소
lian2945 May 22, 2026
4b9b8fa
chore(deps): redisson-spring-boot-starter 3.45.1 → 3.50.0
lian2945 May 22, 2026
6cdb350
refactor(grpc): IncreaseSoldQuantity gRPC 엔드포인트 제거 — Kafka Inbox 단일 경…
lian2945 May 22, 2026
53f1aff
refactor(release): gRPC 호출을 트랜잭션 밖으로 분리
lian2945 May 22, 2026
9a1f7d4
fix(release): PR 리뷰 코멘트 반영 — isOnSale 도메인 위임, Validator 개선, gRPC 채널 재사용
lian2945 May 22, 2026
3bf4654
refactor(validation): ZonedDateTime 제거 — epochSecond % 600 으로 단순화
lian2945 May 22, 2026
4017fd3
fix(general-goods): CreatorGrpcAdapter — grpcChannelFactory 직접 호출 제거,…
lian2945 May 23, 2026
0722dca
fix(order): shedlock-provider-redisson → shedlock-provider-redis-spri…
lian2945 May 23, 2026
82f2719
fix(docker): COPY --chown으로 chown RUN 레이어 제거 — I/O 에러 방지
lian2945 May 23, 2026
ff85478
feat(user): GET /user/me 엔드포인트 추가
lian2945 May 26, 2026
6807066
[feat/117] :: order 서비스 Debezium CDC 역직렬화 오류 수정
lian2945 Jun 1, 2026
4ab4ca7
[feat/117] :: order outbox payload에 event_id 포함
lian2945 Jun 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion services/auth/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
FROM mcr.microsoft.com/openjdk/jdk:21-ubuntu
ARG JAR_FILE=build/libs/*.jar
RUN groupadd -r appuser && useradd -r -g appuser appuser
WORKDIR /app
COPY ${JAR_FILE} app.jar
RUN chown -R appuser:appuser /app
USER appuser
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]
ENTRYPOINT ["java", "-jar", "app.jar"]
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,25 @@
import kr.magicbox.auth.adapter.in.kafka.annotation.Idempotent;
import kr.magicbox.auth.adapter.in.kafka.event.UserBannedEvent;
import kr.magicbox.auth.adapter.in.kafka.event.UserWithdrawnEvent;
import kr.magicbox.auth.adapter.out.persistence.repository.AuthInboxRepository;
import kr.magicbox.auth.application.port.in.HandleUserBannedUseCase;
import kr.magicbox.auth.application.port.in.HandleUserWithdrawnUseCase;
import kr.magicbox.auth.domain.vo.UserId;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class UserEventKafkaListener {

private final HandleUserWithdrawnUseCase handleUserWithdrawnUseCase;
private final HandleUserBannedUseCase handleUserBannedUseCase;
private final AuthInboxRepository authInboxRepository;

@Idempotent
@KafkaListener(topics = "outbox.event.user-withdrawn", groupId = "auth-service")
Expand All @@ -31,4 +36,11 @@ public void handleUserBannedEvent(ConsumerRecord<String, UserBannedEvent> consum
UserBannedEvent event = consumerRecord.value();
handleUserBannedUseCase.handleUserBanned(UserId.of(event.userId()));
}

@DltHandler
public void handleDlt(ConsumerRecord<String, ?> consumerRecord) {
log.error("[Inbox] DLT 전환. topic={}, partition={}, offset={}", consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
authInboxRepository.findByTopicAndPartitionAndOffset(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset())
.ifPresent(inbox -> inbox.markDeadLettered());
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package kr.magicbox.auth.adapter.in.kafka.aop;

import kr.magicbox.auth.adapter.in.kafka.event.InboxEvent;
import kr.magicbox.auth.adapter.in.kafka.properties.InboxProperties;
import kr.magicbox.auth.adapter.out.persistence.entity.AuthInboxEntity;
import kr.magicbox.auth.adapter.out.persistence.entity.AuthInboxStatus;
import kr.magicbox.auth.adapter.out.persistence.repository.AuthInboxRepository;
Expand All @@ -12,6 +14,8 @@
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;

@Slf4j
Expand All @@ -22,11 +26,29 @@ public class IdempotentAspect {

private final AuthInboxRepository authInboxRepository;
private final TransactionTemplate transactionTemplate;
private final InboxProperties inboxProperties;

@Around("@annotation(kr.magicbox.auth.adapter.in.kafka.annotation.Idempotent)")
public Object around(ProceedingJoinPoint pjp) {
ConsumerRecord<String, ?> consumerRecord = extractRecord(pjp);
Long eventId = Long.parseLong(consumerRecord.key());
InboxEvent event = (InboxEvent) consumerRecord.value();
Long eventId = event.eventId();
Instant occurredAt = event.occurredAt();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Major] 만료 메시지 DEAD_LETTERED 저장 시 existsByEventId 중복 체크가 누락되어 있습니다.

if (isTooOld(occurredAt)) {
    transactionTemplate.executeWithoutResult(status ->
        authInboxRepository.save(AuthInboxEntity.builder()
                .eventId(eventId)
                ...
                .status(AuthInboxStatus.DEAD_LETTERED)
                .build())
    );
    return null;
}

동일 eventId 가 (정상 처리된 후) 재전달되어 만료 판정을 받게 되면, 위 분기는 정상 처리 분기와 달리 중복 체크 없이 곧바로 save 를 호출하므로 event_id UNIQUE 제약 위반이 발생합니다. 또한 같은 메시지가 retry로 여러 번 들어와 모두 만료 처리되면 동일하게 위반됩니다.

제안:

if (isTooOld(occurredAt)) {
    if (authInboxRepository.existsByEventId(eventId)) {
        log.info("[Inbox] 이미 기록된 만료 이벤트. eventId={}", eventId);
        return null;
    }
    // save DEAD_LETTERED ...
}

creator 측 IdempotentAspect 도 동일 구조이므로 같이 보완해 주세요.


if (isTooOld(occurredAt)) {
log.warn("[Inbox] 만료된 메시지 DEAD_LETTERED 처리. eventId={}, occurredAt={}", eventId, occurredAt);
transactionTemplate.executeWithoutResult(status ->
authInboxRepository.save(AuthInboxEntity.builder()
.eventId(eventId)
.topic(consumerRecord.topic())
.partition(consumerRecord.partition())
.offset(consumerRecord.offset())
.status(AuthInboxStatus.DEAD_LETTERED)
.occurredAt(occurredAt)
.build())
);
return null;
}

return transactionTemplate.execute(status -> {
if (authInboxRepository.existsByEventId(eventId)) {
Expand All @@ -39,6 +61,7 @@ public Object around(ProceedingJoinPoint pjp) {
.partition(consumerRecord.partition())
.offset(consumerRecord.offset())
.status(AuthInboxStatus.PENDING)
.occurredAt(occurredAt)
.build());
try {
pjp.proceed();
Expand All @@ -51,6 +74,10 @@ public Object around(ProceedingJoinPoint pjp) {
});
}

private boolean isTooOld(Instant occurredAt) {
return occurredAt.isBefore(Instant.now().minus(inboxProperties.getMaxEventAgeMinutes(), ChronoUnit.MINUTES));
}

@SuppressWarnings("unchecked")
private ConsumerRecord<String, ?> extractRecord(ProceedingJoinPoint pjp) {
return Arrays.stream(pjp.getArgs())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package kr.magicbox.auth.adapter.in.kafka.event;

import java.time.Instant;

public interface InboxEvent {
Long eventId();
Instant occurredAt();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package kr.magicbox.auth.adapter.in.kafka.event;

import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;

Expand All @@ -9,5 +10,5 @@
public record UserBannedEvent(
@JsonProperty("event_id") Long eventId,
@JsonProperty("user_id") Long userId,
@JsonProperty("banned_at") Instant bannedAt
) implements InboxEvent {}
@JsonProperty("occurred_at") @JsonAlias("banned_at") Instant occurredAt
) implements InboxEvent {}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package kr.magicbox.auth.adapter.in.kafka.event;

import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;

Expand All @@ -9,5 +10,5 @@
public record UserWithdrawnEvent(
@JsonProperty("event_id") Long eventId,
@JsonProperty("user_id") Long userId,
@JsonProperty("withdrawn_at") Instant withdrawnAt
) implements InboxEvent {}
@JsonProperty("occurred_at") @JsonAlias("withdrawn_at") Instant occurredAt
) implements InboxEvent {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package kr.magicbox.auth.adapter.in.kafka.properties;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Getter
@RequiredArgsConstructor
@ConfigurationProperties(prefix = "inbox")
public class InboxProperties {
private final long maxEventAgeMinutes;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.grpc.client.GrpcChannelFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.grpc.client.GrpcChannelFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.time.Instant;

@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Entity
Expand All @@ -28,13 +30,17 @@ public class AuthInboxEntity extends BaseEntity {
@Column(nullable = false)
private AuthInboxStatus status;

@Column(name = "occurred_at", nullable = false)
private Instant occurredAt;

@Builder
public AuthInboxEntity(Long eventId, String topic, Integer partition, Long offset, AuthInboxStatus status) {
public AuthInboxEntity(Long eventId, String topic, Integer partition, Long offset, AuthInboxStatus status, Instant occurredAt) {
this.eventId = eventId;
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.status = status;
this.occurredAt = occurredAt;
}

public void markProcessed() {
Expand Down
4 changes: 3 additions & 1 deletion services/auth/src/main/resources/application-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,6 @@ security:
logging:
level:
org.springframework.web: INFO
org.springframework.data.redis: INFO
org.springframework.data.redis: INFO
inbox:
max-event-age-minutes: 5
5 changes: 4 additions & 1 deletion services/auth/src/main/resources/application-prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,16 @@ code:
ttl-seconds: ${OAUTH2_CODE_TTL_SECONDS}

frontend:
uri: ${FRONTEND_URI}
uri: ${FRONTEND_URL}

security:
trusted:
ips:
- ${TRUSTED_IP_GATEWAY}

inbox:
max-event-age-minutes: 5

logging:
level:
org.springframework.web: WARN
Expand Down
5 changes: 4 additions & 1 deletion services/creator/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
FROM mcr.microsoft.com/openjdk/jdk:21-ubuntu
ARG JAR_FILE=build/libs/*.jar
RUN groupadd -r appuser && useradd -r -g appuser appuser
WORKDIR /app
COPY ${JAR_FILE} app.jar
RUN chown -R appuser:appuser /app
USER appuser
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]
ENTRYPOINT ["java", "-jar", "app.jar"]
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package kr.magicbox.creator.adapter.in.grpc;

import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import kr.magicbox.creator.application.dto.query.IsCreatorOwnedByUserQuery;
import kr.magicbox.creator.application.port.in.GetCreatorIdByUserIdUseCase;
import kr.magicbox.creator.application.port.in.IsCreatorOwnedByUserUseCase;
import kr.magicbox.creator.domain.exception.CreatorNotFoundException;
import kr.magicbox.creator.domain.vo.CreatorId;
import kr.magicbox.creator.domain.vo.UserId;
import kr.magicbox.creator.grpc.creator.CreatorServiceGrpc;
import kr.magicbox.creator.grpc.creator.GetCreatorIdByUserIdRequest;
import kr.magicbox.creator.grpc.creator.GetCreatorIdByUserIdResponse;
import kr.magicbox.creator.grpc.creator.IsCreatorOwnedByUserRequest;
import kr.magicbox.creator.grpc.creator.IsCreatorOwnedByUserResponse;
import lombok.RequiredArgsConstructor;
Expand All @@ -15,6 +20,7 @@
@RequiredArgsConstructor
public class CreatorGrpcService extends CreatorServiceGrpc.CreatorServiceImplBase {
private final IsCreatorOwnedByUserUseCase isCreatorOwnedByUserUseCase;
private final GetCreatorIdByUserIdUseCase getCreatorIdByUserIdUseCase;

@Override
public void isCreatorOwnedByUser(IsCreatorOwnedByUserRequest request,
Expand All @@ -31,4 +37,22 @@ public void isCreatorOwnedByUser(IsCreatorOwnedByUserRequest request,
.build());
responseObserver.onCompleted();
}

@Override
public void getCreatorIdByUserId(GetCreatorIdByUserIdRequest request,
StreamObserver<GetCreatorIdByUserIdResponse> responseObserver) {
try {
CreatorId creatorId = getCreatorIdByUserIdUseCase.getCreatorIdByUserId(
UserId.of(request.getUserId())
);
responseObserver.onNext(GetCreatorIdByUserIdResponse.newBuilder()
.setCreatorId(creatorId.value())
.build());
responseObserver.onCompleted();
} catch (CreatorNotFoundException e) {
responseObserver.onError(Status.NOT_FOUND
.withDescription("Creator not found for userId: " + request.getUserId())
.asRuntimeException());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,44 +1,22 @@
package kr.magicbox.creator.adapter.in.kafka;

import kr.magicbox.creator.adapter.in.kafka.properties.KafkaRetryProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import kr.magicbox.creator.adapter.in.kafka.properties.InboxProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
import org.springframework.kafka.annotation.EnableKafkaRetryTopic;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Slf4j
@EnableKafkaRetryTopic
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(InboxProperties.class)
public class KafkaConfiguration {
private final KafkaRetryProperties kafkaRetryProperties;

@Bean
public CommonErrorHandler errorHandler(KafkaTemplate<?, ?> kafkaTemplate) {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
(ConsumerRecord<?, ?> failedRecord, Exception ex) -> {
String topic = failedRecord.topic() + "-dlt";
log.error("[DLT] 메시지 처리 실패, DLT 전송합니다. topic={}, offset={}, exception={}", failedRecord.topic(), failedRecord.offset(), ex.getMessage());
return new TopicPartition(topic, failedRecord.partition());
});
return new DefaultErrorHandler(recoverer, new FixedBackOff(kafkaRetryProperties.getIntervalMs(), kafkaRetryProperties.getMaxAttempts()));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConsumerFactory<Object, Object> consumerFactory,
CommonErrorHandler errorHandler) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
return factory;
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(1);
scheduler.setThreadNamePrefix("kafka-retry-");
return scheduler;
}
}
Loading
Loading